rusty_feeder/exchange/bybit/futures/
feeder.rs1use rust_decimal::Decimal;
8use rusty_common::collections::FxHashMap;
9use std::fmt::Debug;
10use std::sync::Arc;
11
12use anyhow::{Result, anyhow};
13use async_trait::async_trait;
14use log::{debug, error, warn};
15use parking_lot::RwLock;
16use quanta::Clock;
17use rusty_model::{
18 data::{
19 book_snapshot::OrderBookSnapshot,
20 market_trade::MarketTrade,
21 orderbook::{OrderBook, PriceLevel},
22 simd_orderbook::{SharedSimdOrderBook, SimdOrderBook},
23 },
24 enums::OrderSide,
25 instruments::InstrumentId,
26 venues::Venue,
27};
28use smallvec::SmallVec;
29use tokio::sync::mpsc;
30
31use crate::feeder::{FeedStats, Feeder, FeederOptions};
32use crate::provider::prelude::*;
33use rusty_model::data::bar::BarType;
34
35use super::data::{MarketTradeResponse, orderbook::OrderbookResponse};
36
37#[derive(Debug)]
39pub struct BybitFuturesFeeder {
40 config: ConnectionConfig,
42
43 clock: Clock,
45
46 stats: Arc<RwLock<FxHashMap<InstrumentId, FeedStats>>>,
48
49 orderbooks: Arc<RwLock<FxHashMap<InstrumentId, SharedSimdOrderBook>>>,
51
52 active_trade_feeds: Arc<RwLock<FxHashMap<InstrumentId, mpsc::Sender<MarketTrade>>>>,
54
55 active_depth_feeds: Arc<RwLock<FxHashMap<InstrumentId, mpsc::Sender<OrderBookSnapshot>>>>,
57}
58
59impl Default for BybitFuturesFeeder {
60 fn default() -> Self {
61 Self::new()
62 }
63}
64
65impl BybitFuturesFeeder {
66 #[must_use]
68 pub fn new() -> Self {
69 Self::with_config(ConnectionConfig::default())
70 }
71
72 #[must_use]
74 pub fn with_config(config: ConnectionConfig) -> Self {
75 Self {
76 config,
77 clock: Clock::new(),
78 stats: Arc::new(RwLock::new(FxHashMap::default())),
79 orderbooks: Arc::new(RwLock::new(FxHashMap::default())),
80 active_trade_feeds: Arc::new(RwLock::new(FxHashMap::default())),
81 active_depth_feeds: Arc::new(RwLock::new(FxHashMap::default())),
82 }
83 }
84
85 #[inline]
87 fn convert_trade(&self, trade_msg: &MarketTradeResponse) -> Option<MarketTrade> {
88 let symbol = trade_msg.symbol()?;
89 let price = trade_msg.price_decimal()?;
90 let quantity = trade_msg.size_decimal()?;
91 let is_buy = trade_msg.is_buy()?;
92
93 let direction = if is_buy {
94 OrderSide::Buy
95 } else {
96 OrderSide::Sell
97 };
98
99 let instrument_id = InstrumentId::new(symbol, Venue::Bybit);
100
101 Some(MarketTrade {
103 timestamp: self.clock.now(), exchange_time_ns: trade_msg.timestamp_ns(),
105 price,
106 quantity,
107 direction,
108 instrument_id,
109 })
110 }
111
112 #[inline]
114 fn convert_orderbook(&self, ob_msg: &OrderbookResponse) -> Option<OrderBookSnapshot> {
115 let symbol = ob_msg.symbol();
116 let parsed = ob_msg.parse();
117
118 let instrument_id = InstrumentId::new(symbol, Venue::Bybit);
119
120 let mut depth = OrderBookSnapshot::new(
122 instrument_id,
123 SmallVec::<[PriceLevel; 64]>::new(), SmallVec::<[PriceLevel; 64]>::new(), ob_msg.update_id(), self.clock.raw(), self.clock.raw(), );
129
130 for (price, quantity) in &parsed.bids {
132 let price_decimal = Decimal::try_from(*price).unwrap_or(Decimal::ZERO);
134 let quantity_decimal = Decimal::try_from(*quantity).unwrap_or(Decimal::ZERO);
135 depth.add_bid(price_decimal, quantity_decimal);
136 }
137
138 for (price, quantity) in &parsed.asks {
139 let price_decimal = Decimal::try_from(*price).unwrap_or(Decimal::ZERO);
141 let quantity_decimal = Decimal::try_from(*quantity).unwrap_or(Decimal::ZERO);
142 depth.add_ask(price_decimal, quantity_decimal);
143 }
144
145 Some(depth)
146 }
147}
148
149#[async_trait]
150impl Feeder for BybitFuturesFeeder {
151 type DepthMessage = OrderbookResponse;
152 type TradeMessage = MarketTradeResponse;
153
154 async fn start_feed_depth(
157 &self,
158 instrument_id: InstrumentId,
159 depth_rx: mpsc::Receiver<Self::DepthMessage>,
160 options: Option<FeederOptions>,
161 ) -> Result<mpsc::Receiver<OrderBookSnapshot>> {
162 let options = options.unwrap_or_default();
163
164 let (tx, rx) = mpsc::channel(options.channel_buffer_size);
166
167 {
169 let mut stats_map = self.stats.write();
170 stats_map.entry(instrument_id.clone()).or_default();
171 }
172
173 self.active_depth_feeds
175 .write()
176 .insert(instrument_id.clone(), tx.clone());
177
178 {
180 let mut ob_map = self.orderbooks.write();
181 if !ob_map.contains_key(&instrument_id) {
182 let ob = OrderBook::<64>::new(
183 instrument_id.symbol.clone(),
184 0, self.clock.raw(), SmallVec::<[PriceLevel; 64]>::new(), SmallVec::<[PriceLevel; 64]>::new(), );
189 let orderbook = SharedSimdOrderBook::from_orderbook(&ob);
190 ob_map.insert(instrument_id.clone(), orderbook);
191 }
192 }
193
194 let stats = self.stats.clone();
196 let orderbooks = self.orderbooks.clone();
197 let max_depth = options.max_depth_levels;
198 let inst_id = instrument_id.clone();
199 let clock = self.clock.clone();
200
201 tokio::spawn(async move {
203 let mut depth_rx = depth_rx;
204
205 while let Some(msg) = depth_rx.recv().await {
206 let start_time = clock.raw();
209
210 match BybitFuturesFeeder::convert_depth_message(&msg, &inst_id, max_depth, &clock) {
212 Ok(depth) => {
213 if let Some(shared_ob) = orderbooks.read().get(&inst_id) {
215 let bid_tuples: SmallVec<[(Decimal, Decimal); 64]> = depth
217 .bids
218 .iter()
219 .map(|lvl| (lvl.price, lvl.quantity))
220 .collect();
221 let ask_tuples: SmallVec<[(Decimal, Decimal); 64]> = depth
222 .asks
223 .iter()
224 .map(|lvl| (lvl.price, lvl.quantity))
225 .collect();
226 let bids_vec: SmallVec<[PriceLevel; 64]> = bid_tuples
227 .iter()
228 .map(|&(price, quantity)| PriceLevel::new(price, quantity))
229 .collect();
230 let asks_vec: SmallVec<[PriceLevel; 64]> = ask_tuples
231 .iter()
232 .map(|&(price, quantity)| PriceLevel::new(price, quantity))
233 .collect();
234
235 let ob = rusty_model::data::orderbook::OrderBook::<64>::new(
236 inst_id.symbol.clone(),
237 depth.timestamp_event,
238 clock.raw(),
239 bids_vec,
240 asks_vec,
241 );
242 shared_ob.write(|orderbook| {
243 *orderbook = SimdOrderBook::from_orderbook(&ob);
244 });
245 }
246
247 if let Err(e) = tx.send(depth).await {
249 error!("Failed to forward depth for {inst_id}: {e}");
250 break;
251 }
252
253 let latency = clock.raw().saturating_sub(start_time);
255 if let Some(stats) = stats.write().get_mut(&inst_id) {
256 stats.increment_processed();
257 stats.add_latency_sample(latency);
258 }
259 }
260 Err(e) => {
261 warn!("Failed to convert depth for {inst_id}: {e}");
262 if let Some(stats) = stats.write().get_mut(&inst_id) {
263 stats.increment_dropped();
264 }
265 }
266 }
267 }
268
269 debug!("Depth feed stopped for {inst_id}");
270 });
271
272 Ok(rx)
273 }
274
275 async fn stop_feed_depth(&self, instrument_id: &InstrumentId) -> Result<()> {
276 self.active_depth_feeds.write().remove(instrument_id);
278 Ok(())
279 }
280
281 async fn start_feed_trades(
282 &self,
283 instrument_id: InstrumentId,
284 trade_rx: mpsc::Receiver<Self::TradeMessage>,
285 options: Option<FeederOptions>,
286 ) -> Result<mpsc::Receiver<MarketTrade>> {
287 let options = options.unwrap_or_default();
288
289 let (tx, rx) = mpsc::channel(options.channel_buffer_size);
291
292 {
294 let mut stats_map = self.stats.write();
295 stats_map.entry(instrument_id.clone()).or_default();
296 }
297
298 self.active_trade_feeds
300 .write()
301 .insert(instrument_id.clone(), tx.clone());
302
303 let stats = self.stats.clone();
305 let inst_id = instrument_id.clone();
306 let clock = self.clock.clone();
307
308 tokio::spawn(async move {
310 let mut trade_rx = trade_rx;
311
312 while let Some(msg) = trade_rx.recv().await {
313 let start_time = clock.raw();
315
316 for trade_data in &msg.data {
317 if let Ok(trade) = BybitFuturesFeeder::convert_trade_message(
319 trade_data, &msg, &inst_id, &clock,
320 ) {
321 if let Err(e) = tx.send(trade).await {
323 error!("Failed to forward trade for {inst_id}: {e}");
324 break;
325 }
326
327 let latency = clock.raw().saturating_sub(start_time);
329 if let Some(stats) = stats.write().get_mut(&inst_id) {
330 stats.increment_processed();
331 stats.add_latency_sample(latency);
332 }
333 } else {
334 if let Some(stats) = stats.write().get_mut(&inst_id) {
336 stats.increment_dropped();
337 }
338 }
339 }
340 }
341
342 debug!("Trade feed stopped for {inst_id}");
343 });
344
345 Ok(rx)
346 }
347
348 async fn stop_feed_trades(&self, instrument_id: &InstrumentId) -> Result<()> {
349 self.active_trade_feeds.write().remove(instrument_id);
351 Ok(())
352 }
353
354 async fn start_feed_bars(
355 &self,
356 _instrument_id: InstrumentId,
357 _bar_type: BarType,
358 _trade_rx: mpsc::Receiver<MarketTrade>,
359 _options: Option<FeederOptions>,
360 ) -> Result<mpsc::Receiver<rusty_model::data::bar::Bar>> {
361 Err(anyhow!("Bar aggregation not implemented for Bybit Futures"))
363 }
364
365 async fn stop_feed_bars(
366 &self,
367 _instrument_id: &InstrumentId,
368 _bar_type: &BarType,
369 ) -> Result<()> {
370 Ok(())
372 }
373
374 async fn get_shared_orderbook(
375 &self,
376 instrument_id: &InstrumentId,
377 ) -> Result<SharedSimdOrderBook> {
378 let orderbooks = self.orderbooks.read();
380 if let Some(ob) = orderbooks.get(instrument_id) {
381 return Ok(ob.clone());
382 }
383
384 drop(orderbooks);
386 let ob = OrderBook::<64>::new(
387 instrument_id.symbol.clone(),
388 0, self.clock.raw(), SmallVec::<[PriceLevel; 64]>::new(), SmallVec::<[PriceLevel; 64]>::new(), );
393 let orderbook = SharedSimdOrderBook::from_orderbook(&ob);
394 self.orderbooks
395 .write()
396 .insert(instrument_id.clone(), orderbook.clone());
397
398 Ok(orderbook)
399 }
400
401 async fn get_bar_cache(
402 &self,
403 _instrument_id: &InstrumentId,
404 _bar_type: &BarType,
405 _max_bars: usize,
406 ) -> Result<Arc<RwLock<rusty_model::data::bar::BarCache>>> {
407 Err(anyhow!("Bar caching not implemented for Bybit Futures"))
409 }
410
411 async fn get_stats(&self, instrument_id: &InstrumentId) -> Result<FeedStats> {
412 let stats = self.stats.read();
414 if let Some(feed_stats) = stats.get(instrument_id) {
415 return Ok(feed_stats.clone());
416 }
417
418 Ok(FeedStats::default())
420 }
421
422 async fn reset_stats(&self) -> Result<()> {
423 let mut stats = self.stats.write();
425 for (_, feed_stats) in stats.iter_mut() {
426 *feed_stats = FeedStats::default();
427 }
428
429 Ok(())
430 }
431}
432
433impl BybitFuturesFeeder {
434 #[inline]
436 pub fn convert_trade_message(
437 trade_data: &super::data::trade::TradeData,
438 _msg: &MarketTradeResponse,
439 instrument_id: &InstrumentId,
440 clock: &Clock,
441 ) -> Result<MarketTrade> {
442 let price = trade_data.price.parse::<f64>()?;
444 let quantity = trade_data.size.parse::<f64>()?;
445
446 let direction = match trade_data.side.to_lowercase().as_str() {
448 "buy" => OrderSide::Buy,
449 "sell" => OrderSide::Sell,
450 _ => return Err(anyhow!("Invalid trade side: {}", trade_data.side)),
451 };
452
453 let exchange_time = trade_data.timestamp.parse::<u64>()? * 1_000_000; let trade = MarketTrade {
458 timestamp: clock.now(),
459 exchange_time_ns: exchange_time,
460 price: Decimal::try_from(price).unwrap_or(Decimal::ZERO),
461 quantity: Decimal::try_from(quantity).unwrap_or(Decimal::ZERO),
462 direction,
463 instrument_id: instrument_id.clone(),
464 };
465
466 Ok(trade)
467 }
468
469 #[inline]
471 pub fn convert_depth_message(
472 msg: &OrderbookResponse,
473 instrument_id: &InstrumentId,
474 max_depth: usize,
475 clock: &Clock,
476 ) -> Result<OrderBookSnapshot> {
477 let parsed = msg.parse();
479
480 let mut depth = OrderBookSnapshot::new(
482 instrument_id.clone(),
483 SmallVec::<[PriceLevel; 64]>::new(), SmallVec::<[PriceLevel; 64]>::new(), parsed.update_id, clock.raw(), clock.raw(), );
489
490 for (i, (price, quantity)) in parsed.bids.iter().enumerate() {
492 if i >= max_depth {
493 break;
494 }
495 let price_decimal = Decimal::try_from(*price).unwrap_or(Decimal::ZERO);
497 let quantity_decimal = Decimal::try_from(*quantity).unwrap_or(Decimal::ZERO);
498 depth.add_bid(price_decimal, quantity_decimal);
499 }
500
501 for (i, (price, quantity)) in parsed.asks.iter().enumerate() {
503 if i >= max_depth {
504 break;
505 }
506 let price_decimal = Decimal::try_from(*price).unwrap_or(Decimal::ZERO);
508 let quantity_decimal = Decimal::try_from(*quantity).unwrap_or(Decimal::ZERO);
509 depth.add_ask(price_decimal, quantity_decimal);
510 }
511
512 Ok(depth)
513 }
514}