1use parking_lot::RwLock;
7use quanta::Instant as QuantaInstant;
8use rusty_common::collections::FxHashMap;
9use smartstring::alias::String as SmartString;
10use std::sync::Arc;
11use std::sync::atomic::{AtomicU64, Ordering};
12
/// High-throughput statistics collector.
///
/// The hot-path counters are plain `AtomicU64`s updated with `Relaxed`
/// ordering. Despite the name, the per-exchange map is the one exception and
/// sits behind a `parking_lot::RwLock` (read-mostly: counters inside each
/// entry are atomic, so established exchanges only take the read lock).
#[derive(Debug)]
pub struct LockFreeStatsCollector {
    // Monotonically increasing throughput counters.
    events_processed: AtomicU64,
    trades_processed: AtomicU64,
    orderbooks_processed: AtomicU64,
    bytes_serialized: AtomicU64,
    simd_batches_processed: AtomicU64,
    buffer_reuse_count: AtomicU64,
    zero_copy_operations: AtomicU64,

    // Exponential moving average of latency samples, in nanoseconds.
    avg_latency_nanos: AtomicU64,

    // Error counters.
    connection_errors: AtomicU64,
    parsing_errors: AtomicU64,
    serialization_errors: AtomicU64,

    // Per-exchange counters, keyed by exchange name.
    exchange_stats: Arc<RwLock<FxHashMap<SmartString, ExchangeStatsAtomic>>>,

