// rusty_feeder/exchange/bybit/feeder.rs

1use rusty_common::collections::FxHashMap;
2use smartstring::alias::String;
3use std::sync::Arc;
4
5use anyhow::Result;
6use async_trait::async_trait;
7use parking_lot::RwLock;
8use quanta::Clock;
9use rust_decimal::Decimal;
10use skiplist::SkipMap;
11use smallvec::SmallVec;
12use tokio::sync::{mpsc, watch};
13
14use crate::feeder::{FeedStats, Feeder, FeederOptions};
15use rusty_model::{
16    PriceLevel,
17    data::{
18        bar::{Bar, BarCache, BarType},
19        book_snapshot::OrderBookSnapshot,
20        market_trade::MarketTrade,
21        simd_orderbook::{SharedSimdOrderBook, SimdOrderBook},
22    },
23    enums::OrderSide,
24    instruments::InstrumentId,
25};
26
27use super::data::{
28    orderbook::{OrderbookResponse, ParsedOrderbookData},
29    trade::TradeResponse as MarketTradeResponse,
30};
31
32/// Bybit market data feeder optimized for HFT applications
33///
34/// This feeder implementation is specifically designed for high-frequency trading (HFT) scenarios
35/// with the following performance characteristics:
36///
37/// - Zero-allocation critical paths using SmallVec and stack allocation
38/// - Lock-minimization with fine-grained locking for maximum concurrency
39/// - Cache-line alignment for optimal CPU cache efficiency
40/// - CPU pinning for dedicated core processing
41/// - Batch processing for efficient trade and orderbook updates
42/// - Comprehensive performance statistics tracking
43/// - Real-time shared orderbooks with minimal contention
44/// - Time synchronization with nanosecond precision
45/// - Efficient statistics and monitoring
46///
47/// # Performance Settings
48///
49/// Performance can be tuned via the `FeederOptions`:
50/// - `channel_buffer_size`: Size of message buffers (default: 1024)
51/// - `max_depth_levels`: Maximum depth levels for orderbooks (default: 100)
52/// - `batch_size`: Batch message processing size (default: 32)
53/// - `cpu_affinity`: Pin to specific CPU core (default: -1, no pinning)
54/// - `use_zero_copy`: Enable zero-copy processing where possible (default: true)
55/// - `max_backlog`: Maximum message backlog before dropping (default: 10000)
56///
57/// # Performance Notes
58///
59/// Software prefetching has been removed as it provided minimal benefit:
60/// - Sequential access patterns are already optimized by CPU hardware prefetcher
61/// - Immediate usage after prefetch negates benefits
62/// - Small data structures (Decimal ~16 bytes) fit multiple elements per cache line
63///
64/// Focus areas for HFT optimization:
65/// - CPU pinning to dedicated cores
66/// - Cache-line aligned data structures
67/// - Minimizing lock contention with SkipMap
68/// - Batch processing for large updates
69/// - Zero-copy where possible
70///
71/// # Examples
72///
73/// ```ignore
74/// use rusty_feeder::bybit::feeder::BybitFeeder;
75/// use rusty_feeder::feeder::{Feeder, FeederOptions};
76/// use rusty_model::instruments::InstrumentId;
77/// use rusty_model::venues::Venue;
78///
79/// async fn subscribe_spot_trades() {
80///     // Create new Bybit feeder
81///     let feeder = BybitFeeder::new();
82///
83///     // Configure options for low latency
84///     let options = FeederOptions {
85///         channel_buffer_size: 4096,
86///         cpu_affinity: 5, // Pin to CPU core 5
87///         batch_size: 64,
88///         use_zero_copy: true,
89///         max_depth_levels: 20,
90///         max_backlog: 50000,
91///         ..Default::default()
92///     };
93///
94///     // Get a receiver for normalized trades
95///     let instrument = InstrumentId::new("BTCUSDT".to_string(), Venue::Bybit);
96///     let trade_rx = feeder.subscribe_trades(instrument.clone()).await.unwrap();
97///     let normalized_trades_rx = feeder.start_feed_trades(
98///         instrument,
99///         trade_rx,
100///         Some(options)
101///     ).await.unwrap();
102/// }
103/// ```
104#[derive(Debug)]
105#[repr(align(64))] // Cache-line alignment for better CPU cache efficiency
106pub struct BybitFeeder {
107    /// Stop signals for active feeds - used to gracefully terminate feed processing
108    stop_signals: Arc<RwLock<FxHashMap<String, watch::Sender<bool>>>>,
109
110    /// Shared high-precision clock for time synchronization and latency measurements
111    clock: Clock,
112
113    /// Feed statistics for monitoring performance and detecting issues
114    /// Keyed by feed identifier (e.g., "depth:BTCUSDT", "trades:BTCUSDT")
115    stats: Arc<RwLock<FxHashMap<String, FeedStats>>>,
116
117    /// Shared orderbooks for each instrument, providing real-time market depth
118    /// Keyed by instrument symbol (e.g., "BTCUSDT")
119    orderbooks: Arc<RwLock<FxHashMap<String, SharedSimdOrderBook>>>,
120
121    /// Bar caches for each instrument and interval, storing historical OHLCV data
122    /// Keyed by "symbol:interval" (e.g., "BTCUSDT:60s" for 1-minute bars)
123    bar_caches: Arc<RwLock<FxHashMap<String, Arc<RwLock<BarCache>>>>>,
124}
125
126impl Default for BybitFeeder {
127    fn default() -> Self {
128        Self::new()
129    }
130}
131
132impl BybitFeeder {
133    /// Create a new Bybit feeder with default settings
134    ///
135    /// This constructor initializes a new Bybit feeder with a default high-precision clock
136    /// and empty state containers. The feeder is immediately ready to process market data.
137    ///
138    /// # Example
139    ///
140    /// ```ignore
141    /// use rusty_feeder::bybit::feeder::BybitFeeder;
142    ///
143    /// let feeder = BybitFeeder::new();
144    /// ```
145    #[inline]
146    #[must_use]
147    pub fn new() -> Self {
148        Self {
149            stop_signals: Arc::new(RwLock::new(FxHashMap::default())),
150            clock: Clock::new(),
151            stats: Arc::new(RwLock::new(FxHashMap::default())),
152            orderbooks: Arc::new(RwLock::new(FxHashMap::default())),
153            bar_caches: Arc::new(RwLock::new(FxHashMap::default())),
154        }
155    }
156
157    /// Create a new Bybit feeder with a custom high-precision clock
158    ///
159    /// This constructor allows providing a pre-configured clock, which can be useful
160    /// for:
161    /// - Testing with a controlled clock source
162    /// - Synchronizing multiple feeders with the same clock
163    /// - Using a clock with specific adjustments for latency compensation
164    ///
165    /// # Parameters
166    ///
167    /// * `clock` - A pre-configured `quanta::Clock` instance for high-precision timing
168    ///
169    /// # Example
170    ///
171    /// ```ignore
172    /// use rusty_feeder::bybit::feeder::BybitFeeder;
173    /// use quanta::Clock;
174    ///
175    /// // Create a clock with specific configuration
176    /// let clock = Clock::new();
177    ///
178    /// // Create feeder with the custom clock
179    /// let feeder = BybitFeeder::with_clock(clock);
180    /// ```
181    #[inline]
182    #[must_use]
183    pub fn with_clock(clock: Clock) -> Self {
184        Self {
185            stop_signals: Arc::new(RwLock::new(FxHashMap::default())),
186            clock,
187            stats: Arc::new(RwLock::new(FxHashMap::default())),
188            orderbooks: Arc::new(RwLock::new(FxHashMap::default())),
189            bar_caches: Arc::new(RwLock::new(FxHashMap::default())),
190        }
191    }
192
193    /// Get the current feed statistics for all active feeds
194    ///
195    /// Returns a copy of the current statistics map, which can be used for monitoring
196    /// and debugging the performance of the feeder.
197    ///
198    /// # Returns
199    ///
200    /// * A HashMap mapping feed identifiers to their statistics
201    pub fn get_all_stats(&self) -> FxHashMap<String, FeedStats> {
202        self.stats.read().clone()
203    }
204
205    /// Get the memory usage of this feeder instance in bytes
206    ///
207    /// This method provides an estimate of the memory used by the feeder's
208    /// internal data structures, which can be useful for monitoring resource usage.
209    ///
210    /// # Returns
211    ///
212    /// * Estimated memory usage in bytes
213    pub fn memory_usage(&self) -> usize {
214        let mut total = 0;
215
216        // Add stats memory usage
217        {
218            let stats = self.stats.read();
219            total += std::mem::size_of_val(&*stats);
220
221            // Add stats entries
222            for (key, value) in stats.iter() {
223                total += key.len();
224                total += std::mem::size_of_val(value);
225            }
226        }
227
228        // Add orderbooks memory usage (estimated)
229        {
230            let orderbooks = self.orderbooks.read();
231            for (key, _) in orderbooks.iter() {
232                total += key.len();
233                // Rough estimate for a typical orderbook
234                total += 2 * 100 * std::mem::size_of::<PriceLevel>();
235            }
236        }
237
238        // Add bar caches memory usage (estimated)
239        {
240            let bar_caches = self.bar_caches.read();
241            for (key, cache) in bar_caches.iter() {
242                total += key.len();
243                // TODO: Add proper method to get BarCache size
244                // let cache_read = cache.read();
245                // total += cache_read.get_total_bars_count() * std::mem::size_of::<Bar>();
246            }
247        }
248
249        total
250    }
251}
252
253#[async_trait]
254impl Feeder for BybitFeeder {
255    type DepthMessage = OrderbookResponse;
256    type TradeMessage = MarketTradeResponse;
257
258    /// Get a realtime shared orderbook for a symbol
259    async fn get_shared_orderbook(
260        &self,
261        instrument_id: &InstrumentId,
262    ) -> Result<SharedSimdOrderBook> {
263        // Extract symbol from InstrumentId
264        let symbol = &instrument_id.symbol;
265
266        // Check if we already have an orderbook for this symbol
267        if let Some(orderbook) = self.orderbooks.read().get(symbol) {
268            return Ok(orderbook.clone());
269        }
270
271        // If not found, create a new one
272        let orderbook = rusty_model::data::orderbook::OrderBook::<64>::new(
273            symbol.clone(),
274            0,                // exchange_timestamp_ns - will be updated on first message
275            self.clock.raw(), // system_timestamp_ns
276            SmallVec::<[PriceLevel; 64]>::new(), // empty bids
277            SmallVec::<[PriceLevel; 64]>::new(), // empty asks
278        );
279        let shared_orderbook = SharedSimdOrderBook::from_orderbook(&orderbook);
280
281        // Store it for future use
282        self.orderbooks
283            .write()
284            .insert(symbol.clone(), shared_orderbook.clone());
285
286        Ok(shared_orderbook)
287    }
288
289    /// Get a bar cache for a symbol and bar type
290    async fn get_bar_cache(
291        &self,
292        instrument_id: &InstrumentId,
293        bar_type: &BarType,
294        max_bars: usize,
295    ) -> Result<Arc<RwLock<BarCache>>> {
296        // Extract symbol from InstrumentId
297        let symbol = &instrument_id.symbol;
298
299        // Get interval from bar_type
300        let interval_sec = match bar_type.get_spec().aggregation {
301            rusty_model::data::bar::BarAggregation::Second => bar_type.get_spec().step,
302            rusty_model::data::bar::BarAggregation::Minute => bar_type.get_spec().step * 60,
303            rusty_model::data::bar::BarAggregation::Hour => bar_type.get_spec().step * 3600,
304            rusty_model::data::bar::BarAggregation::Day => bar_type.get_spec().step * 86400,
305            _ => return Err(anyhow::anyhow!("Only time-based bars are supported")),
306        };
307
308        // Create cache key based on symbol and interval
309        let cache_key = String::from(format!("{symbol}:{interval_sec}s"));
310
311        // Check if we already have a cache for this symbol and interval
312        if let Some(cache) = self.bar_caches.read().get(&cache_key) {
313            // Check if we need to resize the cache
314            {
315                // Note: BarCache doesn't have resize method, max_bars is set at creation
316                // No need to access the cache since we can't change its capacity
317            }
318            return Ok(cache.clone());
319        }
320
321        // If not found, create a new one with the requested capacity
322        let new_cache = Arc::new(RwLock::new(BarCache::new()));
323
324        // Store it for future use
325        self.bar_caches.write().insert(cache_key, new_cache.clone());
326
327        Ok(new_cache)
328    }
329
330    async fn start_feed_depth(
331        &self,
332        instrument_id: InstrumentId,
333        mut depth_rx: mpsc::Receiver<Self::DepthMessage>,
334        options: Option<FeederOptions>,
335    ) -> Result<mpsc::Receiver<OrderBookSnapshot>> {
336        // Extract symbol from InstrumentId
337        let symbol = instrument_id.symbol.clone();
338
339        // Use provided channel buffer size or default
340        let buffer_size = options
341            .as_ref()
342            .map(|opt| opt.channel_buffer_size)
343            .unwrap_or(1024);
344
345        let (tx, rx) = mpsc::channel(buffer_size);
346
347        // Create a shared orderbook and store it
348        let ob = rusty_model::data::orderbook::OrderBook::<64>::new(
349            symbol.clone(),
350            0,                // exchange_timestamp_ns - will be updated on first message
351            self.clock.raw(), // system_timestamp_ns
352            SmallVec::<[PriceLevel; 64]>::new(), // empty bids
353            SmallVec::<[PriceLevel; 64]>::new(), // empty asks
354        );
355        let shared_orderbook = SharedSimdOrderBook::from_orderbook(&ob);
356        self.orderbooks
357            .write()
358            .insert(symbol.clone(), shared_orderbook.clone());
359
360        // Create stop signal
361        let (stop_tx, _) = watch::channel(false);
362        let key = String::from(format!("depth:{symbol}"));
363        self.stop_signals.write().insert(key.clone(), stop_tx);
364        let mut stop_rx = self.stop_signals.read().get(&key).unwrap().subscribe();
365
366        // Clone clock for use in the task
367        let clock = self.clock.clone();
368
369        // Clone stats for updating feed statistics
370        let stats = self.stats.clone();
371        let stats_key = String::from(format!("depth:{symbol}"));
372
373        // Initialize stats for this feed if not already present
374        stats.write().entry(stats_key.clone()).or_default();
375
376        // Create order book state using SkipMap for efficient price level management
377        // SkipMap provides O(log n) insertion and lookup with sorted iteration
378        let bids = Arc::new(RwLock::new(SkipMap::new()));
379        let asks = Arc::new(RwLock::new(SkipMap::new()));
380        let sequence = Arc::new(RwLock::new(0u64));
381
382        // Get max depth preference from options
383        let max_depth = options
384            .as_ref()
385            .map(|opt| opt.max_depth_levels)
386            .unwrap_or(100);
387
388        // Use CPU pinning if specified in options
389        if let Some(options) = &options
390            && options.cpu_affinity >= 0
391        {
392            #[cfg(target_os = "linux")]
393            {
394                use core_affinity::set_for_current;
395                let _ = set_for_current(core_affinity::CoreId {
396                    id: options.cpu_affinity as usize,
397                });
398            }
399        }
400
401        let batch_size = options.as_ref().map(|opt| opt.batch_size).unwrap_or(0);
402
403        tokio::spawn(async move {
404            loop {
405                tokio::select! {
406                    // Check for stop signal
407                    _ = stop_rx.changed() => {
408                        break;
409                    }
410
411                    // Process incoming depth updates
412                    Some(update) = depth_rx.recv() => {
413                        // Capture processing start time for latency measurement
414                        let process_start = clock.raw();
415
416                        // Parse the orderbook data
417                        if let Some(parsed) = ParsedOrderbookData::from_response(&update) {
418                            // Skip if not the symbol we're interested in
419                            if parsed.symbol != symbol {
420                                continue;
421                            }
422
423                            // Check if this is a valid sequence
424                            {
425                                let current_seq = *sequence.read();
426                                if current_seq > 0 && current_seq >= parsed.sequence {
427                                    // Out of order update, ignore
428                                    if let Some(feed_stats) = stats.write().get_mut(&stats_key) {
429                                        feed_stats.increment_dropped();
430                                    }
431                                    continue;
432                                }
433
434                                // Update sequence
435                                *sequence.write() = parsed.sequence;
436                            }
437
438                            // Update the bids - use write lock for minimum duration
439                            {
440                                let mut bids_lock = bids.write();
441
442                                // Fast path for small updates (common case) - process in batch
443                                if parsed.bids.len() <= 8 {
444                                    // This is optimized for the common case where we have few price levels
445                                    // to update, which is typical for Bybit's incremental updates
446                                    for (price, size) in &parsed.bids {
447                                        if size.is_zero() {
448                                            bids_lock.remove(price);
449                                        } else {
450                                            bids_lock.insert(*price, *size);
451                                        }
452                                    }
453                                } else if batch_size > 0 && parsed.bids.len() > batch_size {
454                                    // Batch processing for large updates
455                                    // Process in chunks to avoid holding the lock for too long
456                                    let chunk_size = batch_size.min(parsed.bids.len());
457
458                                    for chunk in parsed.bids.chunks(chunk_size) {
459                                        for (price, size) in chunk {
460                                            if size.is_zero() {
461                                                bids_lock.remove(price);
462                                            } else {
463                                                bids_lock.insert(*price, *size);
464                                            }
465                                        }
466                                    }
467                                } else {
468                                    // Standard case - process all at once
469                                    for (price, size) in &parsed.bids {
470                                        if size.is_zero() {
471                                            bids_lock.remove(price);
472                                        } else {
473                                            bids_lock.insert(*price, *size);
474                                        }
475                                    }
476                                }
477                            }
478
479                            // Update the asks - use write lock for minimum duration
480                            {
481                                let mut asks_lock = asks.write();
482
483                                // Fast path for small updates (common case) - process in batch
484                                if parsed.asks.len() <= 8 {
485                                    // This is optimized for the common case where we have few price levels
486                                    // to update, which is typical for Bybit's incremental updates
487                                    for (price, size) in &parsed.asks {
488                                        if size.is_zero() {
489                                            asks_lock.remove(price);
490                                        } else {
491                                            asks_lock.insert(*price, *size);
492                                        }
493                                    }
494                                } else if batch_size > 0 && parsed.asks.len() > batch_size {
495                                    // Batch processing for large updates
496                                    // Process in chunks to avoid holding the lock for too long
497                                    let chunk_size = batch_size.min(parsed.asks.len());
498
499                                    for chunk in parsed.asks.chunks(chunk_size) {
500                                        for (price, size) in chunk {
501                                            if size.is_zero() {
502                                                asks_lock.remove(price);
503                                            } else {
504                                                asks_lock.insert(*price, *size);
505                                            }
506                                        }
507                                    }
508                                } else {
509                                    // Standard case - process all at once
510                                    for (price, size) in &parsed.asks {
511                                        if size.is_zero() {
512                                            asks_lock.remove(price);
513                                        } else {
514                                            asks_lock.insert(*price, *size);
515                                        }
516                                    }
517                                }
518                            }
519
520                            // Create a snapshot of the current order book using SmallVec for stack allocation
521                            // Pre-allocate with capacity to avoid heap allocations
522                            let mut bids_vec = smallvec::SmallVec::<[PriceLevel; 64]>::with_capacity(max_depth);
523                            let mut asks_vec = smallvec::SmallVec::<[PriceLevel; 64]>::with_capacity(max_depth);
524
525                            // Convert bids and asks to SmallVec with minimal lock holding time
526                            {
527                                let bids_guard = bids.read();
528                                let mut count = 0;
529                                for (price, size) in bids_guard.iter().rev() { // Reverse to get highest bids first
530                                    bids_vec.push(PriceLevel::new(*price, *size));
531                                    count += 1;
532                                    if count >= max_depth {
533                                        break;
534                                    }
535                                }
536                            }
537
538                            {
539                                let asks_guard = asks.read();
540                                let mut count = 0;
541                                for (price, size) in asks_guard.iter() { // Iterate forward to get lowest asks first
542                                    asks_vec.push(PriceLevel::new(*price, *size));
543                                    count += 1;
544                                    if count >= max_depth {
545                                        break;
546                                    }
547                                }
548                            }
549
550                            let seq = *sequence.read();
551
552                            // Calculate lengths before moving the vectors
553                            let bids_len = bids_vec.len();
554                            let asks_len = asks_vec.len();
555
556                            // Update the shared orderbook for anyone holding a reference
557                            let bids_levels: SmallVec<[PriceLevel; 64]> = parsed.bids
558                                .iter()
559                                .map(|&(price, quantity)| PriceLevel::new(price, quantity))
560                                .collect();
561                            let asks_levels: SmallVec<[PriceLevel; 64]> = parsed.asks
562                                .iter()
563                                .map(|&(price, quantity)| PriceLevel::new(price, quantity))
564                                .collect();
565
566                            let model_orderbook = rusty_model::data::orderbook::OrderBook::<64>::new(
567                                instrument_id.to_string(),
568                                seq * 1_000_000, // Convert to nanoseconds
569                                parsed.timestamp,
570                                bids_levels,
571                            asks_levels,
572                            );
573                            shared_orderbook.write(|ob| {
574                                *ob = SimdOrderBook::from_orderbook(&model_orderbook);
575                            });
576
577                            // Create OrderBookSnapshot from the current state
578                            let depth = OrderBookSnapshot::new(
579                                instrument_id.clone(),
580                                bids_vec,
581                                asks_vec,
582                                seq,
583                                parsed.timestamp,
584                                process_start,
585                            );
586
587                            // Calculate processing latency
588                            let process_end = clock.raw();
589                            let latency_ns = process_end.saturating_sub(process_start);
590
591                            // Update statistics
592                            if let Some(feed_stats) = stats.write().get_mut(&stats_key) {
593                                // Update message count
594                                feed_stats.increment_processed();
595
596                                // Update latency statistics (avg, max, p99)
597                                feed_stats.add_latency_sample(latency_ns);
598
599                                // Record timestamp of the last update
600                                feed_stats.last_update_time = process_end;
601
602                                // Estimate memory usage for this orderbook update
603                                // We count the orderbook levels and message overhead
604                                let orderbook_memory =
605                                    std::mem::size_of::<OrderBookSnapshot>() +
606                                    (bids_len + asks_len) * std::mem::size_of::<PriceLevel>();
607
608                                // Update memory estimate using the FeedStats helper method
609                                feed_stats.update_memory_usage(orderbook_memory);
610                            }
611
612                            // Send the order book snapshot
613                            if tx.send(depth).await.is_err() {
614                                // Track dropped message if send fails
615                                if let Some(feed_stats) = stats.write().get_mut(&stats_key) {
616                                    feed_stats.increment_dropped();
617                                }
618                                break;
619                            }
620                        }
621                    }
622                }
623            }
624        });
625
626        Ok(rx)
627    }
628
629    async fn stop_feed_depth(&self, instrument_id: &InstrumentId) -> Result<()> {
630        let symbol = &instrument_id.symbol;
631        let key = String::from(format!("depth:{symbol}"));
632        if let Some(tx) = self.stop_signals.write().remove(&key) {
633            let _ = tx.send(true);
634        }
635
636        Ok(())
637    }
638
639    async fn start_feed_trades(
640        &self,
641        instrument_id: InstrumentId,
642        mut trade_rx: mpsc::Receiver<Self::TradeMessage>,
643        options: Option<FeederOptions>,
644    ) -> Result<mpsc::Receiver<MarketTrade>> {
645        // Extract symbol from InstrumentId
646        let symbol = instrument_id.symbol.clone();
647
648        // Use provided channel buffer size or default
649        let buffer_size = options
650            .as_ref()
651            .map(|opt| opt.channel_buffer_size)
652            .unwrap_or(1024);
653
654        let (tx, rx) = mpsc::channel(buffer_size);
655
656        // Create stop signal
657        let (stop_tx, _) = watch::channel(false);
658        let key = String::from(format!("trades:{symbol}"));
659        self.stop_signals.write().insert(key.clone(), stop_tx);
660        let mut stop_rx = self.stop_signals.read().get(&key).unwrap().subscribe();
661
662        // Clone clock for use in the task
663        let clock = self.clock.clone();
664
665        // Clone stats for updating feed statistics
666        let stats = self.stats.clone();
667        let stats_key = String::from(format!("trades:{symbol}"));
668
669        // Initialize stats for this feed if not already present
670        stats.write().entry(stats_key.clone()).or_default();
671
672        // Use CPU pinning if specified in options
673        if let Some(options) = &options
674            && options.cpu_affinity >= 0
675        {
676            #[cfg(target_os = "linux")]
677            {
678                use core_affinity::set_for_current;
679                let _ = set_for_current(core_affinity::CoreId {
680                    id: options.cpu_affinity as usize,
681                });
682            }
683        }
684
685        // Check if zero-copy processing is enabled
686        let _use_zero_copy = options
687            .as_ref()
688            .map(|opt| opt.use_zero_copy)
689            .unwrap_or(false);
690
691        // Note: Prefetch settings removed - hardware prefetcher handles sequential access better
692        // Focus on other optimizations: CPU pinning, batch processing, cache alignment
693
694        tokio::spawn(async move {
695            loop {
696                tokio::select! {
697                    // Check for stop signal
698                    _ = stop_rx.changed() => {
699                        break;
700                    }
701
702                    // Process incoming trade messages
703                    Some(trade_msg) = trade_rx.recv() => {
704                        // Process each trade in the response
705                        for trade_data in &trade_msg.data {
706                            // Capture trade timestamp and processing start time for latency measurement
707                            let event_timestamp = clock.now();
708                            let process_start = clock.raw();
709
710                            // Skip if not the symbol we're interested in
711                            if trade_data.symbol != symbol {
712                                continue;
713                            }
714
715                            // Convert to Trade type
716                            let side = match trade_data.side.as_str() {
717                                "Buy" => OrderSide::Buy,
718                                "Sell" => OrderSide::Sell,
719                                _ => {
720                                    // Update dropped message count on error
721                                    if let Some(feed_stats) = stats.write().get_mut(&stats_key) {
722                                        feed_stats.increment_dropped();
723                                    }
724                                    continue;
725                                },
726                            };
727
728                            let trade = MarketTrade {
729                                timestamp: event_timestamp,
730                                exchange_time_ns: trade_data.trade_time_ms * 1_000_000, // Convert ms to ns
731                                price: trade_data.price,
732                                quantity: trade_data.size,
733                                direction: side,
734                                instrument_id: instrument_id.clone(),
735                            };
736
737                            // Calculate processing latency
738                            let process_end = clock.raw();
739                            let latency_ns = process_end.saturating_sub(process_start);
740
741                            // Update statistics
742                            if let Some(feed_stats) = stats.write().get_mut(&stats_key) {
743                                // Update message count
744                                feed_stats.increment_processed();
745
746                                // Update latency statistics (avg, max, p99)
747                                feed_stats.add_latency_sample(latency_ns);
748
749                                // Record timestamp of the last update
750                                feed_stats.last_update_time = process_end;
751
752                                // Estimate memory usage for this trade update
753                                let memory_estimate = std::mem::size_of::<MarketTrade>();
754
755                                // Update memory estimate using the FeedStats helper method
756                                feed_stats.update_memory_usage(memory_estimate);
757                            }
758
759                            // Send the trade
760                            if tx.send(trade).await.is_err() {
761                                // Update dropped message count on error
762                                if let Some(feed_stats) = stats.write().get_mut(&stats_key) {
763                                    feed_stats.increment_dropped();
764                                }
765                                break;
766                            }
767                        }
768                    }
769                }
770            }
771        });
772
773        Ok(rx)
774    }
775
776    async fn stop_feed_trades(&self, instrument_id: &InstrumentId) -> Result<()> {
777        let symbol = &instrument_id.symbol;
778        let key = String::from(format!("trades:{symbol}"));
779        if let Some(tx) = self.stop_signals.write().remove(&key) {
780            let _ = tx.send(true);
781        }
782
783        Ok(())
784    }
785
786    async fn start_feed_bars(
787        &self,
788        instrument_id: InstrumentId,
789        bar_type: BarType,
790        mut trade_rx: mpsc::Receiver<MarketTrade>,
791        options: Option<crate::feeder::FeederOptions>,
792    ) -> Result<mpsc::Receiver<Bar>> {
793        // Extract symbol from InstrumentId
794        let symbol = instrument_id.symbol.clone();
795
796        // Get interval from bar_type
797        let interval_sec = match bar_type.get_spec().aggregation {
798            rusty_model::data::bar::BarAggregation::Second => bar_type.get_spec().step,
799            rusty_model::data::bar::BarAggregation::Minute => bar_type.get_spec().step * 60,
800            rusty_model::data::bar::BarAggregation::Hour => bar_type.get_spec().step * 3600,
801            rusty_model::data::bar::BarAggregation::Day => bar_type.get_spec().step * 86400,
802            _ => return Err(anyhow::anyhow!("Only time-based bars are supported")),
803        };
804
805        // Use provided channel buffer size or default
806        let buffer_size = options
807            .as_ref()
808            .map(|opt| opt.channel_buffer_size)
809            .unwrap_or(1024);
810
811        let (tx, rx) = mpsc::channel(buffer_size);
812
813        // Create stop signal
814        let (stop_tx, _) = watch::channel(false);
815        let key = String::from(format!("bars:{symbol}:{interval_sec}"));
816        self.stop_signals.write().insert(key.clone(), stop_tx);
817        let mut stop_rx = self.stop_signals.read().get(&key).unwrap().subscribe();
818
819        // Clone clock for use in the task
820        let clock = self.clock.clone();
821
822        // Clone stats for updating feed statistics
823        let stats = self.stats.clone();
824        let stats_key = String::from(format!("bars:{symbol}:{interval_sec}"));
825
826        // Initialize stats for this feed if not already present
827        stats.write().entry(stats_key.clone()).or_default();
828
829        // Set up bar cache for storing historical bars if not already present
830        let cache_key = String::from(format!("{symbol}:{interval_sec}s"));
831        let bar_cache = if let Some(existing_cache) = self.bar_caches.read().get(&cache_key) {
832            existing_cache.clone()
833        } else {
834            let new_cache = Arc::new(RwLock::new(BarCache::new())); // Default cache
835            self.bar_caches.write().insert(cache_key, new_cache.clone());
836            new_cache
837        };
838
839        // Use CPU pinning if specified in options
840        if let Some(options) = &options
841            && options.cpu_affinity >= 0
842        {
843            #[cfg(target_os = "linux")]
844            {
845                use core_affinity::set_for_current;
846                let _ = set_for_current(core_affinity::CoreId {
847                    id: options.cpu_affinity as usize,
848                });
849            }
850        }
851
852        // Use SmallVec for efficient batch processing if batching is enabled
853        let batch_size = options.as_ref().map(|opt| opt.batch_size).unwrap_or(0);
854
855        // Bar aggregation state - using Option<Decimal> for better performance than default values
856        // This avoids unnecessary copies of Decimal values
857        let mut open = None;
858        let mut high = None;
859        let mut low = None;
860        let mut close = None;
861        let mut volume = rust_decimal::Decimal::ZERO;
862        let mut buy_volume = rust_decimal::Decimal::ZERO;
863        let mut sell_volume = rust_decimal::Decimal::ZERO;
864        let mut trade_count = 0u64;
865        let mut last_bar_time = 0u64;
866        let mut last_trade_time = 0u64;
867
868        tokio::spawn(async move {
869            // Pre-allocate a batch buffer if batching is enabled
870            #[allow(unused_mut)] // Might be unused if batch_size is 0
871            let mut trade_batch = if batch_size > 0 {
872                smallvec::SmallVec::<[MarketTrade; 32]>::with_capacity(batch_size)
873            } else {
874                smallvec::SmallVec::<[MarketTrade; 32]>::new()
875            };
876
877            loop {
878                tokio::select! {
879                    // Check for stop signal
880                    _ = stop_rx.changed() => {
881                        // Send the final bar before stopping if we have data
882                        if let (Some(o), Some(h), Some(l), Some(c)) = (open, high, low, close)
883                            && volume > rust_decimal::Decimal::ZERO {
884                                let bar_type = rusty_model::data::bar::BarType::new_standard(
885                                    instrument_id.symbol.to_string().into(),
886                                    rusty_model::data::bar::BarAggregation::Second,
887                                    interval_sec
888                                );
889
890                                let bar = Bar {
891                                    bar_type,
892                                    open: o,
893                                    high: h,
894                                    low: l,
895                                    close: c,
896                                    volume,
897                                    timestamp_ns: clock.raw(),
898                                };
899
900                                // Update bar cache
901                                bar_cache.write().add_bar(bar.clone());
902
903                                // Send the bar and ignore any send errors since we're shutting down
904                                let _ = tx.try_send(bar);
905                            }
906                        break;
907                    }
908
909                    // Process incoming trades
910                    Some(trade) = trade_rx.recv() => {
911                        // Capture processing start time for latency measurement
912                        let process_start = clock.raw();
913
914                        // If batching is enabled, add to batch or process immediately otherwise
915                        if batch_size > 0 {
916                            // Remove ineffective batch prefetching:
917                            // - Prefetching elements we're about to process immediately
918                            // - Small trade objects benefit more from cache locality than prefetch
919                            // - Hardware prefetcher already handles sequential access patterns
920
921                            trade_batch.push(trade);
922
923                            // Process batch if full
924                            if trade_batch.len() >= batch_size {
925                                // Process the batch of trades
926                                for trade in trade_batch.drain(..) {
927                                    process_trade(
928                                        &trade,
929                                        clock.raw(),
930                                        interval_sec,
931                                        &mut open,
932                                        &mut high,
933                                        &mut low,
934                                        &mut close,
935                                        &mut volume,
936                                        &mut buy_volume,
937                                        &mut sell_volume,
938                                        &mut trade_count,
939                                        &mut last_bar_time,
940                                        &mut last_trade_time,
941                                        &tx,
942                                        &bar_cache,
943                                        &instrument_id,
944                                        &symbol,
945                                        &clock,
946                                        &stats,
947                                        &stats_key,
948                                        process_start,
949                                    ).await;
950                                }
951                            }
952                        } else {
953                            // Process trades one at a time
954                            process_trade(
955                                &trade,
956                                clock.raw(),
957                                interval_sec,
958                                &mut open,
959                                &mut high,
960                                &mut low,
961                                &mut close,
962                                &mut volume,
963                                &mut buy_volume,
964                                &mut sell_volume,
965                                &mut trade_count,
966                                &mut last_bar_time,
967                                &mut last_trade_time,
968                                &tx,
969                                &bar_cache,
970                                &instrument_id,
971                                &symbol,
972                                &clock,
973                                &stats,
974                                &stats_key,
975                                process_start,
976                            ).await;
977                        }
978                    }
979                }
980            }
981        });
982
983        Ok(rx)
984    }
985
986    async fn stop_feed_bars(&self, instrument_id: &InstrumentId, bar_type: &BarType) -> Result<()> {
987        let symbol = &instrument_id.symbol;
988
989        // Get interval from bar_type
990        let interval_sec = match bar_type.get_spec().aggregation {
991            rusty_model::data::bar::BarAggregation::Second => bar_type.get_spec().step,
992            rusty_model::data::bar::BarAggregation::Minute => bar_type.get_spec().step * 60,
993            rusty_model::data::bar::BarAggregation::Hour => bar_type.get_spec().step * 3600,
994            rusty_model::data::bar::BarAggregation::Day => bar_type.get_spec().step * 86400,
995            _ => return Err(anyhow::anyhow!("Only time-based bars are supported")),
996        };
997        let key = String::from(format!("bars:{symbol}:{interval_sec}"));
998        if let Some(tx) = self.stop_signals.write().remove(&key) {
999            let _ = tx.send(true);
1000        }
1001
1002        Ok(())
1003    }
1004
1005    /// Get feed statistics for a specific instrument
1006    /// Returns performance data for the instrument's feeds
1007    async fn get_stats(&self, instrument_id: &InstrumentId) -> Result<FeedStats> {
1008        let symbol = &instrument_id.symbol;
1009
1010        // Create a merged stats object from all feeds for this instrument
1011        let mut merged_stats = FeedStats::default();
1012
1013        // Acquire read lock once to minimize contention
1014        let stats_guard = self.stats.read();
1015
1016        // Check for depth feed stats
1017        let depth_key = String::from(format!("depth:{symbol}"));
1018        if let Some(depth_stats) = stats_guard.get(&depth_key) {
1019            merged_stats.messages_processed += depth_stats.messages_processed;
1020            merged_stats.dropped_messages += depth_stats.dropped_messages;
1021
1022            // Take the max of latencies
1023            merged_stats.max_process_latency_ns = merged_stats
1024                .max_process_latency_ns
1025                .max(depth_stats.max_process_latency_ns);
1026
1027            // For avg latency, we need a weighted average
1028            if merged_stats.messages_processed > 0 {
1029                merged_stats.avg_process_latency_ns = (merged_stats.avg_process_latency_ns
1030                    * (merged_stats.messages_processed - depth_stats.messages_processed)
1031                    + depth_stats.avg_process_latency_ns * depth_stats.messages_processed)
1032                    / merged_stats.messages_processed;
1033            } else {
1034                merged_stats.avg_process_latency_ns = depth_stats.avg_process_latency_ns;
1035            }
1036
1037            // Update last update time if more recent
1038            if depth_stats.last_update_time > merged_stats.last_update_time {
1039                merged_stats.last_update_time = depth_stats.last_update_time;
1040            }
1041
1042            // Accumulate memory usage
1043            merged_stats.memory_usage_bytes += depth_stats.memory_usage_bytes;
1044        }
1045
1046        // Check for trades feed stats
1047        let trades_key = String::from(format!("trades:{symbol}"));
1048        if let Some(trades_stats) = stats_guard.get(&trades_key) {
1049            merged_stats.messages_processed += trades_stats.messages_processed;
1050            merged_stats.dropped_messages += trades_stats.dropped_messages;
1051
1052            // Take the max of latencies
1053            merged_stats.max_process_latency_ns = merged_stats
1054                .max_process_latency_ns
1055                .max(trades_stats.max_process_latency_ns);
1056
1057            // For avg latency, we need a weighted average
1058            if merged_stats.messages_processed > 0 {
1059                merged_stats.avg_process_latency_ns = (merged_stats.avg_process_latency_ns
1060                    * (merged_stats.messages_processed - trades_stats.messages_processed)
1061                    + trades_stats.avg_process_latency_ns * trades_stats.messages_processed)
1062                    / merged_stats.messages_processed;
1063            } else {
1064                merged_stats.avg_process_latency_ns = trades_stats.avg_process_latency_ns;
1065            }
1066
1067            // Update last update time if more recent
1068            if trades_stats.last_update_time > merged_stats.last_update_time {
1069                merged_stats.last_update_time = trades_stats.last_update_time;
1070            }
1071
1072            // Accumulate memory usage
1073            merged_stats.memory_usage_bytes += trades_stats.memory_usage_bytes;
1074        }
1075
1076        // Check for bar feed stats - all intervals
1077        for (key, stats) in stats_guard.iter() {
1078            if key.starts_with(&format!("bars:{symbol}:")) {
1079                merged_stats.messages_processed += stats.messages_processed;
1080                merged_stats.dropped_messages += stats.dropped_messages;
1081
1082                // Take the max of latencies
1083                merged_stats.max_process_latency_ns = merged_stats
1084                    .max_process_latency_ns
1085                    .max(stats.max_process_latency_ns);
1086
1087                // Update last update time if more recent
1088                if stats.last_update_time > merged_stats.last_update_time {
1089                    merged_stats.last_update_time = stats.last_update_time;
1090                }
1091
1092                // Accumulate memory usage
1093                merged_stats.memory_usage_bytes += stats.memory_usage_bytes;
1094            }
1095        }
1096
1097        // Calculate messages per second based on last update and clock
1098        if merged_stats.last_update_time > 0 {
1099            let current_time = self.clock.raw();
1100            let time_diff_seconds = (current_time - merged_stats.last_update_time) / 1_000_000_000;
1101
1102            if time_diff_seconds > 0 {
1103                merged_stats.messages_per_second =
1104                    merged_stats.messages_processed as f64 / time_diff_seconds as f64;
1105            }
1106        }
1107
1108        Ok(merged_stats)
1109    }
1110
1111    /// Reset all feed statistics
1112    /// Clears all collected performance statistics
1113    async fn reset_stats(&self) -> Result<()> {
1114        // Reset all stats to default values
1115        let mut stats_guard = self.stats.write();
1116
1117        for stats in stats_guard.values_mut() {
1118            *stats = FeedStats::default();
1119        }
1120
1121        Ok(())
1122    }
1123}
1124
1125/// Helper function to process a single trade for bar aggregation
1126/// Extracted to reduce code duplication when processing batches
1127#[inline]
1128#[allow(clippy::too_many_arguments)]
1129async fn process_trade(
1130    trade: &MarketTrade,
1131    now: u64,
1132    interval_sec: u64,
1133    open: &mut Option<Decimal>,
1134    high: &mut Option<Decimal>,
1135    low: &mut Option<Decimal>,
1136    close: &mut Option<Decimal>,
1137    volume: &mut Decimal,
1138    buy_volume: &mut Decimal,
1139    sell_volume: &mut Decimal,
1140    trade_count: &mut u64,
1141    last_bar_time: &mut u64,
1142    last_trade_time: &mut u64,
1143    tx: &mpsc::Sender<Bar>,
1144    bar_cache: &Arc<RwLock<BarCache>>,
1145    instrument_id: &InstrumentId,
1146    _symbol: &str,
1147    clock: &Clock,
1148    stats: &Arc<RwLock<FxHashMap<String, FeedStats>>>,
1149    stats_key: &str,
1150    process_start: u64,
1151) {
1152    // Update last trade time
1153    // Convert Instant to u64 nanoseconds
1154    *last_trade_time = trade
1155        .timestamp
1156        .duration_since(quanta::Instant::recent())
1157        .as_nanos() as u64;
1158
1159    // Calculate current interval
1160    let current_interval = now / 1_000_000_000 / interval_sec;
1161
1162    // Update volume based on trade direction
1163    match trade.direction {
1164        OrderSide::Buy => *buy_volume += trade.quantity,
1165        OrderSide::Sell => *sell_volume += trade.quantity,
1166    }
1167
1168    // Check if we need to start a new bar
1169    if *last_bar_time == 0 {
1170        // First trade
1171        *open = Some(trade.price);
1172        *high = Some(trade.price);
1173        *low = Some(trade.price);
1174        *close = Some(trade.price);
1175        *volume = trade.quantity;
1176        *trade_count = 1;
1177        *last_bar_time = current_interval;
1178    } else if current_interval > *last_bar_time {
1179        // New bar interval - send the current bar and start a new one
1180        if let (Some(o), Some(h), Some(l), Some(c)) = (*open, *high, *low, *close) {
1181            // Create a BarType for the interval
1182            let bar_type = rusty_model::data::bar::BarType::new_standard(
1183                instrument_id.symbol.clone(),
1184                rusty_model::data::bar::BarAggregation::Second,
1185                interval_sec,
1186            );
1187
1188            let bar = Bar {
1189                bar_type,
1190                open: o,
1191                high: h,
1192                low: l,
1193                close: c,
1194                volume: *volume,
1195                timestamp_ns: clock.raw(),
1196            };
1197
1198            // Update bar cache
1199            bar_cache.write().add_bar(bar.clone());
1200
1201            // Send the bar
1202            if tx.send(bar).await.is_err() {
1203                // Update dropped message count on error
1204                if let Some(feed_stats) = stats.write().get_mut(stats_key) {
1205                    feed_stats.increment_dropped();
1206                }
1207                return;
1208            }
1209        }
1210
1211        // Start a new bar
1212        *open = Some(trade.price);
1213        *high = Some(trade.price);
1214        *low = Some(trade.price);
1215        *close = Some(trade.price);
1216        *volume = trade.quantity;
1217        *buy_volume = rust_decimal::Decimal::ZERO;
1218        *sell_volume = rust_decimal::Decimal::ZERO;
1219
1220        // Reset volume counts
1221        match trade.direction {
1222            OrderSide::Buy => *buy_volume = trade.quantity,
1223            OrderSide::Sell => *sell_volume = trade.quantity,
1224        }
1225
1226        *trade_count = 1;
1227        *last_bar_time = current_interval;
1228    } else {
1229        // Update the current bar
1230        *high = Some(std::cmp::max(high.unwrap_or(trade.price), trade.price));
1231        *low = Some(std::cmp::min(low.unwrap_or(trade.price), trade.price));
1232        *close = Some(trade.price);
1233        *volume += trade.quantity;
1234        *trade_count += 1;
1235    }
1236
1237    // Calculate processing latency
1238    let process_end = clock.raw();
1239    let latency_ns = process_end.saturating_sub(process_start);
1240
1241    // Update statistics
1242    if let Some(feed_stats) = stats.write().get_mut(stats_key) {
1243        // Update message count
1244        feed_stats.increment_processed();
1245
1246        // Update latency statistics (avg, max, p99)
1247        feed_stats.add_latency_sample(latency_ns);
1248
1249        // Record timestamp of the last update
1250        feed_stats.last_update_time = process_end;
1251
1252        // Estimate memory usage - this is approximate but helpful for monitoring
1253        // Trade size + orderbook memory if this is the first trade
1254        let mut memory_estimate = std::mem::size_of::<MarketTrade>();
1255
1256        // If this is a bar completion, add the bar size
1257        if current_interval > *last_bar_time && *volume > rust_decimal::Decimal::ZERO {
1258            memory_estimate += std::mem::size_of::<Bar>();
1259        }
1260
1261        // Update the memory usage estimate using the FeedStats helper method
1262        feed_stats.update_memory_usage(memory_estimate);
1263    }
1264}