// rusty_bin/monitor/lockfree_stats.rs

1//! Lock-free statistics collection for high-frequency monitoring
2//!
3//! This module provides lock-free atomic statistics collection optimized for
4//! high-frequency trading environments where minimal latency is critical.
5
6use parking_lot::RwLock;
7use quanta::Instant as QuantaInstant;
8use rusty_common::collections::FxHashMap;
9use smartstring::alias::String as SmartString;
10use std::sync::Arc;
11use std::sync::atomic::{AtomicU64, Ordering};
12
/// Lock-free statistics collector using atomic operations
///
/// The global counters are plain `AtomicU64` values updated with relaxed
/// ordering. Per-exchange counters live in a map behind an `RwLock`: recording
/// against a known exchange takes the read lock, and the first record for a new
/// exchange takes the write lock, so that path is not strictly lock-free.
#[derive(Debug)]
pub struct LockFreeStatsCollector {
    // Core metrics using relaxed ordering for maximum performance
    events_processed: AtomicU64,
    trades_processed: AtomicU64,
    orderbooks_processed: AtomicU64,
    bytes_serialized: AtomicU64,
    simd_batches_processed: AtomicU64,
    buffer_reuse_count: AtomicU64,
    zero_copy_operations: AtomicU64,

    // Latency tracking with exponential moving average.
    // A stored value of 0 is treated as "no sample yet" by `update_latency`.
    avg_latency_nanos: AtomicU64,

    // Error counters
    connection_errors: AtomicU64,
    parsing_errors: AtomicU64,
    serialization_errors: AtomicU64,

    // Per-exchange counters (using RwLock only for structure changes):
    // the write lock is needed only to insert a new exchange or clear the map;
    // the counters inside each entry are atomics updated under the read lock.
    exchange_stats: Arc<RwLock<FxHashMap<SmartString, ExchangeStatsAtomic>>>,

    // Start time for rate calculations. Set once in `new()`; `reset()` does not
    // change it.
    start_time: QuantaInstant,
}
39
/// Per-exchange counters, each stored as an independent atomic so they can be
/// updated through a shared reference.
#[derive(Debug)]
struct ExchangeStatsAtomic {
    messages_received: AtomicU64,
    bytes_received: AtomicU64,
    reconnections: AtomicU64,
    last_message_timestamp: AtomicU64,
    connection_duration_ms: AtomicU64,
}

