rusty_feeder/feeder.rs

use std::fmt::Debug;
use std::sync::Arc;

use anyhow::Result;
use async_trait::async_trait;
use parking_lot::RwLock;
use rusty_model::{
    data::{
        bar::{Bar, BarCache, BarType},
        book_snapshot::OrderBookSnapshot,
        market_trade::MarketTrade,
        simd_orderbook::SharedSimdOrderBook,
    },
    enums::OrderSide,
    instruments::InstrumentId,
};
use smallvec::SmallVec;
use tokio::sync::mpsc;

/// Performance statistics for feed processing in HFT applications
///
/// This structure provides comprehensive statistics for monitoring, tuning, and
/// debugging high-frequency trading data feeds. It tracks latency, throughput,
/// and resource usage metrics that are critical for HFT operations.
///
/// Key features:
/// - Zero-allocation percentile calculations using fixed-size arrays
/// - Efficient EWMA (Exponentially Weighted Moving Average) for smooth metrics
/// - Cache-line alignment for optimal CPU cache efficiency
/// - Detailed memory usage tracking for resource monitoring
/// - No internal locking; updates take `&mut self`, so sharing across threads
///   requires external synchronization
///
/// # Performance Characteristics
///
/// The statistics collection is designed to have minimal impact on the critical path:
/// - O(1) update operations for most metrics
/// - Percentile calculation sorts a fixed 100-element window (insertion sort,
///   O(n^2) worst case), so its cost is bounded and effectively constant
/// - Zero heap allocations during normal operation
/// - Minimal cache contention through 64-byte alignment
///
/// # Example Usage
///
/// ```no_run
/// use rusty_feeder::feeder::FeedStats;
///
/// // Create new statistics tracker
/// let mut stats = FeedStats::default();
///
/// // Update with a latency sample
/// stats.add_latency_sample(250); // 250 nanoseconds
///
/// // Track memory usage
/// stats.update_memory_usage(1024); // 1KB
///
/// // Print current statistics
/// println!("Avg latency: {}ns, P99: {}ns, Max: {}ns",
///     stats.avg_process_latency_ns,
///     stats.p99_process_latency_ns,
///     stats.max_process_latency_ns);
/// ```
#[derive(Debug, Clone)]
#[repr(align(64))] // Cache-line alignment for better CPU cache efficiency
pub struct FeedStats {
    /// Total messages processed
    pub messages_processed: u64,

    /// Messages processed per second (smoothed)
    pub messages_per_second: f64,

    /// Average processing latency (nanoseconds)
    pub avg_process_latency_ns: u64,

    /// Maximum processing latency observed (nanoseconds)
    pub max_process_latency_ns: u64,

    /// 99th percentile processing latency (nanoseconds)
    pub p99_process_latency_ns: u64,

    /// Total dropped messages
    pub dropped_messages: u64,

    /// Last update timestamp (nanoseconds)
    pub last_update_time: u64,

    /// Memory usage for this feed (bytes)
    pub memory_usage_bytes: usize,

    /// Recent latency samples for calculating percentiles.
    /// Using a fixed-size array to avoid heap allocations.
    recent_latencies: [u64; 100],

    /// Current position in the recent_latencies circular buffer
    recent_latencies_pos: usize,

    /// Number of samples collected in the circular buffer
    recent_latencies_count: usize,
}

impl Default for FeedStats {
    fn default() -> Self {
        Self {
            messages_processed: 0,
            messages_per_second: 0.0,
            avg_process_latency_ns: 0,
            max_process_latency_ns: 0,
            p99_process_latency_ns: 0,
            dropped_messages: 0,
            last_update_time: 0,
            memory_usage_bytes: 0,
            recent_latencies: [0; 100],
            recent_latencies_pos: 0,
            recent_latencies_count: 0,
        }
    }
}

impl FeedStats {
    /// Add a new latency sample and update all latency statistics
    ///
    /// This method efficiently updates all latency-related metrics in one call:
    /// - Exponentially Weighted Moving Average (EWMA) for stable average latency
    /// - Maximum observed latency
    /// - 99th percentile latency using a rolling window
    ///
    /// Latency metrics are critical for high-frequency trading systems where
    /// microsecond or even nanosecond differences can impact trading performance.
    ///
    /// # Parameters
    ///
    /// * `latency_ns` - The measured latency in nanoseconds
    ///
    /// # Performance Characteristics
    ///
    /// - Updating the average and max latency is O(1)
    /// - The p99 calculation sorts the sample window with insertion sort
    ///   (O(n^2) worst case, but bounded by the fixed 100-element window) and
    ///   is only performed once at least 10 samples have been collected
    /// - Uses fixed-size arrays (zero heap allocation)
    #[inline]
    pub fn add_latency_sample(&mut self, latency_ns: u64) {
        // Update exponentially weighted moving average (EWMA)
        self.avg_process_latency_ns = (self.avg_process_latency_ns * 9 + latency_ns) / 10;

        // Update max latency
        self.max_process_latency_ns = self.max_process_latency_ns.max(latency_ns);

        // Store in circular buffer for percentile calculation
        self.recent_latencies[self.recent_latencies_pos] = latency_ns;
        self.recent_latencies_pos = (self.recent_latencies_pos + 1) % 100;
        if self.recent_latencies_count < 100 {
            self.recent_latencies_count += 1;
        }

        // Update p99 latency if we have enough samples
        if self.recent_latencies_count >= 10 {
            self.update_p99_latency();
        }
    }

    /// Calculate the 99th percentile latency from the recent samples
    #[inline]
    fn update_p99_latency(&mut self) {
        // Make a copy of the recent latencies using a stack-allocated array
        let mut latencies = [0u64; 100];
        let count = self.recent_latencies_count;

        // Copy only the elements that have been initialized
        latencies[..count].copy_from_slice(&self.recent_latencies[..count]);

        // Sort ascending with insertion sort, which is simple and
        // allocation-free for this small fixed-size window
        for i in 1..count {
            let key = latencies[i];
            let mut j = i;
            while j > 0 && latencies[j - 1] > key {
                latencies[j] = latencies[j - 1];
                j -= 1;
            }
            latencies[j] = key;
        }

        // Calculate the 99th percentile index
        let p99_index = ((count as f64) * 0.99) as usize;

        // Defensive clamp: `p99_index` is always below `count` here, but fall
        // back to the highest sample if that ever changes
        let index = if p99_index < count {
            p99_index
        } else {
            count - 1
        };

        // Update the p99 latency
        self.p99_process_latency_ns = latencies[index];
    }

    /// Increment the count of dropped messages
    ///
    /// This method should be called whenever a message is dropped due to
    /// errors, buffer overflow, or other exceptional conditions. Tracking
    /// dropped messages is critical for high-frequency trading systems
    /// to detect data quality issues.
    ///
    /// # Performance Impact
    ///
    /// This operation is O(1) and has minimal performance impact.
    #[inline]
    pub const fn increment_dropped(&mut self) {
        self.dropped_messages += 1;
    }

    /// Increment the count of processed messages
    ///
    /// This method should be called for every successfully processed message.
    /// The counter is used to calculate throughput and other performance metrics.
    ///
    /// # Performance Impact
    ///
    /// This operation is O(1) and has minimal performance impact.
    #[inline]
    pub const fn increment_processed(&mut self) {
        self.messages_processed += 1;
    }

    /// Update the memory usage estimate for resource monitoring
    ///
    /// This method applies an exponentially weighted moving average (EWMA) to
    /// produce a stable estimate of memory usage over time, which is useful
    /// for detecting memory leaks and tracking resource utilization.
    ///
    /// # Parameters
    ///
    /// * `new_bytes` - The latest memory usage sample in bytes
    ///
    /// # Notes
    ///
    /// The EWMA formula used gives 90% weight to historical data and 10% to
    /// the new sample, which provides good stability while still responding
    /// to significant changes. This approach is particularly useful for
    /// high-frequency trading applications where small variations in memory
    /// usage are expected but sustained trends are important to track.
    ///
    /// Memory usage is tracked per feed, allowing for granular monitoring
    /// of different market data streams.
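    ///
    /// # Example
    ///
    /// A small sketch of the 90/10 smoothing on a fresh tracker:
    ///
    /// ```no_run
    /// use rusty_feeder::feeder::FeedStats;
    ///
    /// let mut stats = FeedStats::default();
    /// stats.update_memory_usage(1000);
    /// assert_eq!(stats.memory_usage_bytes, 100); // (0 * 9 + 1000) / 10
    /// stats.update_memory_usage(2000);
    /// assert_eq!(stats.memory_usage_bytes, 290); // (100 * 9 + 2000) / 10
    /// ```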
    #[inline]
    pub const fn update_memory_usage(&mut self, new_bytes: usize) {
        // Using EWMA to stabilize the memory usage estimate
        self.memory_usage_bytes = (self.memory_usage_bytes * 9 + new_bytes) / 10;
    }
}

/// Configuration options for feeder performance tuning.
///
/// These options control channel sizing, orderbook depth, batching, CPU
/// affinity, and backlog handling for feed processing. The defaults are
/// tuned for high-frequency trading scenarios.
///
/// Note: the prefetch options field present in earlier versions has been
/// removed.
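///
/// # Example
///
/// A sketch of overriding selected fields while keeping the remaining
/// defaults (struct-update syntax):
///
/// ```no_run
/// use rusty_feeder::feeder::FeederOptions;
///
/// let options = FeederOptions {
///     channel_buffer_size: 4096,
///     batch_size: 0, // disable batching
///     ..Default::default()
/// };
/// assert_eq!(options.max_backlog, 10000); // untouched fields keep defaults
/// ```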
#[derive(Debug, Clone)]
pub struct FeederOptions {
    /// Channel buffer size for processed data
    pub channel_buffer_size: usize,

    /// Maximum depth levels to process for orderbooks
    pub max_depth_levels: usize,

    /// Batch size for processing messages (0 = no batching)
    pub batch_size: usize,

    /// CPU core affinity for processing threads (-1 = no affinity)
    pub cpu_affinity: i32,

    /// Whether to use zero-copy processing when possible
    pub use_zero_copy: bool,

    /// Maximum message backlog before dropping
    pub max_backlog: usize,
}

impl Default for FeederOptions {
    fn default() -> Self {
        Self {
            channel_buffer_size: 1024,
            max_depth_levels: 100,
            batch_size: 32,
            cpu_affinity: -1,
            use_zero_copy: true,
            max_backlog: 10000,
        }
    }
}

/// High-performance feeder interface for processing and normalizing exchange data.
/// Optimized for minimal allocations and low latency.
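///
/// # Example
///
/// A hedged sketch of driving a depth feed; `feeder`, `instrument_id`, and
/// the raw `depth_rx` channel are assumed to come from a concrete exchange
/// adapter, which is why the snippet is not compiled:
///
/// ```ignore
/// let mut snapshot_rx = feeder
///     .start_feed_depth(instrument_id.clone(), depth_rx, None)
///     .await?;
/// while let Some(snapshot) = snapshot_rx.recv().await {
///     // consume normalized OrderBookSnapshot values
/// }
/// feeder.stop_feed_depth(&instrument_id).await?;
/// ```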
#[async_trait]
pub trait Feeder: Send + Sync + 'static + Debug {
    /// Raw message type from the exchange for depth/orderbook
    type DepthMessage: Send + 'static;

    /// Raw message type from the exchange for trades
    type TradeMessage: Send + 'static;

    /// Start processing and normalizing orderbook depth data.
    /// Processes raw depth messages into standardized order book depth format.
    async fn start_feed_depth(
        &self,
        instrument_id: InstrumentId,
        depth_rx: mpsc::Receiver<Self::DepthMessage>,
        options: Option<FeederOptions>,
    ) -> Result<mpsc::Receiver<OrderBookSnapshot>>;

    /// Stop processing orderbook depth data for a specific instrument
    async fn stop_feed_depth(&self, instrument_id: &InstrumentId) -> Result<()>;

    /// Start processing and normalizing trade data.
    /// Processes raw trade messages into standardized trade format.
    async fn start_feed_trades(
        &self,
        instrument_id: InstrumentId,
        trade_rx: mpsc::Receiver<Self::TradeMessage>,
        options: Option<FeederOptions>,
    ) -> Result<mpsc::Receiver<MarketTrade>>;

    /// Stop processing trade data for a specific instrument
    async fn stop_feed_trades(&self, instrument_id: &InstrumentId) -> Result<()>;

    /// Start generating bars from trade data.
    /// Aggregates trade data into OHLCV bars of the specified type.
    async fn start_feed_bars(
        &self,
        instrument_id: InstrumentId,
        bar_type: BarType,
        trade_rx: mpsc::Receiver<MarketTrade>,
        options: Option<FeederOptions>,
    ) -> Result<mpsc::Receiver<Bar>>;

    /// Stop generating bars for a specific instrument and bar type
    async fn stop_feed_bars(&self, instrument_id: &InstrumentId, bar_type: &BarType) -> Result<()>;

    /// Get a real-time shared orderbook for a specific instrument.
    /// Returns a thread-safe shared orderbook that's kept up-to-date.
    async fn get_shared_orderbook(
        &self,
        instrument_id: &InstrumentId,
    ) -> Result<SharedSimdOrderBook>;

    /// Get a bar cache for a specific instrument and bar type.
    /// Returns a bar cache with recent bar data.
    async fn get_bar_cache(
        &self,
        instrument_id: &InstrumentId,
        bar_type: &BarType,
        max_bars: usize,
    ) -> Result<Arc<RwLock<BarCache>>>;

    /// Get feed statistics for a specific instrument.
    /// The default implementation returns empty statistics.
    async fn get_stats(&self, _instrument_id: &InstrumentId) -> Result<FeedStats> {
        Ok(FeedStats::default())
    }

    /// Reset all feed statistics.
    /// The default implementation is a no-op.
    async fn reset_stats(&self) -> Result<()> {
        Ok(())
    }
}

/// Advanced orderbook analytics for HFT applications.
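///
/// # Example
///
/// A sketch of reading a few metrics; `book` stands for any type
/// implementing this trait, so the snippet is illustrative only:
///
/// ```ignore
/// let imbalance = book.get_imbalance(10); // -1.0 (all asks) ..= 1.0 (all bids)
/// let spread_pct = book.get_spread_percent();
/// if let Some(vwap) = book.get_effective_price(5.0, OrderSide::Buy) {
///     // volume-weighted average price to buy 5.0 units
/// }
/// ```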
pub trait OrderbookAnalytics {
    /// Calculate market impact for a given notional amount
    /// Returns the estimated price impact as a percentage
    fn calculate_market_impact(&self, notional_amount: f64, side: OrderSide) -> f64;

    /// Get spread as a percentage of mid price
    fn get_spread_percent(&self) -> f64;

    /// Get the effective price for a given quantity
    /// Returns the volume-weighted average execution price
    fn get_effective_price(&self, quantity: f64, side: OrderSide) -> Option<f64>;

    /// Get orderbook imbalance metric
    /// Returns a value between -1.0 (all asks) and 1.0 (all bids)
    fn get_imbalance(&self, levels: usize) -> f64;

    /// Calculate liquidity at specified price levels
    /// Returns total volume available within the price range
    fn calculate_liquidity(&self, price_range_percent: f64, side: OrderSide) -> f64;

    /// Calculate order book resistance levels
    /// Returns price levels with significant volume as (price, volume) pairs
    fn find_resistance_levels(
        &self,
        min_volume: f64,
        side: OrderSide,
    ) -> SmallVec<[(f64, f64); 8]>;

    /// Calculate weighted mid price based on volume
    /// Returns a more accurate mid price weighted by volume at each level
    fn weighted_mid_price(&self, depth_levels: usize) -> Option<f64>;

    /// Calculate stability of the orderbook
    /// Returns a stability score between 0.0 (unstable) and 1.0 (stable)
    fn stability_score(&self) -> f64;

    /// Calculate market pressure indicator
    /// Returns a value between -1.0 (sell pressure) and 1.0 (buy pressure)
    fn market_pressure(&self) -> f64;
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_feed_stats_default() {
        let stats = FeedStats::default();
        assert_eq!(stats.messages_processed, 0);
        assert_eq!(stats.messages_per_second, 0.0);
        assert_eq!(stats.avg_process_latency_ns, 0);
        assert_eq!(stats.max_process_latency_ns, 0);
        assert_eq!(stats.p99_process_latency_ns, 0);
        assert_eq!(stats.dropped_messages, 0);
        assert_eq!(stats.last_update_time, 0);
        assert_eq!(stats.memory_usage_bytes, 0);
    }

    #[test]
    fn test_feed_stats_increment_processed() {
        let mut stats = FeedStats::default();

        // Test single increment
        stats.increment_processed();
        assert_eq!(stats.messages_processed, 1);

        // Test multiple increments
        for _ in 0..10 {
            stats.increment_processed();
        }
        assert_eq!(stats.messages_processed, 11);
    }

    #[test]
    fn test_feed_stats_increment_dropped() {
        let mut stats = FeedStats::default();

        // Test single increment
        stats.increment_dropped();
        assert_eq!(stats.dropped_messages, 1);

        // Test multiple increments
        for _ in 0..5 {
            stats.increment_dropped();
        }
        assert_eq!(stats.dropped_messages, 6);
    }

    #[test]
    fn test_feed_stats_update_memory_usage() {
        let mut stats = FeedStats::default();

        // Test initial update
        stats.update_memory_usage(1000);
        assert_eq!(stats.memory_usage_bytes, 100); // (0*9 + 1000) / 10 = 100

        // Test subsequent update with EWMA
        stats.update_memory_usage(2000);
        assert_eq!(stats.memory_usage_bytes, 290); // (100*9 + 2000) / 10 = 290

        // Test another update
        stats.update_memory_usage(3000);
        assert_eq!(stats.memory_usage_bytes, 561); // (290*9 + 3000) / 10 = 561
    }

    #[test]
    fn test_feed_stats_add_latency_sample() {
        let mut stats = FeedStats::default();

        // Test single sample
        stats.add_latency_sample(100);
        assert_eq!(stats.avg_process_latency_ns, 10); // (0*9 + 100) / 10 = 10
        assert_eq!(stats.max_process_latency_ns, 100);

        // Not enough samples for p99 yet
        assert_eq!(stats.p99_process_latency_ns, 0);

        // Add 14 more samples (100 to 1400) to trigger the p99 calculation
        for i in 1..15 {
            stats.add_latency_sample(i * 100);
        }

        // Check that max latency is updated
        assert_eq!(stats.max_process_latency_ns, 1400);

        // Check that p99 latency is calculated (with only 15 samples it falls
        // on the maximum)
        assert!(stats.p99_process_latency_ns > 0);
        assert!(stats.p99_process_latency_ns <= stats.max_process_latency_ns);
    }

    #[test]
    fn test_feed_stats_p99_calculation() {
        let mut stats = FeedStats::default();

        // Add 100 samples with values 1 to 100
        for i in 1..=100 {
            stats.add_latency_sample(i);
        }

        // P99 should be very close to 99
        assert!(stats.p99_process_latency_ns >= 98);
        assert!(stats.p99_process_latency_ns <= 100);

        // Max should be 100
        assert_eq!(stats.max_process_latency_ns, 100);
    }
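
    // A further check, derived from the circular-buffer documentation above:
    // pushing more samples than the 100-slot window holds must wrap the
    // buffer, while the max still tracks the all-time peak.
    #[test]
    fn test_feed_stats_latency_window_wraps() {
        let mut stats = FeedStats::default();

        // 150 samples; only the most recent 100 remain in the window
        for i in 1..=150u64 {
            stats.add_latency_sample(i);
        }

        assert_eq!(stats.max_process_latency_ns, 150);
        assert!(stats.p99_process_latency_ns > 0);
        assert!(stats.p99_process_latency_ns <= stats.max_process_latency_ns);
    }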

    #[test]
    fn test_feeder_options_default() {
        let options = FeederOptions::default();
        assert_eq!(options.channel_buffer_size, 1024);
        assert_eq!(options.max_depth_levels, 100);
        assert_eq!(options.batch_size, 32);
        assert_eq!(options.cpu_affinity, -1);
        assert!(options.use_zero_copy);
        assert_eq!(options.max_backlog, 10000);
    }
}