rusty_feeder/exchange/bybit/futures/
feeder.rs

1//! Bybit Futures feeder implementation
2//!
3//! This module implements the Feeder trait for Bybit Futures markets,
4//! providing high-performance market data processing and normalization
5//! with zero heap allocations in critical paths.
6
7use rust_decimal::Decimal;
8use rusty_common::collections::FxHashMap;
9use std::fmt::Debug;
10use std::sync::Arc;
11
12use anyhow::{Result, anyhow};
13use async_trait::async_trait;
14use log::{debug, error, warn};
15use parking_lot::RwLock;
16use quanta::Clock;
17use rusty_model::{
18    data::{
19        book_snapshot::OrderBookSnapshot,
20        market_trade::MarketTrade,
21        orderbook::{OrderBook, PriceLevel},
22        simd_orderbook::{SharedSimdOrderBook, SimdOrderBook},
23    },
24    enums::OrderSide,
25    instruments::InstrumentId,
26    venues::Venue,
27};
28use smallvec::SmallVec;
29use tokio::sync::mpsc;
30
31use crate::feeder::{FeedStats, Feeder, FeederOptions};
32use crate::provider::prelude::*;
33use rusty_model::data::bar::BarType;
34
35use super::data::{MarketTradeResponse, orderbook::OrderbookResponse};
36
37/// Bybit Futures feeder implementation
38#[derive(Debug)]
39pub struct BybitFuturesFeeder {
40    /// Configuration
41    config: ConnectionConfig,
42
43    /// Shared high-precision clock
44    clock: Clock,
45
46    /// Feed statistics per instrument
47    stats: Arc<RwLock<FxHashMap<InstrumentId, FeedStats>>>,
48
49    /// Shared orderbooks per instrument
50    orderbooks: Arc<RwLock<FxHashMap<InstrumentId, SharedSimdOrderBook>>>,
51
52    /// Active trade feeds
53    active_trade_feeds: Arc<RwLock<FxHashMap<InstrumentId, mpsc::Sender<MarketTrade>>>>,
54
55    /// Active depth feeds
56    active_depth_feeds: Arc<RwLock<FxHashMap<InstrumentId, mpsc::Sender<OrderBookSnapshot>>>>,
57}
58
59impl Default for BybitFuturesFeeder {
60    fn default() -> Self {
61        Self::new()
62    }
63}
64
65impl BybitFuturesFeeder {
66    /// Create a new Bybit Futures feeder with default configuration
67    #[must_use]
68    pub fn new() -> Self {
69        Self::with_config(ConnectionConfig::default())
70    }
71
72    /// Create a new Bybit Futures feeder with custom configuration
73    #[must_use]
74    pub fn with_config(config: ConnectionConfig) -> Self {
75        Self {
76            config,
77            clock: Clock::new(),
78            stats: Arc::new(RwLock::new(FxHashMap::default())),
79            orderbooks: Arc::new(RwLock::new(FxHashMap::default())),
80            active_trade_feeds: Arc::new(RwLock::new(FxHashMap::default())),
81            active_depth_feeds: Arc::new(RwLock::new(FxHashMap::default())),
82        }
83    }
84
85    /// Convert Bybit Futures trade message to normalized MarketTrade
86    #[inline]
87    fn convert_trade(&self, trade_msg: &MarketTradeResponse) -> Option<MarketTrade> {
88        let symbol = trade_msg.symbol()?;
89        let price = trade_msg.price_decimal()?;
90        let quantity = trade_msg.size_decimal()?;
91        let is_buy = trade_msg.is_buy()?;
92
93        let direction = if is_buy {
94            OrderSide::Buy
95        } else {
96            OrderSide::Sell
97        };
98
99        let instrument_id = InstrumentId::new(symbol, Venue::Bybit);
100
101        // Create a normalized trade with nanosecond precision timestamps
102        Some(MarketTrade {
103            timestamp: self.clock.now(), // Current time as quanta::Instant
104            exchange_time_ns: trade_msg.timestamp_ns(),
105            price,
106            quantity,
107            direction,
108            instrument_id,
109        })
110    }
111
112    /// Convert Bybit Futures orderbook message to normalized OrderBookSnapshot
113    #[inline]
114    fn convert_orderbook(&self, ob_msg: &OrderbookResponse) -> Option<OrderBookSnapshot> {
115        let symbol = ob_msg.symbol();
116        let parsed = ob_msg.parse();
117
118        let instrument_id = InstrumentId::new(symbol, Venue::Bybit);
119
120        // Create a new OrderBookSnapshot with nanosecond precision timestamp
121        let mut depth = OrderBookSnapshot::new(
122            instrument_id,
123            SmallVec::<[PriceLevel; 64]>::new(), // Empty bids to be filled
124            SmallVec::<[PriceLevel; 64]>::new(), // Empty asks to be filled
125            ob_msg.update_id(),                  // Sequence ID
126            self.clock.raw(),                    // Event timestamp
127            self.clock.raw(),                    // Init timestamp
128        );
129
130        // Add bids and asks
131        for (price, quantity) in &parsed.bids {
132            // Convert f64 to Decimal
133            let price_decimal = Decimal::try_from(*price).unwrap_or(Decimal::ZERO);
134            let quantity_decimal = Decimal::try_from(*quantity).unwrap_or(Decimal::ZERO);
135            depth.add_bid(price_decimal, quantity_decimal);
136        }
137
138        for (price, quantity) in &parsed.asks {
139            // Convert f64 to Decimal
140            let price_decimal = Decimal::try_from(*price).unwrap_or(Decimal::ZERO);
141            let quantity_decimal = Decimal::try_from(*quantity).unwrap_or(Decimal::ZERO);
142            depth.add_ask(price_decimal, quantity_decimal);
143        }
144
145        Some(depth)
146    }
147}
148
149#[async_trait]
150impl Feeder for BybitFuturesFeeder {
151    type DepthMessage = OrderbookResponse;
152    type TradeMessage = MarketTradeResponse;
153
154    // Removed methods that are not part of the Feeder trait
155
156    async fn start_feed_depth(
157        &self,
158        instrument_id: InstrumentId,
159        depth_rx: mpsc::Receiver<Self::DepthMessage>,
160        options: Option<FeederOptions>,
161    ) -> Result<mpsc::Receiver<OrderBookSnapshot>> {
162        let options = options.unwrap_or_default();
163
164        // Create channel for normalized depth messages
165        let (tx, rx) = mpsc::channel(options.channel_buffer_size);
166
167        // Initialize stats for this instrument
168        {
169            let mut stats_map = self.stats.write();
170            stats_map.entry(instrument_id.clone()).or_default();
171        }
172
173        // Store the sender for cleanup
174        self.active_depth_feeds
175            .write()
176            .insert(instrument_id.clone(), tx.clone());
177
178        // Create shared orderbook if needed
179        {
180            let mut ob_map = self.orderbooks.write();
181            if !ob_map.contains_key(&instrument_id) {
182                let ob = OrderBook::<64>::new(
183                    instrument_id.symbol.clone(),
184                    0,                // exchange_timestamp_ns - will be updated on first message
185                    self.clock.raw(), // system_timestamp_ns
186                    SmallVec::<[PriceLevel; 64]>::new(), // empty bids
187                    SmallVec::<[PriceLevel; 64]>::new(), // empty asks
188                );
189                let orderbook = SharedSimdOrderBook::from_orderbook(&ob);
190                ob_map.insert(instrument_id.clone(), orderbook);
191            }
192        }
193
194        // Clone resources for the task
195        let stats = self.stats.clone();
196        let orderbooks = self.orderbooks.clone();
197        let max_depth = options.max_depth_levels;
198        let inst_id = instrument_id.clone();
199        let clock = self.clock.clone();
200
201        // Spawn task to process depth messages
202        tokio::spawn(async move {
203            let mut depth_rx = depth_rx;
204
205            while let Some(msg) = depth_rx.recv().await {
206                // Convert to normalized format
207                // Use batch processing if enabled
208                let start_time = clock.raw();
209
210                // Process this depth message
211                match BybitFuturesFeeder::convert_depth_message(&msg, &inst_id, max_depth, &clock) {
212                    Ok(depth) => {
213                        // Update shared orderbook if available
214                        if let Some(shared_ob) = orderbooks.read().get(&inst_id) {
215                            // Convert depth bids and asks to tuples for model OrderBook and update shared orderbook
216                            let bid_tuples: SmallVec<[(Decimal, Decimal); 64]> = depth
217                                .bids
218                                .iter()
219                                .map(|lvl| (lvl.price, lvl.quantity))
220                                .collect();
221                            let ask_tuples: SmallVec<[(Decimal, Decimal); 64]> = depth
222                                .asks
223                                .iter()
224                                .map(|lvl| (lvl.price, lvl.quantity))
225                                .collect();
226                            let bids_vec: SmallVec<[PriceLevel; 64]> = bid_tuples
227                                .iter()
228                                .map(|&(price, quantity)| PriceLevel::new(price, quantity))
229                                .collect();
230                            let asks_vec: SmallVec<[PriceLevel; 64]> = ask_tuples
231                                .iter()
232                                .map(|&(price, quantity)| PriceLevel::new(price, quantity))
233                                .collect();
234
235                            let ob = rusty_model::data::orderbook::OrderBook::<64>::new(
236                                inst_id.symbol.clone(),
237                                depth.timestamp_event,
238                                clock.raw(),
239                                bids_vec,
240                                asks_vec,
241                            );
242                            shared_ob.write(|orderbook| {
243                                *orderbook = SimdOrderBook::from_orderbook(&ob);
244                            });
245                        }
246
247                        // Forward the message
248                        if let Err(e) = tx.send(depth).await {
249                            error!("Failed to forward depth for {inst_id}: {e}");
250                            break;
251                        }
252
253                        // Update stats
254                        let latency = clock.raw().saturating_sub(start_time);
255                        if let Some(stats) = stats.write().get_mut(&inst_id) {
256                            stats.increment_processed();
257                            stats.add_latency_sample(latency);
258                        }
259                    }
260                    Err(e) => {
261                        warn!("Failed to convert depth for {inst_id}: {e}");
262                        if let Some(stats) = stats.write().get_mut(&inst_id) {
263                            stats.increment_dropped();
264                        }
265                    }
266                }
267            }
268
269            debug!("Depth feed stopped for {inst_id}");
270        });
271
272        Ok(rx)
273    }
274
275    async fn stop_feed_depth(&self, instrument_id: &InstrumentId) -> Result<()> {
276        // Remove from active feeds
277        self.active_depth_feeds.write().remove(instrument_id);
278        Ok(())
279    }
280
281    async fn start_feed_trades(
282        &self,
283        instrument_id: InstrumentId,
284        trade_rx: mpsc::Receiver<Self::TradeMessage>,
285        options: Option<FeederOptions>,
286    ) -> Result<mpsc::Receiver<MarketTrade>> {
287        let options = options.unwrap_or_default();
288
289        // Create channel for normalized trade messages
290        let (tx, rx) = mpsc::channel(options.channel_buffer_size);
291
292        // Initialize stats for this instrument
293        {
294            let mut stats_map = self.stats.write();
295            stats_map.entry(instrument_id.clone()).or_default();
296        }
297
298        // Store the sender for cleanup
299        self.active_trade_feeds
300            .write()
301            .insert(instrument_id.clone(), tx.clone());
302
303        // Clone resources for the task
304        let stats = self.stats.clone();
305        let inst_id = instrument_id.clone();
306        let clock = self.clock.clone();
307
308        // Spawn task to process trade messages
309        tokio::spawn(async move {
310            let mut trade_rx = trade_rx;
311
312            while let Some(msg) = trade_rx.recv().await {
313                // Convert to normalized format
314                let start_time = clock.raw();
315
316                for trade_data in &msg.data {
317                    // Create a normalized Trade
318                    if let Ok(trade) = BybitFuturesFeeder::convert_trade_message(
319                        trade_data, &msg, &inst_id, &clock,
320                    ) {
321                        // Forward the trade
322                        if let Err(e) = tx.send(trade).await {
323                            error!("Failed to forward trade for {inst_id}: {e}");
324                            break;
325                        }
326
327                        // Update stats
328                        let latency = clock.raw().saturating_sub(start_time);
329                        if let Some(stats) = stats.write().get_mut(&inst_id) {
330                            stats.increment_processed();
331                            stats.add_latency_sample(latency);
332                        }
333                    } else {
334                        // Failed to convert
335                        if let Some(stats) = stats.write().get_mut(&inst_id) {
336                            stats.increment_dropped();
337                        }
338                    }
339                }
340            }
341
342            debug!("Trade feed stopped for {inst_id}");
343        });
344
345        Ok(rx)
346    }
347
348    async fn stop_feed_trades(&self, instrument_id: &InstrumentId) -> Result<()> {
349        // Remove from active feeds
350        self.active_trade_feeds.write().remove(instrument_id);
351        Ok(())
352    }
353
354    async fn start_feed_bars(
355        &self,
356        _instrument_id: InstrumentId,
357        _bar_type: BarType,
358        _trade_rx: mpsc::Receiver<MarketTrade>,
359        _options: Option<FeederOptions>,
360    ) -> Result<mpsc::Receiver<rusty_model::data::bar::Bar>> {
361        // Not implemented for this example
362        Err(anyhow!("Bar aggregation not implemented for Bybit Futures"))
363    }
364
365    async fn stop_feed_bars(
366        &self,
367        _instrument_id: &InstrumentId,
368        _bar_type: &BarType,
369    ) -> Result<()> {
370        // Not implemented for this example
371        Ok(())
372    }
373
374    async fn get_shared_orderbook(
375        &self,
376        instrument_id: &InstrumentId,
377    ) -> Result<SharedSimdOrderBook> {
378        // Get shared orderbook if it exists
379        let orderbooks = self.orderbooks.read();
380        if let Some(ob) = orderbooks.get(instrument_id) {
381            return Ok(ob.clone());
382        }
383
384        // Otherwise create a new one
385        drop(orderbooks);
386        let ob = OrderBook::<64>::new(
387            instrument_id.symbol.clone(),
388            0,                // exchange_timestamp_ns - will be updated on first message
389            self.clock.raw(), // system_timestamp_ns
390            SmallVec::<[PriceLevel; 64]>::new(), // empty bids
391            SmallVec::<[PriceLevel; 64]>::new(), // empty asks
392        );
393        let orderbook = SharedSimdOrderBook::from_orderbook(&ob);
394        self.orderbooks
395            .write()
396            .insert(instrument_id.clone(), orderbook.clone());
397
398        Ok(orderbook)
399    }
400
401    async fn get_bar_cache(
402        &self,
403        _instrument_id: &InstrumentId,
404        _bar_type: &BarType,
405        _max_bars: usize,
406    ) -> Result<Arc<RwLock<rusty_model::data::bar::BarCache>>> {
407        // Not implemented for this example
408        Err(anyhow!("Bar caching not implemented for Bybit Futures"))
409    }
410
411    async fn get_stats(&self, instrument_id: &InstrumentId) -> Result<FeedStats> {
412        // Get stats for this instrument
413        let stats = self.stats.read();
414        if let Some(feed_stats) = stats.get(instrument_id) {
415            return Ok(feed_stats.clone());
416        }
417
418        // Return default stats if not found
419        Ok(FeedStats::default())
420    }
421
422    async fn reset_stats(&self) -> Result<()> {
423        // Reset stats for all instruments
424        let mut stats = self.stats.write();
425        for (_, feed_stats) in stats.iter_mut() {
426            *feed_stats = FeedStats::default();
427        }
428
429        Ok(())
430    }
431}
432
433impl BybitFuturesFeeder {
434    /// Static method to convert trade message to normalized MarketTrade
435    #[inline]
436    pub fn convert_trade_message(
437        trade_data: &super::data::trade::TradeData,
438        _msg: &MarketTradeResponse,
439        instrument_id: &InstrumentId,
440        clock: &Clock,
441    ) -> Result<MarketTrade> {
442        // Parse price and quantity
443        let price = trade_data.price.parse::<f64>()?;
444        let quantity = trade_data.size.parse::<f64>()?;
445
446        // Determine order side
447        let direction = match trade_data.side.to_lowercase().as_str() {
448            "buy" => OrderSide::Buy,
449            "sell" => OrderSide::Sell,
450            _ => return Err(anyhow!("Invalid trade side: {}", trade_data.side)),
451        };
452
453        // Parse timestamp
454        let exchange_time = trade_data.timestamp.parse::<u64>()? * 1_000_000; // Convert ms to ns
455
456        // Create normalized trade
457        let trade = MarketTrade {
458            timestamp: clock.now(),
459            exchange_time_ns: exchange_time,
460            price: Decimal::try_from(price).unwrap_or(Decimal::ZERO),
461            quantity: Decimal::try_from(quantity).unwrap_or(Decimal::ZERO),
462            direction,
463            instrument_id: instrument_id.clone(),
464        };
465
466        Ok(trade)
467    }
468
469    /// Static method to convert depth message to normalized OrderBookSnapshot
470    #[inline]
471    pub fn convert_depth_message(
472        msg: &OrderbookResponse,
473        instrument_id: &InstrumentId,
474        max_depth: usize,
475        clock: &Clock,
476    ) -> Result<OrderBookSnapshot> {
477        // Parse the orderbook data
478        let parsed = msg.parse();
479
480        // Create a new OrderBookSnapshot
481        let mut depth = OrderBookSnapshot::new(
482            instrument_id.clone(),
483            SmallVec::<[PriceLevel; 64]>::new(), // Empty bids to be filled
484            SmallVec::<[PriceLevel; 64]>::new(), // Empty asks to be filled
485            parsed.update_id,                    // Sequence ID
486            clock.raw(),                         // Event timestamp
487            clock.raw(),                         // Init timestamp
488        );
489
490        // Add bids (up to max_depth)
491        for (i, (price, quantity)) in parsed.bids.iter().enumerate() {
492            if i >= max_depth {
493                break;
494            }
495            // Convert f64 to Decimal
496            let price_decimal = Decimal::try_from(*price).unwrap_or(Decimal::ZERO);
497            let quantity_decimal = Decimal::try_from(*quantity).unwrap_or(Decimal::ZERO);
498            depth.add_bid(price_decimal, quantity_decimal);
499        }
500
501        // Add asks (up to max_depth)
502        for (i, (price, quantity)) in parsed.asks.iter().enumerate() {
503            if i >= max_depth {
504                break;
505            }
506            // Convert f64 to Decimal
507            let price_decimal = Decimal::try_from(*price).unwrap_or(Decimal::ZERO);
508            let quantity_decimal = Decimal::try_from(*quantity).unwrap_or(Decimal::ZERO);
509            depth.add_ask(price_decimal, quantity_decimal);
510        }
511
512        Ok(depth)
513    }
514}