rusty_feeder/exchange/bybit/feeder.rs
use rusty_common::collections::FxHashMap;
use smartstring::alias::String;
use std::sync::Arc;

use anyhow::Result;
use async_trait::async_trait;
use parking_lot::RwLock;
use quanta::Clock;
use rust_decimal::Decimal;
use skiplist::SkipMap;
use smallvec::SmallVec;
use tokio::sync::{mpsc, watch};

use crate::feeder::{FeedStats, Feeder, FeederOptions};
use rusty_model::{
    PriceLevel,
    data::{
        bar::{Bar, BarCache, BarType},
        book_snapshot::OrderBookSnapshot,
        market_trade::MarketTrade,
        simd_orderbook::{SharedSimdOrderBook, SimdOrderBook},
    },
    enums::OrderSide,
    instruments::InstrumentId,
};

use super::data::{
    orderbook::{OrderbookResponse, ParsedOrderbookData},
    trade::TradeResponse as MarketTradeResponse,
};

/// Bybit market data feeder optimized for HFT applications
///
/// This feeder implementation is designed for high-frequency trading (HFT)
/// scenarios, with the following performance characteristics:
///
/// - Zero-allocation critical paths using `SmallVec` and stack allocation
/// - Fine-grained locking to minimize contention and maximize concurrency
/// - Cache-line alignment for CPU cache efficiency
/// - Optional CPU pinning for dedicated core processing
/// - Batch processing for efficient trade and orderbook updates
/// - Real-time shared orderbooks with minimal contention
/// - Time synchronization with nanosecond precision
/// - Comprehensive statistics tracking for monitoring and detecting issues
///
/// # Performance Settings
///
/// Performance can be tuned via `FeederOptions`:
/// - `channel_buffer_size`: size of message buffers (default: 1024)
/// - `max_depth_levels`: maximum depth levels for orderbooks (default: 100)
/// - `batch_size`: batch message processing size (default: 32)
/// - `cpu_affinity`: pin to a specific CPU core (default: -1, no pinning)
/// - `use_zero_copy`: enable zero-copy processing where possible (default: true)
/// - `max_backlog`: maximum message backlog before dropping (default: 10000)
///
/// # Performance Notes
///
/// Software prefetching has been removed as it provided minimal benefit:
/// - Sequential access patterns are already handled by the CPU's hardware prefetcher
/// - Immediate use of data after a prefetch negates its benefit
/// - Small data structures (`Decimal`, ~16 bytes) fit several elements per cache line
///
/// Focus areas for HFT optimization:
/// - CPU pinning to dedicated cores
/// - Cache-line aligned data structures
/// - Minimizing lock contention with `SkipMap`
/// - Batch processing for large updates
/// - Zero-copy where possible
///
/// # Examples
///
/// ```ignore
/// use rusty_feeder::bybit::feeder::BybitFeeder;
/// use rusty_feeder::feeder::{Feeder, FeederOptions};
/// use rusty_model::instruments::InstrumentId;
/// use rusty_model::venues::Venue;
///
/// async fn subscribe_spot_trades() {
///     // Create a new Bybit feeder
///     let feeder = BybitFeeder::new();
///
///     // Configure options for low latency
///     let options = FeederOptions {
///         channel_buffer_size: 4096,
///         cpu_affinity: 5, // Pin to CPU core 5
///         batch_size: 64,
///         use_zero_copy: true,
///         max_depth_levels: 20,
///         max_backlog: 50000,
///         ..Default::default()
///     };
///
///     // Get a receiver for normalized trades
///     let instrument = InstrumentId::new("BTCUSDT".to_string(), Venue::Bybit);
///     let trade_rx = feeder.subscribe_trades(instrument.clone()).await.unwrap();
///     let normalized_trades_rx = feeder.start_feed_trades(
///         instrument,
///         trade_rx,
///         Some(options)
///     ).await.unwrap();
/// }
/// ```
#[derive(Debug)]
#[repr(align(64))] // Cache-line alignment for better CPU cache efficiency
pub struct BybitFeeder {
    /// Stop signals for active feeds - used to gracefully terminate feed processing
    stop_signals: Arc<RwLock<FxHashMap<String, watch::Sender<bool>>>>,

    /// Shared high-precision clock for time synchronization and latency measurements
    clock: Clock,

    /// Feed statistics for monitoring performance and detecting issues.
    /// Keyed by feed identifier (e.g., "depth:BTCUSDT", "trades:BTCUSDT")
    stats: Arc<RwLock<FxHashMap<String, FeedStats>>>,

    /// Shared orderbooks for each instrument, providing real-time market depth.
    /// Keyed by instrument symbol (e.g., "BTCUSDT")
    orderbooks: Arc<RwLock<FxHashMap<String, SharedSimdOrderBook>>>,

    /// Bar caches for each instrument and interval, storing historical OHLCV data.
    /// Keyed by "symbol:interval" (e.g., "BTCUSDT:60s" for 1-minute bars)
    bar_caches: Arc<RwLock<FxHashMap<String, Arc<RwLock<BarCache>>>>>,
}

impl Default for BybitFeeder {
    fn default() -> Self {
        Self::new()
    }
}

impl BybitFeeder {
    /// Create a new Bybit feeder with default settings
    ///
    /// This constructor initializes a new Bybit feeder with a default high-precision clock
    /// and empty state containers. The feeder is immediately ready to process market data.
    ///
    /// # Example
    ///
    /// ```ignore
    /// use rusty_feeder::bybit::feeder::BybitFeeder;
    ///
    /// let feeder = BybitFeeder::new();
    /// ```
    #[inline]
    #[must_use]
    pub fn new() -> Self {
        Self {
            stop_signals: Arc::new(RwLock::new(FxHashMap::default())),
            clock: Clock::new(),
            stats: Arc::new(RwLock::new(FxHashMap::default())),
            orderbooks: Arc::new(RwLock::new(FxHashMap::default())),
            bar_caches: Arc::new(RwLock::new(FxHashMap::default())),
        }
    }

    /// Create a new Bybit feeder with a custom high-precision clock
    ///
    /// This constructor allows providing a pre-configured clock, which can be useful for:
    /// - Testing with a controlled clock source
    /// - Synchronizing multiple feeders with the same clock
    /// - Using a clock with specific adjustments for latency compensation
    ///
    /// # Parameters
    ///
    /// * `clock` - A pre-configured `quanta::Clock` instance for high-precision timing
    ///
    /// # Example
    ///
    /// ```ignore
    /// use rusty_feeder::bybit::feeder::BybitFeeder;
    /// use quanta::Clock;
    ///
    /// // Create a clock with specific configuration
    /// let clock = Clock::new();
    ///
    /// // Create a feeder with the custom clock
    /// let feeder = BybitFeeder::with_clock(clock);
    /// ```
    #[inline]
    #[must_use]
    pub fn with_clock(clock: Clock) -> Self {
        Self {
            stop_signals: Arc::new(RwLock::new(FxHashMap::default())),
            clock,
            stats: Arc::new(RwLock::new(FxHashMap::default())),
            orderbooks: Arc::new(RwLock::new(FxHashMap::default())),
            bar_caches: Arc::new(RwLock::new(FxHashMap::default())),
        }
    }

    /// Get the current feed statistics for all active feeds
    ///
    /// Returns a copy of the current statistics map, which can be used for monitoring
    /// and debugging the performance of the feeder.
    ///
    /// # Returns
    ///
    /// * An `FxHashMap` mapping feed identifiers to their statistics
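    ///
    /// # Example
    ///
    /// A minimal monitoring sketch; the fields read below are the public
    /// `FeedStats` counters used elsewhere in this file:
    ///
    /// ```ignore
    /// let feeder = BybitFeeder::new();
    /// for (feed_id, stats) in feeder.get_all_stats() {
    ///     println!("{feed_id}: processed={} dropped={}",
    ///         stats.messages_processed, stats.dropped_messages);
    /// }
    /// ```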
    pub fn get_all_stats(&self) -> FxHashMap<String, FeedStats> {
        self.stats.read().clone()
    }

    /// Get the memory usage of this feeder instance in bytes
    ///
    /// This method provides an estimate of the memory used by the feeder's
    /// internal data structures, which can be useful for monitoring resource usage.
    ///
    /// # Returns
    ///
    /// * Estimated memory usage in bytes
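    ///
    /// # Example
    ///
    /// A minimal sketch of periodic resource monitoring:
    ///
    /// ```ignore
    /// let feeder = BybitFeeder::new();
    /// let bytes = feeder.memory_usage();
    /// println!("feeder memory estimate: {} KiB", bytes / 1024);
    /// ```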
    pub fn memory_usage(&self) -> usize {
        let mut total = 0;

        // Add stats memory usage
        {
            let stats = self.stats.read();
            total += std::mem::size_of_val(&*stats);

            // Add stats entries
            for (key, value) in stats.iter() {
                total += key.len();
                total += std::mem::size_of_val(value);
            }
        }

        // Add orderbooks memory usage (estimated)
        {
            let orderbooks = self.orderbooks.read();
            for (key, _) in orderbooks.iter() {
                total += key.len();
                // Rough estimate for a typical orderbook: two sides of up to 100 levels
                total += 2 * 100 * std::mem::size_of::<PriceLevel>();
            }
        }

        // Add bar caches memory usage (estimated)
        {
            let bar_caches = self.bar_caches.read();
            for (key, _cache) in bar_caches.iter() {
                total += key.len();
                // TODO: Add a proper method to get the BarCache size, e.g.:
                // let cache_read = _cache.read();
                // total += cache_read.get_total_bars_count() * std::mem::size_of::<Bar>();
            }
        }

        total
    }
}

#[async_trait]
impl Feeder for BybitFeeder {
    type DepthMessage = OrderbookResponse;
    type TradeMessage = MarketTradeResponse;

    /// Get a realtime shared orderbook for a symbol
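    ///
    /// # Example
    ///
    /// A minimal sketch; the returned handle is cheap to clone, as this method
    /// itself relies on when returning a cached book:
    ///
    /// ```ignore
    /// let book = feeder.get_shared_orderbook(&instrument).await?;
    /// // Share the handle with another task
    /// let book_for_task = book.clone();
    /// ```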
    async fn get_shared_orderbook(
        &self,
        instrument_id: &InstrumentId,
    ) -> Result<SharedSimdOrderBook> {
        // Extract symbol from InstrumentId
        let symbol = &instrument_id.symbol;

        // Check if we already have an orderbook for this symbol
        if let Some(orderbook) = self.orderbooks.read().get(symbol) {
            return Ok(orderbook.clone());
        }

        // If not found, create a new one
        let orderbook = rusty_model::data::orderbook::OrderBook::<64>::new(
            symbol.clone(),
            0,                                   // exchange_timestamp_ns - updated on first message
            self.clock.raw(),                    // system_timestamp_ns
            SmallVec::<[PriceLevel; 64]>::new(), // empty bids
            SmallVec::<[PriceLevel; 64]>::new(), // empty asks
        );
        let shared_orderbook = SharedSimdOrderBook::from_orderbook(&orderbook);

        // Store it for future use
        self.orderbooks
            .write()
            .insert(symbol.clone(), shared_orderbook.clone());

        Ok(shared_orderbook)
    }

    /// Get a bar cache for a symbol and bar type
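    ///
    /// # Example
    ///
    /// A minimal sketch for a 1-minute cache; the bar type construction
    /// follows `BarType::new_standard` as used later in this file:
    ///
    /// ```ignore
    /// let bar_type = rusty_model::data::bar::BarType::new_standard(
    ///     instrument.symbol.clone(),
    ///     rusty_model::data::bar::BarAggregation::Minute,
    ///     1,
    /// );
    /// let cache = feeder.get_bar_cache(&instrument, &bar_type, 1000).await?;
    /// let bars = cache.read(); // shared read access to the cached bars
    /// ```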
    async fn get_bar_cache(
        &self,
        instrument_id: &InstrumentId,
        bar_type: &BarType,
        _max_bars: usize,
    ) -> Result<Arc<RwLock<BarCache>>> {
        // Extract symbol from InstrumentId
        let symbol = &instrument_id.symbol;

        // Get the interval in seconds from bar_type
        let interval_sec = match bar_type.get_spec().aggregation {
            rusty_model::data::bar::BarAggregation::Second => bar_type.get_spec().step,
            rusty_model::data::bar::BarAggregation::Minute => bar_type.get_spec().step * 60,
            rusty_model::data::bar::BarAggregation::Hour => bar_type.get_spec().step * 3600,
            rusty_model::data::bar::BarAggregation::Day => bar_type.get_spec().step * 86400,
            _ => return Err(anyhow::anyhow!("Only time-based bars are supported")),
        };

        // Create a cache key based on symbol and interval
        let cache_key = String::from(format!("{symbol}:{interval_sec}s"));

        // Check if we already have a cache for this symbol and interval.
        // Note: `_max_bars` is currently ignored because BarCache has no resize
        // method; its capacity is fixed at creation.
        if let Some(cache) = self.bar_caches.read().get(&cache_key) {
            return Ok(cache.clone());
        }

        // If not found, create a new one with the default capacity
        let new_cache = Arc::new(RwLock::new(BarCache::new()));

        // Store it for future use
        self.bar_caches.write().insert(cache_key, new_cache.clone());

        Ok(new_cache)
    }

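    /// Start a normalized depth feed for an instrument
    ///
    /// # Example
    ///
    /// A minimal consumption sketch; `subscribe_depth` is assumed to be the
    /// raw-message subscription counterpart to the `subscribe_trades` call
    /// shown in the struct-level example:
    ///
    /// ```ignore
    /// let depth_rx = feeder.subscribe_depth(instrument.clone()).await?;
    /// let mut snapshots = feeder.start_feed_depth(instrument, depth_rx, None).await?;
    /// while let Some(snapshot) = snapshots.recv().await {
    ///     // consume OrderBookSnapshot values here
    /// }
    /// ```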
    async fn start_feed_depth(
        &self,
        instrument_id: InstrumentId,
        mut depth_rx: mpsc::Receiver<Self::DepthMessage>,
        options: Option<FeederOptions>,
    ) -> Result<mpsc::Receiver<OrderBookSnapshot>> {
        // Extract symbol from InstrumentId
        let symbol = instrument_id.symbol.clone();

        // Use the provided channel buffer size or the default
        let buffer_size = options
            .as_ref()
            .map(|opt| opt.channel_buffer_size)
            .unwrap_or(1024);

        let (tx, rx) = mpsc::channel(buffer_size);

        // Create a shared orderbook and store it
        let ob = rusty_model::data::orderbook::OrderBook::<64>::new(
            symbol.clone(),
            0,                                   // exchange_timestamp_ns - updated on first message
            self.clock.raw(),                    // system_timestamp_ns
            SmallVec::<[PriceLevel; 64]>::new(), // empty bids
            SmallVec::<[PriceLevel; 64]>::new(), // empty asks
        );
        let shared_orderbook = SharedSimdOrderBook::from_orderbook(&ob);
        self.orderbooks
            .write()
            .insert(symbol.clone(), shared_orderbook.clone());

        // Create stop signal
        let (stop_tx, _) = watch::channel(false);
        let key = String::from(format!("depth:{symbol}"));
        self.stop_signals.write().insert(key.clone(), stop_tx);
        let mut stop_rx = self.stop_signals.read().get(&key).unwrap().subscribe();

        // Clone clock for use in the task
        let clock = self.clock.clone();

        // Clone stats for updating feed statistics
        let stats = self.stats.clone();
        let stats_key = String::from(format!("depth:{symbol}"));

        // Initialize stats for this feed if not already present
        stats.write().entry(stats_key.clone()).or_default();

        // Create order book state using SkipMap for efficient price level management.
        // SkipMap provides O(log n) insertion and lookup with sorted iteration.
        let bids = Arc::new(RwLock::new(SkipMap::new()));
        let asks = Arc::new(RwLock::new(SkipMap::new()));
        let sequence = Arc::new(RwLock::new(0u64));

        // Get max depth preference from options
        let max_depth = options
            .as_ref()
            .map(|opt| opt.max_depth_levels)
            .unwrap_or(100);

        // Use CPU pinning if specified in options
        if let Some(options) = &options
            && options.cpu_affinity >= 0
        {
            #[cfg(target_os = "linux")]
            {
                use core_affinity::set_for_current;
                let _ = set_for_current(core_affinity::CoreId {
                    id: options.cpu_affinity as usize,
                });
            }
        }

        let batch_size = options.as_ref().map(|opt| opt.batch_size).unwrap_or(0);

        tokio::spawn(async move {
            loop {
                tokio::select! {
                    // Check for stop signal
                    _ = stop_rx.changed() => {
                        break;
                    }

                    // Process incoming depth updates
                    Some(update) = depth_rx.recv() => {
                        // Capture processing start time for latency measurement
                        let process_start = clock.raw();

                        // Parse the orderbook data
                        if let Some(parsed) = ParsedOrderbookData::from_response(&update) {
                            // Skip if not the symbol we're interested in
                            if parsed.symbol != symbol {
                                continue;
                            }

                            // Check that this is a valid sequence
                            {
                                let current_seq = *sequence.read();
                                if current_seq > 0 && current_seq >= parsed.sequence {
                                    // Out-of-order update, ignore
                                    if let Some(feed_stats) = stats.write().get_mut(&stats_key) {
                                        feed_stats.increment_dropped();
                                    }
                                    continue;
                                }

                                // Update sequence
                                *sequence.write() = parsed.sequence;
                            }

                            // Update the bids - hold the write lock for the minimum duration
                            {
                                let mut bids_lock = bids.write();

                                // Fast path for small updates (common case)
                                if parsed.bids.len() <= 8 {
                                    // Optimized for the common case of few price levels,
                                    // which is typical for Bybit's incremental updates
                                    for (price, size) in &parsed.bids {
                                        if size.is_zero() {
                                            bids_lock.remove(price);
                                        } else {
                                            bids_lock.insert(*price, *size);
                                        }
                                    }
                                } else if batch_size > 0 && parsed.bids.len() > batch_size {
                                    // Batch processing for large updates.
                                    // Process in chunks to avoid holding the lock for too long.
                                    let chunk_size = batch_size.min(parsed.bids.len());

                                    for chunk in parsed.bids.chunks(chunk_size) {
                                        for (price, size) in chunk {
                                            if size.is_zero() {
                                                bids_lock.remove(price);
                                            } else {
                                                bids_lock.insert(*price, *size);
                                            }
                                        }
                                    }
                                } else {
                                    // Standard case - process all at once
                                    for (price, size) in &parsed.bids {
                                        if size.is_zero() {
                                            bids_lock.remove(price);
                                        } else {
                                            bids_lock.insert(*price, *size);
                                        }
                                    }
                                }
                            }

                            // Update the asks - hold the write lock for the minimum duration
                            {
                                let mut asks_lock = asks.write();

                                // Fast path for small updates (common case)
                                if parsed.asks.len() <= 8 {
                                    // Optimized for the common case of few price levels,
                                    // which is typical for Bybit's incremental updates
                                    for (price, size) in &parsed.asks {
                                        if size.is_zero() {
                                            asks_lock.remove(price);
                                        } else {
                                            asks_lock.insert(*price, *size);
                                        }
                                    }
                                } else if batch_size > 0 && parsed.asks.len() > batch_size {
                                    // Batch processing for large updates.
                                    // Process in chunks to avoid holding the lock for too long.
                                    let chunk_size = batch_size.min(parsed.asks.len());

                                    for chunk in parsed.asks.chunks(chunk_size) {
                                        for (price, size) in chunk {
                                            if size.is_zero() {
                                                asks_lock.remove(price);
                                            } else {
                                                asks_lock.insert(*price, *size);
                                            }
                                        }
                                    }
                                } else {
                                    // Standard case - process all at once
                                    for (price, size) in &parsed.asks {
                                        if size.is_zero() {
                                            asks_lock.remove(price);
                                        } else {
                                            asks_lock.insert(*price, *size);
                                        }
                                    }
                                }
                            }

                            // Create a snapshot of the current order book using SmallVec for
                            // stack allocation; pre-allocate to avoid heap allocations
                            let mut bids_vec = smallvec::SmallVec::<[PriceLevel; 64]>::with_capacity(max_depth);
                            let mut asks_vec = smallvec::SmallVec::<[PriceLevel; 64]>::with_capacity(max_depth);

                            // Convert bids and asks to SmallVec with minimal lock holding time
                            {
                                let bids_guard = bids.read();
                                let mut count = 0;
                                // Reverse to get highest bids first
                                for (price, size) in bids_guard.iter().rev() {
                                    bids_vec.push(PriceLevel::new(*price, *size));
                                    count += 1;
                                    if count >= max_depth {
                                        break;
                                    }
                                }
                            }

                            {
                                let asks_guard = asks.read();
                                let mut count = 0;
                                // Iterate forward to get lowest asks first
                                for (price, size) in asks_guard.iter() {
                                    asks_vec.push(PriceLevel::new(*price, *size));
                                    count += 1;
                                    if count >= max_depth {
                                        break;
                                    }
                                }
                            }

                            let seq = *sequence.read();

                            // Calculate lengths before moving the vectors
                            let bids_len = bids_vec.len();
                            let asks_len = asks_vec.len();

                            // Update the shared orderbook for anyone holding a reference
                            let bids_levels: SmallVec<[PriceLevel; 64]> = parsed.bids
                                .iter()
                                .map(|&(price, quantity)| PriceLevel::new(price, quantity))
                                .collect();
                            let asks_levels: SmallVec<[PriceLevel; 64]> = parsed.asks
                                .iter()
                                .map(|&(price, quantity)| PriceLevel::new(price, quantity))
                                .collect();

                            let model_orderbook = rusty_model::data::orderbook::OrderBook::<64>::new(
                                instrument_id.to_string(),
                                seq * 1_000_000, // Sequence scaled into the nanosecond timestamp slot
                                parsed.timestamp,
                                bids_levels,
                                asks_levels,
                            );
                            shared_orderbook.write(|ob| {
                                *ob = SimdOrderBook::from_orderbook(&model_orderbook);
                            });

                            // Create an OrderBookSnapshot from the current state
                            let depth = OrderBookSnapshot::new(
                                instrument_id.clone(),
                                bids_vec,
                                asks_vec,
                                seq,
                                parsed.timestamp,
                                process_start,
                            );

                            // Calculate processing latency
                            let process_end = clock.raw();
                            let latency_ns = process_end.saturating_sub(process_start);

                            // Update statistics
                            if let Some(feed_stats) = stats.write().get_mut(&stats_key) {
                                // Update message count
                                feed_stats.increment_processed();

                                // Update latency statistics (avg, max, p99)
                                feed_stats.add_latency_sample(latency_ns);

                                // Record timestamp of the last update
                                feed_stats.last_update_time = process_end;

                                // Estimate memory usage for this orderbook update:
                                // the orderbook levels plus message overhead
                                let orderbook_memory =
                                    std::mem::size_of::<OrderBookSnapshot>() +
                                    (bids_len + asks_len) * std::mem::size_of::<PriceLevel>();

                                // Update the memory estimate using the FeedStats helper method
                                feed_stats.update_memory_usage(orderbook_memory);
                            }

                            // Send the order book snapshot
                            if tx.send(depth).await.is_err() {
                                // Track the dropped message if the send fails
                                if let Some(feed_stats) = stats.write().get_mut(&stats_key) {
                                    feed_stats.increment_dropped();
                                }
                                break;
                            }
                        }
                    }
                }
            }
        });

        Ok(rx)
    }

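    /// Stop the depth feed for an instrument
    ///
    /// # Example
    ///
    /// A minimal sketch; this signals the spawned depth task to exit gracefully:
    ///
    /// ```ignore
    /// feeder.stop_feed_depth(&instrument).await?;
    /// ```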
    async fn stop_feed_depth(&self, instrument_id: &InstrumentId) -> Result<()> {
        let symbol = &instrument_id.symbol;
        let key = String::from(format!("depth:{symbol}"));
        if let Some(tx) = self.stop_signals.write().remove(&key) {
            let _ = tx.send(true);
        }

        Ok(())
    }

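    /// Start a normalized trade feed for an instrument
    ///
    /// # Example
    ///
    /// A minimal consumption sketch, mirroring the struct-level example above:
    ///
    /// ```ignore
    /// let trade_rx = feeder.subscribe_trades(instrument.clone()).await?;
    /// let mut trades = feeder.start_feed_trades(instrument, trade_rx, None).await?;
    /// while let Some(trade) = trades.recv().await {
    ///     // consume normalized MarketTrade values here
    /// }
    /// ```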
    async fn start_feed_trades(
        &self,
        instrument_id: InstrumentId,
        mut trade_rx: mpsc::Receiver<Self::TradeMessage>,
        options: Option<FeederOptions>,
    ) -> Result<mpsc::Receiver<MarketTrade>> {
        // Extract symbol from InstrumentId
        let symbol = instrument_id.symbol.clone();

        // Use the provided channel buffer size or the default
        let buffer_size = options
            .as_ref()
            .map(|opt| opt.channel_buffer_size)
            .unwrap_or(1024);

        let (tx, rx) = mpsc::channel(buffer_size);

        // Create stop signal
        let (stop_tx, _) = watch::channel(false);
        let key = String::from(format!("trades:{symbol}"));
        self.stop_signals.write().insert(key.clone(), stop_tx);
        let mut stop_rx = self.stop_signals.read().get(&key).unwrap().subscribe();

        // Clone clock for use in the task
        let clock = self.clock.clone();

        // Clone stats for updating feed statistics
        let stats = self.stats.clone();
        let stats_key = String::from(format!("trades:{symbol}"));

        // Initialize stats for this feed if not already present
        stats.write().entry(stats_key.clone()).or_default();

        // Use CPU pinning if specified in options
        if let Some(options) = &options
            && options.cpu_affinity >= 0
        {
            #[cfg(target_os = "linux")]
            {
                use core_affinity::set_for_current;
                let _ = set_for_current(core_affinity::CoreId {
                    id: options.cpu_affinity as usize,
                });
            }
        }

        // Check whether zero-copy processing is enabled
        let _use_zero_copy = options
            .as_ref()
            .map(|opt| opt.use_zero_copy)
            .unwrap_or(false);

        // Note: prefetch settings removed - the hardware prefetcher handles sequential
        // access better. Focus on other optimizations: CPU pinning, batch processing,
        // cache alignment.

        tokio::spawn(async move {
            loop {
                tokio::select! {
                    // Check for stop signal
                    _ = stop_rx.changed() => {
                        break;
                    }

                    // Process incoming trade messages
                    Some(trade_msg) = trade_rx.recv() => {
                        // Process each trade in the response
                        for trade_data in &trade_msg.data {
                            // Capture the trade timestamp and processing start time
                            // for latency measurement
                            let event_timestamp = clock.now();
                            let process_start = clock.raw();

                            // Skip if not the symbol we're interested in
                            if trade_data.symbol != symbol {
                                continue;
                            }

                            // Convert to the normalized trade type
                            let side = match trade_data.side.as_str() {
                                "Buy" => OrderSide::Buy,
                                "Sell" => OrderSide::Sell,
                                _ => {
                                    // Update dropped message count on error
                                    if let Some(feed_stats) = stats.write().get_mut(&stats_key) {
                                        feed_stats.increment_dropped();
                                    }
                                    continue;
                                },
                            };

                            let trade = MarketTrade {
                                timestamp: event_timestamp,
                                exchange_time_ns: trade_data.trade_time_ms * 1_000_000, // ms to ns
                                price: trade_data.price,
                                quantity: trade_data.size,
                                direction: side,
                                instrument_id: instrument_id.clone(),
                            };

                            // Calculate processing latency
                            let process_end = clock.raw();
                            let latency_ns = process_end.saturating_sub(process_start);

                            // Update statistics
                            if let Some(feed_stats) = stats.write().get_mut(&stats_key) {
                                // Update message count
                                feed_stats.increment_processed();

                                // Update latency statistics (avg, max, p99)
                                feed_stats.add_latency_sample(latency_ns);

                                // Record timestamp of the last update
                                feed_stats.last_update_time = process_end;

                                // Estimate memory usage for this trade update
                                let memory_estimate = std::mem::size_of::<MarketTrade>();

                                // Update the memory estimate using the FeedStats helper method
                                feed_stats.update_memory_usage(memory_estimate);
                            }

                            // Send the trade
                            if tx.send(trade).await.is_err() {
                                // Update dropped message count on error
                                if let Some(feed_stats) = stats.write().get_mut(&stats_key) {
                                    feed_stats.increment_dropped();
                                }
                                break;
                            }
                        }
                    }
                }
            }
        });

        Ok(rx)
    }

    async fn stop_feed_trades(&self, instrument_id: &InstrumentId) -> Result<()> {
        let symbol = &instrument_id.symbol;
        let key = String::from(format!("trades:{symbol}"));
        if let Some(tx) = self.stop_signals.write().remove(&key) {
            let _ = tx.send(true);
        }

        Ok(())
    }

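    /// Start a bar (OHLCV) feed aggregated from normalized trades
    ///
    /// # Example
    ///
    /// A minimal sketch chaining a trade feed into 1-minute bars; instrument
    /// and subscription setup are assumed from this crate's API:
    ///
    /// ```ignore
    /// let bar_type = rusty_model::data::bar::BarType::new_standard(
    ///     instrument.symbol.clone(),
    ///     rusty_model::data::bar::BarAggregation::Minute,
    ///     1,
    /// );
    /// let trades = feeder.start_feed_trades(instrument.clone(), trade_rx, None).await?;
    /// let mut bars = feeder.start_feed_bars(instrument, bar_type, trades, None).await?;
    /// while let Some(bar) = bars.recv().await {
    ///     // consume completed Bar values here
    /// }
    /// ```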
    async fn start_feed_bars(
        &self,
        instrument_id: InstrumentId,
        bar_type: BarType,
        mut trade_rx: mpsc::Receiver<MarketTrade>,
        options: Option<crate::feeder::FeederOptions>,
    ) -> Result<mpsc::Receiver<Bar>> {
        // Extract symbol from InstrumentId
        let symbol = instrument_id.symbol.clone();

        // Get the interval in seconds from bar_type
        let interval_sec = match bar_type.get_spec().aggregation {
            rusty_model::data::bar::BarAggregation::Second => bar_type.get_spec().step,
            rusty_model::data::bar::BarAggregation::Minute => bar_type.get_spec().step * 60,
            rusty_model::data::bar::BarAggregation::Hour => bar_type.get_spec().step * 3600,
            rusty_model::data::bar::BarAggregation::Day => bar_type.get_spec().step * 86400,
            _ => return Err(anyhow::anyhow!("Only time-based bars are supported")),
        };

        // Use the provided channel buffer size or the default
        let buffer_size = options
            .as_ref()
            .map(|opt| opt.channel_buffer_size)
            .unwrap_or(1024);

        let (tx, rx) = mpsc::channel(buffer_size);

        // Create stop signal
        let (stop_tx, _) = watch::channel(false);
        let key = String::from(format!("bars:{symbol}:{interval_sec}"));
        self.stop_signals.write().insert(key.clone(), stop_tx);
        let mut stop_rx = self.stop_signals.read().get(&key).unwrap().subscribe();

        // Clone clock for use in the task
        let clock = self.clock.clone();

        // Clone stats for updating feed statistics
        let stats = self.stats.clone();
        let stats_key = String::from(format!("bars:{symbol}:{interval_sec}"));

        // Initialize stats for this feed if not already present
        stats.write().entry(stats_key.clone()).or_default();

        // Set up a bar cache for storing historical bars if not already present
        let cache_key = String::from(format!("{symbol}:{interval_sec}s"));
        let bar_cache = if let Some(existing_cache) = self.bar_caches.read().get(&cache_key) {
            existing_cache.clone()
        } else {
            let new_cache = Arc::new(RwLock::new(BarCache::new())); // Default cache
            self.bar_caches.write().insert(cache_key, new_cache.clone());
            new_cache
        };

        // Use CPU pinning if specified in options
        if let Some(options) = &options
            && options.cpu_affinity >= 0
        {
            #[cfg(target_os = "linux")]
            {
                use core_affinity::set_for_current;
                let _ = set_for_current(core_affinity::CoreId {
                    id: options.cpu_affinity as usize,
                });
            }
        }

        // Use SmallVec for efficient batch processing if batching is enabled
        let batch_size = options.as_ref().map(|opt| opt.batch_size).unwrap_or(0);

        // Bar aggregation state - Option<Decimal> avoids sentinel default values
        // and unnecessary copies of Decimal values
        let mut open = None;
        let mut high = None;
        let mut low = None;
        let mut close = None;
        let mut volume = rust_decimal::Decimal::ZERO;
        let mut buy_volume = rust_decimal::Decimal::ZERO;
        let mut sell_volume = rust_decimal::Decimal::ZERO;
        let mut trade_count = 0u64;
        let mut last_bar_time = 0u64;
        let mut last_trade_time = 0u64;

        tokio::spawn(async move {
            // Pre-allocate a batch buffer if batching is enabled
            #[allow(unused_mut)] // Might be unused if batch_size is 0
            let mut trade_batch = if batch_size > 0 {
                smallvec::SmallVec::<[MarketTrade; 32]>::with_capacity(batch_size)
            } else {
                smallvec::SmallVec::<[MarketTrade; 32]>::new()
            };

            loop {
                tokio::select! {
                    // Check for stop signal
                    _ = stop_rx.changed() => {
                        // Send the final bar before stopping if we have data
                        if let (Some(o), Some(h), Some(l), Some(c)) = (open, high, low, close)
                            && volume > rust_decimal::Decimal::ZERO {
                            let bar_type = rusty_model::data::bar::BarType::new_standard(
                                instrument_id.symbol.to_string().into(),
                                rusty_model::data::bar::BarAggregation::Second,
                                interval_sec
                            );

                            let bar = Bar {
                                bar_type,
                                open: o,
                                high: h,
                                low: l,
                                close: c,
                                volume,
                                timestamp_ns: clock.raw(),
                            };

                            // Update bar cache
                            bar_cache.write().add_bar(bar.clone());

                            // Send the bar; ignore send errors since we're shutting down
                            let _ = tx.try_send(bar);
                        }
                        break;
                    }

                    // Process incoming trades
                    Some(trade) = trade_rx.recv() => {
                        // Capture processing start time for latency measurement
                        let process_start = clock.raw();

                        // If batching is enabled, add to the batch; otherwise process immediately
                        if batch_size > 0 {
                            // Batch prefetching removed as ineffective:
                            // - it targeted elements we were about to process immediately
                            // - small trade objects benefit more from cache locality than prefetch
                            // - the hardware prefetcher already handles sequential access patterns

                            trade_batch.push(trade);

                            // Process the batch if full
                            if trade_batch.len() >= batch_size {
                                // Process the batch of trades
                                for trade in trade_batch.drain(..) {
                                    process_trade(
                                        &trade,
                                        clock.raw(),
                                        interval_sec,
                                        &mut open,
                                        &mut high,
                                        &mut low,
                                        &mut close,
                                        &mut volume,
                                        &mut buy_volume,
                                        &mut sell_volume,
                                        &mut trade_count,
                                        &mut last_bar_time,
                                        &mut last_trade_time,
                                        &tx,
                                        &bar_cache,
                                        &instrument_id,
                                        &symbol,
                                        &clock,
                                        &stats,
                                        &stats_key,
                                        process_start,
                                    ).await;
                                }
                            }
                        } else {
                            // Process trades one at a time
                            process_trade(
                                &trade,
                                clock.raw(),
                                interval_sec,
                                &mut open,
                                &mut high,
                                &mut low,
                                &mut close,
                                &mut volume,
                                &mut buy_volume,
                                &mut sell_volume,
                                &mut trade_count,
                                &mut last_bar_time,
                                &mut last_trade_time,
                                &tx,
                                &bar_cache,
                                &instrument_id,
                                &symbol,
                                &clock,
                                &stats,
                                &stats_key,
                                process_start,
                            ).await;
                        }
                    }
                }
            }
        });

        Ok(rx)
    }

    async fn stop_feed_bars(&self, instrument_id: &InstrumentId, bar_type: &BarType) -> Result<()> {
        let symbol = &instrument_id.symbol;

        // Get the interval in seconds from bar_type
        let interval_sec = match bar_type.get_spec().aggregation {
            rusty_model::data::bar::BarAggregation::Second => bar_type.get_spec().step,
            rusty_model::data::bar::BarAggregation::Minute => bar_type.get_spec().step * 60,
            rusty_model::data::bar::BarAggregation::Hour => bar_type.get_spec().step * 3600,
            rusty_model::data::bar::BarAggregation::Day => bar_type.get_spec().step * 86400,
            _ => return Err(anyhow::anyhow!("Only time-based bars are supported")),
        };
        let key = String::from(format!("bars:{symbol}:{interval_sec}"));
        if let Some(tx) = self.stop_signals.write().remove(&key) {
            let _ = tx.send(true);
        }

        Ok(())
    }

    /// Get feed statistics for a specific instrument
    ///
    /// Returns merged performance data across all of the instrument's feeds
    /// (depth, trades, and bars).
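    ///
    /// # Example
    ///
    /// A minimal sketch; the fields read below are the `FeedStats` counters
    /// merged by this method:
    ///
    /// ```ignore
    /// let stats = feeder.get_stats(&instrument).await?;
    /// println!("processed={} avg latency={} ns",
    ///     stats.messages_processed, stats.avg_process_latency_ns);
    /// ```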
    async fn get_stats(&self, instrument_id: &InstrumentId) -> Result<FeedStats> {
        let symbol = &instrument_id.symbol;

        // Create a merged stats object from all feeds for this instrument
        let mut merged_stats = FeedStats::default();

        // Acquire the read lock once to minimize contention
        let stats_guard = self.stats.read();

        // Check for depth feed stats
        let depth_key = String::from(format!("depth:{symbol}"));
        if let Some(depth_stats) = stats_guard.get(&depth_key) {
            merged_stats.messages_processed += depth_stats.messages_processed;
            merged_stats.dropped_messages += depth_stats.dropped_messages;

            // Take the max of latencies
            merged_stats.max_process_latency_ns = merged_stats
                .max_process_latency_ns
                .max(depth_stats.max_process_latency_ns);

            // For avg latency, we need a weighted average
            if merged_stats.messages_processed > 0 {
                merged_stats.avg_process_latency_ns = (merged_stats.avg_process_latency_ns
                    * (merged_stats.messages_processed - depth_stats.messages_processed)
                    + depth_stats.avg_process_latency_ns * depth_stats.messages_processed)
                    / merged_stats.messages_processed;
            } else {
                merged_stats.avg_process_latency_ns = depth_stats.avg_process_latency_ns;
            }

            // Update last update time if more recent
            if depth_stats.last_update_time > merged_stats.last_update_time {
                merged_stats.last_update_time = depth_stats.last_update_time;
            }

            // Accumulate memory usage
            merged_stats.memory_usage_bytes += depth_stats.memory_usage_bytes;
        }

        // Check for trades feed stats
        let trades_key = String::from(format!("trades:{symbol}"));
        if let Some(trades_stats) = stats_guard.get(&trades_key) {
            merged_stats.messages_processed += trades_stats.messages_processed;
            merged_stats.dropped_messages += trades_stats.dropped_messages;

            // Take the max of latencies
            merged_stats.max_process_latency_ns = merged_stats
                .max_process_latency_ns
                .max(trades_stats.max_process_latency_ns);

            // For avg latency, we need a weighted average
            if merged_stats.messages_processed > 0 {
                merged_stats.avg_process_latency_ns = (merged_stats.avg_process_latency_ns
                    * (merged_stats.messages_processed - trades_stats.messages_processed)
                    + trades_stats.avg_process_latency_ns * trades_stats.messages_processed)
                    / merged_stats.messages_processed;
            } else {
                merged_stats.avg_process_latency_ns = trades_stats.avg_process_latency_ns;
            }

            // Update last update time if more recent
            if trades_stats.last_update_time > merged_stats.last_update_time {
                merged_stats.last_update_time = trades_stats.last_update_time;
            }

            // Accumulate memory usage
            merged_stats.memory_usage_bytes += trades_stats.memory_usage_bytes;
        }

        // Check for bar feed stats - all intervals
        for (key, stats) in stats_guard.iter() {
            if key.starts_with(&format!("bars:{symbol}:")) {
                merged_stats.messages_processed += stats.messages_processed;
                merged_stats.dropped_messages += stats.dropped_messages;

                // Take the max of latencies
                merged_stats.max_process_latency_ns = merged_stats
                    .max_process_latency_ns
                    .max(stats.max_process_latency_ns);

                // Update last update time if more recent
                if stats.last_update_time > merged_stats.last_update_time {
                    merged_stats.last_update_time = stats.last_update_time;
                }

                // Accumulate memory usage
                merged_stats.memory_usage_bytes += stats.memory_usage_bytes;
            }
        }

        // Estimate messages per second from the time elapsed since the last update
        if merged_stats.last_update_time > 0 {
            let current_time = self.clock.raw();
            let time_diff_seconds = (current_time - merged_stats.last_update_time) / 1_000_000_000;

            if time_diff_seconds > 0 {
                merged_stats.messages_per_second =
                    merged_stats.messages_processed as f64 / time_diff_seconds as f64;
            }
        }

        Ok(merged_stats)
    }

    /// Reset all feed statistics
    ///
    /// Clears all collected performance statistics, resetting every feed's
    /// counters to their default values.
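    ///
    /// # Example
    ///
    /// A minimal sketch, e.g. to zero counters at the start of a measurement window:
    ///
    /// ```ignore
    /// feeder.reset_stats().await?;
    /// ```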
    async fn reset_stats(&self) -> Result<()> {
        // Reset all stats to default values
        let mut stats_guard = self.stats.write();

        for stats in stats_guard.values_mut() {
            *stats = FeedStats::default();
        }

        Ok(())
    }
}

/// Helper function to process a single trade for bar aggregation
///
/// Extracted to reduce code duplication when processing batches.
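///
/// Trades are bucketed by integer division of the wall-clock time into
/// interval-sized slots: `bucket = now_ns / 1_000_000_000 / interval_sec`.
/// For example, with `interval_sec = 60` a trade at t = 125 s lands in
/// bucket 2, and the bar for bucket 1 is emitted when the first bucket-2
/// trade arrives.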
#[inline]
#[allow(clippy::too_many_arguments)]
async fn process_trade(
    trade: &MarketTrade,
    now: u64,
    interval_sec: u64,
    open: &mut Option<Decimal>,
    high: &mut Option<Decimal>,
    low: &mut Option<Decimal>,
    close: &mut Option<Decimal>,
    volume: &mut Decimal,
    buy_volume: &mut Decimal,
    sell_volume: &mut Decimal,
    trade_count: &mut u64,
    last_bar_time: &mut u64,
    last_trade_time: &mut u64,
    tx: &mpsc::Sender<Bar>,
    bar_cache: &Arc<RwLock<BarCache>>,
    instrument_id: &InstrumentId,
    _symbol: &str,
    clock: &Clock,
    stats: &Arc<RwLock<FxHashMap<String, FeedStats>>>,
    stats_key: &str,
    process_start: u64,
) {
    // Update last trade time, converting the Instant to u64 nanoseconds
    *last_trade_time = trade
        .timestamp
        .duration_since(quanta::Instant::recent())
        .as_nanos() as u64;

    // Calculate the current interval bucket
    let current_interval = now / 1_000_000_000 / interval_sec;

    // Record whether this trade closes out the previous bar, before the
    // aggregation state is mutated below (used for the memory estimate)
    let completed_bar = *last_bar_time != 0 && current_interval > *last_bar_time;

    // Update volume based on trade direction
    match trade.direction {
        OrderSide::Buy => *buy_volume += trade.quantity,
        OrderSide::Sell => *sell_volume += trade.quantity,
    }

    // Check if we need to start a new bar
    if *last_bar_time == 0 {
        // First trade
        *open = Some(trade.price);
        *high = Some(trade.price);
        *low = Some(trade.price);
        *close = Some(trade.price);
        *volume = trade.quantity;
        *trade_count = 1;
        *last_bar_time = current_interval;
    } else if current_interval > *last_bar_time {
        // New bar interval - send the current bar and start a new one
        if let (Some(o), Some(h), Some(l), Some(c)) = (*open, *high, *low, *close) {
            // Create a BarType for the interval
            let bar_type = rusty_model::data::bar::BarType::new_standard(
                instrument_id.symbol.clone(),
                rusty_model::data::bar::BarAggregation::Second,
                interval_sec,
            );

            let bar = Bar {
                bar_type,
                open: o,
                high: h,
                low: l,
                close: c,
                volume: *volume,
                timestamp_ns: clock.raw(),
            };

            // Update bar cache
            bar_cache.write().add_bar(bar.clone());

            // Send the bar
            if tx.send(bar).await.is_err() {
                // Update dropped message count on error
                if let Some(feed_stats) = stats.write().get_mut(stats_key) {
                    feed_stats.increment_dropped();
                }
                return;
            }
        }

        // Start a new bar
        *open = Some(trade.price);
        *high = Some(trade.price);
        *low = Some(trade.price);
        *close = Some(trade.price);
        *volume = trade.quantity;
        *trade_count = 1;
        *last_bar_time = current_interval;

        // Reset the per-side volume counts to this trade's contribution
        *buy_volume = rust_decimal::Decimal::ZERO;
        *sell_volume = rust_decimal::Decimal::ZERO;
        match trade.direction {
            OrderSide::Buy => *buy_volume = trade.quantity,
            OrderSide::Sell => *sell_volume = trade.quantity,
        }
    } else {
        // Update the current bar
        *high = Some(std::cmp::max(high.unwrap_or(trade.price), trade.price));
        *low = Some(std::cmp::min(low.unwrap_or(trade.price), trade.price));
        *close = Some(trade.price);
        *volume += trade.quantity;
        *trade_count += 1;
    }

    // Calculate processing latency
    let process_end = clock.raw();
    let latency_ns = process_end.saturating_sub(process_start);

    // Update statistics
    if let Some(feed_stats) = stats.write().get_mut(stats_key) {
        // Update message count
        feed_stats.increment_processed();

        // Update latency statistics (avg, max, p99)
        feed_stats.add_latency_sample(latency_ns);

        // Record timestamp of the last update
        feed_stats.last_update_time = process_end;

        // Estimate memory usage - approximate but helpful for monitoring
        let mut memory_estimate = std::mem::size_of::<MarketTrade>();

        // If this trade completed a bar, add the bar size. The flag was captured
        // before last_bar_time was advanced; comparing against the updated value
        // here would never be true.
        if completed_bar {
            memory_estimate += std::mem::size_of::<Bar>();
        }

        // Update the memory usage estimate using the FeedStats helper method
        feed_stats.update_memory_usage(memory_estimate);
    }
}
1264}