impl ExchangeStatsAtomic {
    /// Build a record with every counter starting at zero.
    const fn new() -> Self {
        // Single place that defines the initial value of a counter.
        const fn zeroed() -> AtomicU64 {
            AtomicU64::new(0)
        }

        Self {
            messages_received: zeroed(),
            bytes_received: zeroed(),
            reconnections: zeroed(),
            last_message_timestamp: zeroed(),
            connection_duration_ms: zeroed(),
        }
    }
}
61
/// Snapshot of current statistics (non-atomic for reading)
///
/// A plain-value copy of the collector's counters produced by
/// `LockFreeStatsCollector::get_snapshot`. Each field is read separately with
/// relaxed ordering, so the values are not taken at a single instant.
#[derive(Debug, Clone, Default)]
pub struct StatsSnapshot {
    /// Total number of events processed since startup (or the last reset)
    pub events_processed: u64,
    /// Total number of trade messages processed
    pub trades_processed: u64,
    /// Total number of orderbook updates processed
    pub orderbooks_processed: u64,
    /// Total bytes serialized for outgoing messages
    pub bytes_serialized: u64,
    /// Number of SIMD batches processed for vectorized operations
    pub simd_batches_processed: u64,
    /// Number of buffer reuse operations to reduce memory allocations
    pub buffer_reuse_count: u64,
    /// Number of zero-copy operations performed for high-performance data handling
    pub zero_copy_operations: u64,
    /// Current average latency in nanoseconds using exponential moving average
    /// (0 when no sample has been recorded)
    pub avg_latency_nanos: u64,
    /// Total number of connection errors encountered
    pub connection_errors: u64,
    /// Total number of parsing errors from malformed messages
    pub parsing_errors: u64,
    /// Total number of serialization errors during message encoding
    pub serialization_errors: u64,
    /// Average events per second: `events_processed / uptime_seconds`
    /// (an average over the whole uptime, not an instantaneous rate; 0.0 when
    /// the uptime is not positive)
    pub events_per_second: f64,
    /// Average bytes per second: `bytes_serialized / uptime_seconds`
    /// (0.0 when the uptime is not positive)
    pub bytes_per_second: f64,
    /// Compression ratio achieved for data transmission; `get_snapshot` always
    /// sets this to 0.0 and leaves the calculation to the caller
    pub compression_ratio: f64,
    /// Seconds elapsed since the collector was created
    pub uptime_seconds: f64,
    /// Per-exchange statistics breakdown, keyed by exchange name
    pub exchange_stats: FxHashMap<SmartString, ExchangeStats>,
}
98
/// Non-atomic exchange statistics for snapshots
///
/// Plain-value copy of one exchange's atomic counters, plus a derived rate.
#[derive(Debug, Clone, Default)]
pub struct ExchangeStats {
    /// Total number of messages received from this exchange
    pub messages_received: u64,
    /// Total bytes received from this exchange
    pub bytes_received: u64,
    /// Number of reconnections to this exchange
    pub reconnections: u64,
    /// Timestamp of the last message received, in nanoseconds
    /// (intended to be measured from collector start)
    pub last_message_timestamp: u64,
    /// Total connection duration in milliseconds.
    /// NOTE(review): no code visible in this module ever writes this counter,
    /// so snapshots report 0 unless it is updated elsewhere — confirm.
    pub connection_duration_ms: u64,
    /// Average messages per second from this exchange:
    /// `messages_received / collector uptime` (0.0 when uptime is not positive)
    pub messages_per_second: f64,
}
115
116impl LockFreeStatsCollector {
117    /// Create a new lock-free statistics collector
118    pub fn new() -> Self {
119        Self {
120            events_processed: AtomicU64::new(0),
121            trades_processed: AtomicU64::new(0),
122            orderbooks_processed: AtomicU64::new(0),
123            bytes_serialized: AtomicU64::new(0),
124            simd_batches_processed: AtomicU64::new(0),
125            buffer_reuse_count: AtomicU64::new(0),
126            zero_copy_operations: AtomicU64::new(0),
127            avg_latency_nanos: AtomicU64::new(0),
128            connection_errors: AtomicU64::new(0),
129            parsing_errors: AtomicU64::new(0),
130            serialization_errors: AtomicU64::new(0),
131            exchange_stats: Arc::new(RwLock::new(FxHashMap::default())),
132            start_time: QuantaInstant::now(),
133        }
134    }
135
136    /// Record a processed event (lock-free, relaxed ordering for maximum performance)
137    #[inline(always)]
138    pub fn record_event(&self) {
139        self.events_processed.fetch_add(1, Ordering::Relaxed);
140    }
141
142    /// Record processed trades in batch (lock-free)
143    #[inline(always)]
144    pub fn record_trades(&self, count: u64) {
145        self.trades_processed.fetch_add(count, Ordering::Relaxed);
146    }
147
148    /// Record processed orderbooks in batch (lock-free)
149    #[inline(always)]
150    pub fn record_orderbooks(&self, count: u64) {
151        self.orderbooks_processed
152            .fetch_add(count, Ordering::Relaxed);
153    }
154
155    /// Record serialized bytes (lock-free)
156    #[inline(always)]
157    pub fn record_bytes_serialized(&self, bytes: u64) {
158        self.bytes_serialized.fetch_add(bytes, Ordering::Relaxed);
159    }
160
161    /// Record SIMD batch processing (lock-free)
162    #[inline(always)]
163    pub fn record_simd_batch(&self) {
164        self.simd_batches_processed.fetch_add(1, Ordering::Relaxed);
165    }
166
167    /// Record buffer reuse (lock-free)
168    #[inline(always)]
169    pub fn record_buffer_reuse(&self) {
170        self.buffer_reuse_count.fetch_add(1, Ordering::Relaxed);
171    }
172
173    /// Record zero-copy operations (lock-free)
174    #[inline(always)]
175    pub fn record_zero_copy_ops(&self, count: u64) {
176        self.zero_copy_operations
177            .fetch_add(count, Ordering::Relaxed);
178    }
179
180    /// Update average latency using exponential moving average (lock-free)
181    #[inline(always)]
182    pub fn update_latency(&self, latency_nanos: u64) {
183        // Use exponential moving average: new_avg = 0.9 * old_avg + 0.1 * new_value
184        // Implemented with atomic operations to avoid locks
185        let current_avg = self.avg_latency_nanos.load(Ordering::Relaxed);
186        let new_avg = if current_avg == 0 {
187            latency_nanos
188        } else {
189            (current_avg * 9 + latency_nanos) / 10
190        };
191
192        // Use compare_exchange_weak for better performance on contended systems
193        let _ = self.avg_latency_nanos.compare_exchange_weak(
194            current_avg,
195            new_avg,
196            Ordering::Relaxed,
197            Ordering::Relaxed,
198        );
199    }
200
201    /// Record connection error (lock-free)
202    #[inline(always)]
203    pub fn record_connection_error(&self) {
204        self.connection_errors.fetch_add(1, Ordering::Relaxed);
205    }
206
207    /// Record parsing error (lock-free)
208    #[inline(always)]
209    pub fn record_parsing_error(&self) {
210        self.parsing_errors.fetch_add(1, Ordering::Relaxed);
211    }
212
213    /// Record serialization error (lock-free)
214    #[inline(always)]
215    pub fn record_serialization_error(&self) {
216        self.serialization_errors.fetch_add(1, Ordering::Relaxed);
217    }
218
219    /// Record exchange message (mostly lock-free, minimal lock contention)
220    #[inline(always)]
221    pub fn record_exchange_message(&self, exchange: &str, bytes: u64) {
222        let exchange_key: SmartString = exchange.into();
223
224        // Fast path: check if exchange stats already exist
225        {
226            let stats_read = self.exchange_stats.read();
227            if let Some(exchange_stats) = stats_read.get(&exchange_key) {
228                exchange_stats
229                    .messages_received
230                    .fetch_add(1, Ordering::Relaxed);
231                exchange_stats
232                    .bytes_received
233                    .fetch_add(bytes, Ordering::Relaxed);
234                exchange_stats.last_message_timestamp.store(
235                    QuantaInstant::now().elapsed().as_nanos() as u64,
236                    Ordering::Relaxed,
237                );
238                return;
239            }
240        }
241
242        // Slow path: need to create new exchange stats (rare)
243        let mut stats_write = self.exchange_stats.write();
244        let exchange_stats = stats_write
245            .entry(exchange_key)
246            .or_insert_with(ExchangeStatsAtomic::new);
247
248        exchange_stats
249            .messages_received
250            .fetch_add(1, Ordering::Relaxed);
251        exchange_stats
252            .bytes_received
253            .fetch_add(bytes, Ordering::Relaxed);
254        exchange_stats.last_message_timestamp.store(
255            QuantaInstant::now().elapsed().as_nanos() as u64,
256            Ordering::Relaxed,
257        );
258    }
259
260    /// Record exchange reconnection (mostly lock-free)
261    #[inline(always)]
262    pub fn record_exchange_reconnection(&self, exchange: &str) {
263        let exchange_key: SmartString = exchange.into();
264
265        // Fast path: check if exchange stats already exist
266        {
267            let stats_read = self.exchange_stats.read();
268            if let Some(exchange_stats) = stats_read.get(&exchange_key) {
269                exchange_stats.reconnections.fetch_add(1, Ordering::Relaxed);
270                return;
271            }
272        }
273
274        // Slow path: create new exchange stats
275        let mut stats_write = self.exchange_stats.write();
276        let exchange_stats = stats_write
277            .entry(exchange_key)
278            .or_insert_with(ExchangeStatsAtomic::new);
279        exchange_stats.reconnections.fetch_add(1, Ordering::Relaxed);
280    }
281
282    /// Get a snapshot of current statistics (mostly lock-free reads)
283    pub fn get_snapshot(&self) -> StatsSnapshot {
284        let uptime_seconds = self.start_time.elapsed().as_secs_f64();
285
286        // All atomic reads with relaxed ordering for maximum performance
287        let events_processed = self.events_processed.load(Ordering::Relaxed);
288        let bytes_serialized = self.bytes_serialized.load(Ordering::Relaxed);
289
290        // Calculate rates
291        let events_per_second = if uptime_seconds > 0.0 {
292            events_processed as f64 / uptime_seconds
293        } else {
294            0.0
295        };
296
297        let bytes_per_second = if uptime_seconds > 0.0 {
298            bytes_serialized as f64 / uptime_seconds
299        } else {
300            0.0
301        };
302
303        // Read exchange stats
304        let mut exchange_stats = FxHashMap::default();
305        {
306            let stats_read = self.exchange_stats.read();
307            for (exchange, atomic_stats) in stats_read.iter() {
308                let messages_received = atomic_stats.messages_received.load(Ordering::Relaxed);
309                let messages_per_second = if uptime_seconds > 0.0 {
310                    messages_received as f64 / uptime_seconds
311                } else {
312                    0.0
313                };
314
315                exchange_stats.insert(
316                    exchange.clone(),
317                    ExchangeStats {
318                        messages_received,
319                        bytes_received: atomic_stats.bytes_received.load(Ordering::Relaxed),
320                        reconnections: atomic_stats.reconnections.load(Ordering::Relaxed),
321                        last_message_timestamp: atomic_stats
322                            .last_message_timestamp
323                            .load(Ordering::Relaxed),
324                        connection_duration_ms: atomic_stats
325                            .connection_duration_ms
326                            .load(Ordering::Relaxed),
327                        messages_per_second,
328                    },
329                );
330            }
331        }
332
333        StatsSnapshot {
334            events_processed,
335            trades_processed: self.trades_processed.load(Ordering::Relaxed),
336            orderbooks_processed: self.orderbooks_processed.load(Ordering::Relaxed),
337            bytes_serialized,
338            simd_batches_processed: self.simd_batches_processed.load(Ordering::Relaxed),
339            buffer_reuse_count: self.buffer_reuse_count.load(Ordering::Relaxed),
340            zero_copy_operations: self.zero_copy_operations.load(Ordering::Relaxed),
341            avg_latency_nanos: self.avg_latency_nanos.load(Ordering::Relaxed),
342            connection_errors: self.connection_errors.load(Ordering::Relaxed),
343            parsing_errors: self.parsing_errors.load(Ordering::Relaxed),
344            serialization_errors: self.serialization_errors.load(Ordering::Relaxed),
345            events_per_second,
346            bytes_per_second,
347            compression_ratio: 0.0, // Calculated separately if needed
348            uptime_seconds,
349            exchange_stats,
350        }
351    }
352
353    /// Reset all statistics (used for benchmarking)
354    pub fn reset(&self) {
355        self.events_processed.store(0, Ordering::Relaxed);
356        self.trades_processed.store(0, Ordering::Relaxed);
357        self.orderbooks_processed.store(0, Ordering::Relaxed);
358        self.bytes_serialized.store(0, Ordering::Relaxed);
359        self.simd_batches_processed.store(0, Ordering::Relaxed);
360        self.buffer_reuse_count.store(0, Ordering::Relaxed);
361        self.zero_copy_operations.store(0, Ordering::Relaxed);
362        self.avg_latency_nanos.store(0, Ordering::Relaxed);
363        self.connection_errors.store(0, Ordering::Relaxed);
364        self.parsing_errors.store(0, Ordering::Relaxed);
365        self.serialization_errors.store(0, Ordering::Relaxed);
366
367        // Clear exchange stats
368        self.exchange_stats.write().clear();
369    }
370
371    /// Get current events per second rate (lock-free)
372    pub fn get_events_per_second(&self) -> f64 {
373        let uptime = self.start_time.elapsed().as_secs_f64();
374        if uptime > 0.0 {
375            self.events_processed.load(Ordering::Relaxed) as f64 / uptime
376        } else {
377            0.0
378        }
379    }
380
381    /// Get current average latency in nanoseconds (lock-free)
382    #[inline(always)]
383    pub fn get_avg_latency_nanos(&self) -> u64 {
384        self.avg_latency_nanos.load(Ordering::Relaxed)
385    }
386}
387
388impl Default for LockFreeStatsCollector {
389    fn default() -> Self {
390        Self::new()
391    }
392}
393
394// Thread-safe implementation
395unsafe impl Send for LockFreeStatsCollector {}
396unsafe impl Sync for LockFreeStatsCollector {}
397
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;
    use std::time::Duration;

    /// Global counters report exactly what was recorded.
    #[test]
    fn test_lockfree_stats_basic_operations() {
        let collector = LockFreeStatsCollector::new();

        collector.record_event();
        collector.record_trades(5);
        collector.record_orderbooks(3);
        collector.record_bytes_serialized(1024);

        let snap = collector.get_snapshot();
        assert_eq!(snap.events_processed, 1);
        assert_eq!(snap.trades_processed, 5);
        assert_eq!(snap.orderbooks_processed, 3);
        assert_eq!(snap.bytes_serialized, 1024);
    }

    /// The first sample seeds the average; the second is blended at 10% weight.
    #[test]
    fn test_lockfree_stats_latency_tracking() {
        let collector = LockFreeStatsCollector::new();

        collector.update_latency(1000);
        assert_eq!(collector.get_avg_latency_nanos(), 1000);

        // Exponential moving average: (1000 * 9 + 2000) / 10 = 1100
        collector.update_latency(2000);
        assert_eq!(collector.get_avg_latency_nanos(), 1100);
    }

    /// Messages are accumulated separately for each exchange name.
    #[test]
    fn test_lockfree_stats_exchange_tracking() {
        let collector = LockFreeStatsCollector::new();

        for (exchange, bytes) in [("binance", 512), ("binance", 256), ("coinbase", 1024)].iter() {
            collector.record_exchange_message(exchange, *bytes);
        }

        let snap = collector.get_snapshot();
        assert_eq!(snap.exchange_stats.len(), 2);

        let binance = snap.exchange_stats.get("binance").unwrap();
        assert_eq!(binance.messages_received, 2);
        assert_eq!(binance.bytes_received, 768);

        let coinbase = snap.exchange_stats.get("coinbase").unwrap();
        assert_eq!(coinbase.messages_received, 1);
        assert_eq!(coinbase.bytes_received, 1024);
    }

    /// Ten threads recording concurrently must not lose any counter update.
    #[test]
    fn test_lockfree_stats_concurrent_access() {
        let shared = Arc::new(LockFreeStatsCollector::new());

        // Start every worker before joining any of them.
        let workers: Vec<_> = (0..10)
            .map(|worker_id| {
                let collector = Arc::clone(&shared);
                thread::spawn(move || {
                    for _ in 0..1000 {
                        collector.record_event();
                        collector.record_trades(1);
                        collector.record_exchange_message("test", 100);
                        collector.update_latency(worker_id * 100);
                    }
                })
            })
            .collect();

        for worker in workers {
            worker.join().unwrap();
        }

        let snap = shared.get_snapshot();
        assert_eq!(snap.events_processed, 10000);
        assert_eq!(snap.trades_processed, 10000);

        let per_exchange = snap.exchange_stats.get("test").unwrap();
        assert_eq!(per_exchange.messages_received, 10000);
        assert_eq!(per_exchange.bytes_received, 1000000);
    }

    /// `reset` clears the global counters and removes all exchange entries.
    #[test]
    fn test_lockfree_stats_reset() {
        let collector = LockFreeStatsCollector::new();

        collector.record_event();
        collector.record_trades(5);
        collector.record_exchange_message("test", 1024);

        collector.reset();

        let snap = collector.get_snapshot();
        assert_eq!(snap.events_processed, 0);
        assert_eq!(snap.trades_processed, 0);
        assert_eq!(snap.exchange_stats.len(), 0);
    }

    /// Rates and uptime are positive once some time has passed.
    #[test]
    fn test_lockfree_stats_rates_calculation() {
        let collector = LockFreeStatsCollector::new();

        (0..100).for_each(|_| collector.record_event());
        (0..50).for_each(|_| collector.record_bytes_serialized(1024));

        // Let some uptime accumulate so the rates are meaningful.
        thread::sleep(Duration::from_millis(100));

        let snap = collector.get_snapshot();
        assert!(snap.events_per_second > 0.0);
        assert!(snap.bytes_per_second > 0.0);
        assert!(snap.uptime_seconds > 0.0);
    }
}
525}