    // Instant of construction; basis for uptime and per-second rates.
    start_time: QuantaInstant,
}
39
/// Per-exchange counters. All fields are atomic so they can be bumped while
/// holding only a shared read lock on the enclosing map.
#[derive(Debug)]
struct ExchangeStatsAtomic {
    messages_received: AtomicU64,
    bytes_received: AtomicU64,
    reconnections: AtomicU64,
    // NOTE(review): written by the collector as `Instant::now().elapsed()`
    // nanos, which is effectively always ~0 — verify intended semantics.
    last_message_timestamp: AtomicU64,
    // NOTE(review): never written anywhere in this file; snapshots always
    // report 0 for it — confirm whether another module updates it.
    connection_duration_ms: AtomicU64,
}
49
impl ExchangeStatsAtomic {
    /// All counters start at zero. `const` so entries are cheap to construct
    /// inside the map's write path.
    const fn new() -> Self {
        Self {
            messages_received: AtomicU64::new(0),
            bytes_received: AtomicU64::new(0),
            reconnections: AtomicU64::new(0),
            last_message_timestamp: AtomicU64::new(0),
            connection_duration_ms: AtomicU64::new(0),
        }
    }
}
61
/// Point-in-time copy of all collector counters plus derived rates.
///
/// Produced by [`LockFreeStatsCollector::get_snapshot`]; individual counters
/// are read with relaxed ordering, so values are not a single atomic cut.
#[derive(Debug, Clone, Default)]
pub struct StatsSnapshot {
    /// Total events recorded.
    pub events_processed: u64,
    /// Total trades recorded.
    pub trades_processed: u64,
    /// Total orderbook updates recorded.
    pub orderbooks_processed: u64,
    /// Total serialized output, in bytes.
    pub bytes_serialized: u64,
    /// Total SIMD batches recorded.
    pub simd_batches_processed: u64,
    /// Total buffer reuses recorded.
    pub buffer_reuse_count: u64,
    /// Total zero-copy operations recorded.
    pub zero_copy_operations: u64,
    /// Exponential moving average latency, in nanoseconds.
    pub avg_latency_nanos: u64,
    /// Total connection errors.
    pub connection_errors: u64,
    /// Total parsing errors.
    pub parsing_errors: u64,
    /// Total serialization errors.
    pub serialization_errors: u64,
    /// events_processed / uptime (0.0 when uptime is zero).
    pub events_per_second: f64,
    /// bytes_serialized / uptime (0.0 when uptime is zero).
    pub bytes_per_second: f64,
    /// NOTE(review): always 0.0 — compression is not tracked in this file.
    pub compression_ratio: f64,
    /// Seconds since the collector was constructed.
    pub uptime_seconds: f64,
    /// Per-exchange snapshots keyed by exchange name.
    pub exchange_stats: FxHashMap<SmartString, ExchangeStats>,
}
98
/// Snapshot of one exchange's counters (plain values, safe to clone/send).
#[derive(Debug, Clone, Default)]
pub struct ExchangeStats {
    /// Messages received from this exchange.
    pub messages_received: u64,
    /// Bytes received from this exchange.
    pub bytes_received: u64,
    /// Reconnections recorded for this exchange.
    pub reconnections: u64,
    /// Raw value of the atomic timestamp counter at snapshot time.
    pub last_message_timestamp: u64,
    /// NOTE(review): sourced from a counter that is never written in this
    /// file; always 0 unless updated elsewhere.
    pub connection_duration_ms: u64,
    /// messages_received / collector uptime (0.0 when uptime is zero).
    pub messages_per_second: f64,
}
115
116impl LockFreeStatsCollector {
117 pub fn new() -> Self {
119 Self {
120 events_processed: AtomicU64::new(0),
121 trades_processed: AtomicU64::new(0),
122 orderbooks_processed: AtomicU64::new(0),
123 bytes_serialized: AtomicU64::new(0),
124 simd_batches_processed: AtomicU64::new(0),
125 buffer_reuse_count: AtomicU64::new(0),
126 zero_copy_operations: AtomicU64::new(0),
127 avg_latency_nanos: AtomicU64::new(0),
128 connection_errors: AtomicU64::new(0),
129 parsing_errors: AtomicU64::new(0),
130 serialization_errors: AtomicU64::new(0),
131 exchange_stats: Arc::new(RwLock::new(FxHashMap::default())),
132 start_time: QuantaInstant::now(),
133 }
134 }
135
136 #[inline(always)]
138 pub fn record_event(&self) {
139 self.events_processed.fetch_add(1, Ordering::Relaxed);
140 }
141
142 #[inline(always)]
144 pub fn record_trades(&self, count: u64) {
145 self.trades_processed.fetch_add(count, Ordering::Relaxed);
146 }
147
148 #[inline(always)]
150 pub fn record_orderbooks(&self, count: u64) {
151 self.orderbooks_processed
152 .fetch_add(count, Ordering::Relaxed);
153 }
154
155 #[inline(always)]
157 pub fn record_bytes_serialized(&self, bytes: u64) {
158 self.bytes_serialized.fetch_add(bytes, Ordering::Relaxed);
159 }
160
161 #[inline(always)]
163 pub fn record_simd_batch(&self) {
164 self.simd_batches_processed.fetch_add(1, Ordering::Relaxed);
165 }
166
167 #[inline(always)]
169 pub fn record_buffer_reuse(&self) {
170 self.buffer_reuse_count.fetch_add(1, Ordering::Relaxed);
171 }
172
173 #[inline(always)]
175 pub fn record_zero_copy_ops(&self, count: u64) {
176 self.zero_copy_operations
177 .fetch_add(count, Ordering::Relaxed);
178 }
179
180 #[inline(always)]
182 pub fn update_latency(&self, latency_nanos: u64) {
183 let current_avg = self.avg_latency_nanos.load(Ordering::Relaxed);
186 let new_avg = if current_avg == 0 {
187 latency_nanos
188 } else {
189 (current_avg * 9 + latency_nanos) / 10
190 };
191
192 let _ = self.avg_latency_nanos.compare_exchange_weak(
194 current_avg,
195 new_avg,
196 Ordering::Relaxed,
197 Ordering::Relaxed,
198 );
199 }
200
201 #[inline(always)]
203 pub fn record_connection_error(&self) {
204 self.connection_errors.fetch_add(1, Ordering::Relaxed);
205 }
206
207 #[inline(always)]
209 pub fn record_parsing_error(&self) {
210 self.parsing_errors.fetch_add(1, Ordering::Relaxed);
211 }
212
213 #[inline(always)]
215 pub fn record_serialization_error(&self) {
216 self.serialization_errors.fetch_add(1, Ordering::Relaxed);
217 }
218
219 #[inline(always)]
221 pub fn record_exchange_message(&self, exchange: &str, bytes: u64) {
222 let exchange_key: SmartString = exchange.into();
223
224 {
226 let stats_read = self.exchange_stats.read();
227 if let Some(exchange_stats) = stats_read.get(&exchange_key) {
228 exchange_stats
229 .messages_received
230 .fetch_add(1, Ordering::Relaxed);
231 exchange_stats
232 .bytes_received
233 .fetch_add(bytes, Ordering::Relaxed);
234 exchange_stats.last_message_timestamp.store(
235 QuantaInstant::now().elapsed().as_nanos() as u64,
236 Ordering::Relaxed,
237 );
238 return;
239 }
240 }
241
242 let mut stats_write = self.exchange_stats.write();
244 let exchange_stats = stats_write
245 .entry(exchange_key)
246 .or_insert_with(ExchangeStatsAtomic::new);
247
248 exchange_stats
249 .messages_received
250 .fetch_add(1, Ordering::Relaxed);
251 exchange_stats
252 .bytes_received
253 .fetch_add(bytes, Ordering::Relaxed);
254 exchange_stats.last_message_timestamp.store(
255 QuantaInstant::now().elapsed().as_nanos() as u64,
256 Ordering::Relaxed,
257 );
258 }
259
260 #[inline(always)]
262 pub fn record_exchange_reconnection(&self, exchange: &str) {
263 let exchange_key: SmartString = exchange.into();
264
265 {
267 let stats_read = self.exchange_stats.read();
268 if let Some(exchange_stats) = stats_read.get(&exchange_key) {
269 exchange_stats.reconnections.fetch_add(1, Ordering::Relaxed);
270 return;
271 }
272 }
273
274 let mut stats_write = self.exchange_stats.write();
276 let exchange_stats = stats_write
277 .entry(exchange_key)
278 .or_insert_with(ExchangeStatsAtomic::new);
279 exchange_stats.reconnections.fetch_add(1, Ordering::Relaxed);
280 }
281
282 pub fn get_snapshot(&self) -> StatsSnapshot {
284 let uptime_seconds = self.start_time.elapsed().as_secs_f64();
285
286 let events_processed = self.events_processed.load(Ordering::Relaxed);
288 let bytes_serialized = self.bytes_serialized.load(Ordering::Relaxed);
289
290 let events_per_second = if uptime_seconds > 0.0 {
292 events_processed as f64 / uptime_seconds
293 } else {
294 0.0
295 };
296
297 let bytes_per_second = if uptime_seconds > 0.0 {
298 bytes_serialized as f64 / uptime_seconds
299 } else {
300 0.0
301 };
302
303 let mut exchange_stats = FxHashMap::default();
305 {
306 let stats_read = self.exchange_stats.read();
307 for (exchange, atomic_stats) in stats_read.iter() {
308 let messages_received = atomic_stats.messages_received.load(Ordering::Relaxed);
309 let messages_per_second = if uptime_seconds > 0.0 {
310 messages_received as f64 / uptime_seconds
311 } else {
312 0.0
313 };
314
315 exchange_stats.insert(
316 exchange.clone(),
317 ExchangeStats {
318 messages_received,
319 bytes_received: atomic_stats.bytes_received.load(Ordering::Relaxed),
320 reconnections: atomic_stats.reconnections.load(Ordering::Relaxed),
321 last_message_timestamp: atomic_stats
322 .last_message_timestamp
323 .load(Ordering::Relaxed),
324 connection_duration_ms: atomic_stats
325 .connection_duration_ms
326 .load(Ordering::Relaxed),
327 messages_per_second,
328 },
329 );
330 }
331 }
332
333 StatsSnapshot {
334 events_processed,
335 trades_processed: self.trades_processed.load(Ordering::Relaxed),
336 orderbooks_processed: self.orderbooks_processed.load(Ordering::Relaxed),
337 bytes_serialized,
338 simd_batches_processed: self.simd_batches_processed.load(Ordering::Relaxed),
339 buffer_reuse_count: self.buffer_reuse_count.load(Ordering::Relaxed),
340 zero_copy_operations: self.zero_copy_operations.load(Ordering::Relaxed),
341 avg_latency_nanos: self.avg_latency_nanos.load(Ordering::Relaxed),
342 connection_errors: self.connection_errors.load(Ordering::Relaxed),
343 parsing_errors: self.parsing_errors.load(Ordering::Relaxed),
344 serialization_errors: self.serialization_errors.load(Ordering::Relaxed),
345 events_per_second,
346 bytes_per_second,
347 compression_ratio: 0.0, uptime_seconds,
349 exchange_stats,
350 }
351 }
352
353 pub fn reset(&self) {
355 self.events_processed.store(0, Ordering::Relaxed);
356 self.trades_processed.store(0, Ordering::Relaxed);
357 self.orderbooks_processed.store(0, Ordering::Relaxed);
358 self.bytes_serialized.store(0, Ordering::Relaxed);
359 self.simd_batches_processed.store(0, Ordering::Relaxed);
360 self.buffer_reuse_count.store(0, Ordering::Relaxed);
361 self.zero_copy_operations.store(0, Ordering::Relaxed);
362 self.avg_latency_nanos.store(0, Ordering::Relaxed);
363 self.connection_errors.store(0, Ordering::Relaxed);
364 self.parsing_errors.store(0, Ordering::Relaxed);
365 self.serialization_errors.store(0, Ordering::Relaxed);
366
367 self.exchange_stats.write().clear();
369 }
370
371 pub fn get_events_per_second(&self) -> f64 {
373 let uptime = self.start_time.elapsed().as_secs_f64();
374 if uptime > 0.0 {
375 self.events_processed.load(Ordering::Relaxed) as f64 / uptime
376 } else {
377 0.0
378 }
379 }
380
381 #[inline(always)]
383 pub fn get_avg_latency_nanos(&self) -> u64 {
384 self.avg_latency_nanos.load(Ordering::Relaxed)
385 }
386}
387
impl Default for LockFreeStatsCollector {
    /// Equivalent to [`LockFreeStatsCollector::new`].
    fn default() -> Self {
        Self::new()
    }
}
393
// `LockFreeStatsCollector` is automatically `Send + Sync`: every field is
// thread-safe (`AtomicU64`s, an `Arc<RwLock<…>>` over `Send + Sync` contents,
// and a `quanta::Instant`), so the auto traits apply. The previous
// `unsafe impl Send`/`unsafe impl Sync` declarations were redundant — and
// risky, because they would silently override the compiler's auto-trait
// analysis if a non-thread-safe field were ever added to the struct.
397
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;
    use std::time::Duration;

    /// Plain counters land in the snapshot with the recorded values.
    #[test]
    fn test_lockfree_stats_basic_operations() {
        let stats = LockFreeStatsCollector::new();

        stats.record_event();
        stats.record_trades(5);
        stats.record_orderbooks(3);
        stats.record_bytes_serialized(1024);

        let snapshot = stats.get_snapshot();
        assert_eq!(snapshot.events_processed, 1);
        assert_eq!(snapshot.trades_processed, 5);
        assert_eq!(snapshot.orderbooks_processed, 3);
        assert_eq!(snapshot.bytes_serialized, 1024);
    }

    /// EMA latency: the first sample seeds the average, subsequent samples
    /// blend with weight 1/10 — (1000 * 9 + 2000) / 10 == 1100.
    #[test]
    fn test_lockfree_stats_latency_tracking() {
        let stats = LockFreeStatsCollector::new();

        stats.update_latency(1000);
        assert_eq!(stats.get_avg_latency_nanos(), 1000);

        stats.update_latency(2000);
        assert_eq!(stats.get_avg_latency_nanos(), 1100);
    }

    /// Per-exchange counters aggregate independently per exchange key.
    #[test]
    fn test_lockfree_stats_exchange_tracking() {
        let stats = LockFreeStatsCollector::new();

        stats.record_exchange_message("binance", 512);
        stats.record_exchange_message("binance", 256);
        stats.record_exchange_message("coinbase", 1024);

        let snapshot = stats.get_snapshot();
        assert_eq!(snapshot.exchange_stats.len(), 2);

        let binance_stats = snapshot.exchange_stats.get("binance").unwrap();
        assert_eq!(binance_stats.messages_received, 2);
        assert_eq!(binance_stats.bytes_received, 768);

        let coinbase_stats = snapshot.exchange_stats.get("coinbase").unwrap();
        assert_eq!(coinbase_stats.messages_received, 1);
        assert_eq!(coinbase_stats.bytes_received, 1024);
    }

    /// 10 threads x 1000 iterations: no increments may be lost on the
    /// fetch_add counters or the per-exchange stats.
    #[test]
    fn test_lockfree_stats_concurrent_access() {
        let stats = Arc::new(LockFreeStatsCollector::new());
        let mut handles = Vec::new();

        for i in 0..10 {
            let stats_clone = Arc::clone(&stats);
            let handle = thread::spawn(move || {
                for _ in 0..1000 {
                    stats_clone.record_event();
                    stats_clone.record_trades(1);
                    stats_clone.record_exchange_message("test", 100);
                    // No assertion on the average: latency samples race.
                    stats_clone.update_latency(i * 100);
                }
            });
            handles.push(handle);
        }

        for handle in handles {
            handle.join().unwrap();
        }

        let snapshot = stats.get_snapshot();
        assert_eq!(snapshot.events_processed, 10000);
        assert_eq!(snapshot.trades_processed, 10000);

        let test_stats = snapshot.exchange_stats.get("test").unwrap();
        assert_eq!(test_stats.messages_received, 10000);
        assert_eq!(test_stats.bytes_received, 1000000);
    }

    /// reset() zeroes all counters and empties the exchange map.
    #[test]
    fn test_lockfree_stats_reset() {
        let stats = LockFreeStatsCollector::new();

        stats.record_event();
        stats.record_trades(5);
        stats.record_exchange_message("test", 1024);

        stats.reset();
        let snapshot = stats.get_snapshot();
        assert_eq!(snapshot.events_processed, 0);
        assert_eq!(snapshot.trades_processed, 0);
        assert_eq!(snapshot.exchange_stats.len(), 0);
    }

    /// Rates are positive once some activity and non-zero uptime exist; the
    /// sleep guarantees a measurable uptime before the snapshot is taken.
    #[test]
    fn test_lockfree_stats_rates_calculation() {
        let stats = LockFreeStatsCollector::new();

        for _ in 0..100 {
            stats.record_event();
        }
        for _ in 0..50 {
            stats.record_bytes_serialized(1024);
        }

        thread::sleep(Duration::from_millis(100));

        let snapshot = stats.get_snapshot();
        assert!(snapshot.events_per_second > 0.0);
        assert!(snapshot.bytes_per_second > 0.0);
        assert!(snapshot.uptime_seconds > 0.0);
    }
}