// rusty_bin/monitor/lockfree_exchange_manager.rs

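//! Mostly lock-free exchange manager for high-frequency trading monitoring.
//!
//! This module provides an exchange connection manager that keeps contention low on hot
//! paths: counters and connection flags are plain atomics, and per-message lookups take only
//! a shared read lock on the exchange map. Structural changes (adding or removing exchanges)
//! take the write lock.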
5
6use crate::monitor::collector::{
7    CollectionTask, DataType, MarketDataEvent, Result, exchange_client::ExchangeClient,
8};
9use crossbeam_utils::CachePadded;
10use flume::Sender;
11use parking_lot::RwLock;
12use quanta::Clock;
13use rusty_common::collections::FxHashMap;
14use smartstring::alias::String as SmartString;
15use std::sync::Arc;
16use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
17use std::time::Instant;
18
/// Per-exchange connection bookkeeping kept entirely in atomics, so it can be read and
/// updated concurrently without taking a lock.
///
/// All timestamp fields hold raw `quanta::Clock::raw()` readings and are 0 until written.
#[derive(Debug)]
struct AtomicExchangeState {
    /// `true` while the exchange is considered connected.
    connected: AtomicBool,
    /// Raw clock reading taken when the most recent message was recorded.
    last_message_timestamp: AtomicU64,
    /// Running count of reconnection attempts since the last reset.
    reconnection_attempts: AtomicUsize,
    /// Raw clock reading taken when the exchange was last marked connected.
    connection_start_time: AtomicU64,
    /// Number of messages recorded for this exchange.
    messages_received: AtomicU64,
    /// Sum of the byte sizes passed for every recorded message.
    bytes_received: AtomicU64,
    /// Raw clock reading taken at the most recent recorded error.
    last_error_timestamp: AtomicU64,
    /// Set while a health check owns this exchange; prevents overlapping checks.
    health_check_in_progress: AtomicBool,
}
39
40impl AtomicExchangeState {
41    const fn new() -> Self {
42        Self {
43            connected: AtomicBool::new(false),
44            last_message_timestamp: AtomicU64::new(0),
45            reconnection_attempts: AtomicUsize::new(0),
46            connection_start_time: AtomicU64::new(0),
47            messages_received: AtomicU64::new(0),
48            bytes_received: AtomicU64::new(0),
49            last_error_timestamp: AtomicU64::new(0),
50            health_check_in_progress: AtomicBool::new(false),
51        }
52    }
53
54    /// Mark exchange as connected (lock-free)
55    #[inline(always)]
56    fn set_connected(&self, connected: bool) {
57        self.connected.store(connected, Ordering::Release);
58        if connected {
59            self.connection_start_time
60                .store(Clock::new().raw(), Ordering::Relaxed);
61        }
62    }
63
64    /// Check if exchange is connected (lock-free)
65    #[inline(always)]
66    fn is_connected(&self) -> bool {
67        self.connected.load(Ordering::Acquire)
68    }
69
70    /// Record a received message (lock-free)
71    #[inline(always)]
72    fn record_message(&self, bytes: u64) {
73        self.messages_received.fetch_add(1, Ordering::Relaxed);
74        self.bytes_received.fetch_add(bytes, Ordering::Relaxed);
75        self.last_message_timestamp
76            .store(Clock::new().raw(), Ordering::Relaxed);
77    }
78
79    /// Record an error (lock-free)
80    #[inline(always)]
81    fn record_error(&self) {
82        self.last_error_timestamp
83            .store(Clock::new().raw(), Ordering::Relaxed);
84        self.set_connected(false);
85    }
86
87    /// Increment reconnection attempts (lock-free)
88    #[inline(always)]
89    fn increment_reconnection_attempts(&self) -> usize {
90        self.reconnection_attempts.fetch_add(1, Ordering::Relaxed)
91    }
92
93    /// Reset reconnection attempts (lock-free)
94    #[inline(always)]
95    fn reset_reconnection_attempts(&self) {
96        self.reconnection_attempts.store(0, Ordering::Relaxed);
97    }
98
99    /// Start health check if not already in progress (lock-free)
100    #[inline(always)]
101    fn try_start_health_check(&self) -> bool {
102        self.health_check_in_progress
103            .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
104            .is_ok()
105    }
106
107    /// Finish health check (lock-free)
108    #[inline(always)]
109    fn finish_health_check(&self) {
110        self.health_check_in_progress
111            .store(false, Ordering::Release);
112    }
113}
114
115/// Exchange connection metadata with atomic fields
116struct ExchangeMetadata {
117    /// Exchange client (wrapped in RwLock for rare mutations)
118    client: RwLock<Option<ExchangeClient>>,
119    /// Atomic state for frequent reads
120    state: AtomicExchangeState,
121    /// Active collection tasks
122    active_tasks: RwLock<FxHashMap<String, CollectionTask>>,
123    /// Last health check timestamp
124    last_health_check: AtomicU64,
125}
126
127impl ExchangeMetadata {
128    fn new() -> Self {
129        Self {
130            client: RwLock::new(None),
131            state: AtomicExchangeState::new(),
132            active_tasks: RwLock::new(FxHashMap::default()),
133            last_health_check: AtomicU64::new(0),
134        }
135    }
136}
137
138/// Lock-free exchange manager optimized for high-frequency operations
139pub struct LockFreeExchangeManager {
140    /// Exchange metadata by name (uses RwLock only for structural changes)
141    exchanges: RwLock<FxHashMap<SmartString, Arc<ExchangeMetadata>>>,
142
143    /// Global atomic counters for fast statistics
144    total_exchanges: CachePadded<AtomicUsize>,
145    total_active_tasks: CachePadded<AtomicUsize>,
146    total_messages_received: CachePadded<AtomicU64>,
147    total_bytes_received: CachePadded<AtomicU64>,
148    total_errors: CachePadded<AtomicU64>,
149
150    /// Manager state
151    manager_start_time: Instant,
152    shutdown_requested: AtomicBool,
153}
154
155impl LockFreeExchangeManager {
156    /// Create a new lock-free exchange manager
157    pub fn new() -> Self {
158        Self {
159            exchanges: RwLock::new(FxHashMap::default()),
160            total_exchanges: CachePadded::new(AtomicUsize::new(0)),
161            total_active_tasks: CachePadded::new(AtomicUsize::new(0)),
162            total_messages_received: CachePadded::new(AtomicU64::new(0)),
163            total_bytes_received: CachePadded::new(AtomicU64::new(0)),
164            total_errors: CachePadded::new(AtomicU64::new(0)),
165            manager_start_time: Instant::now(),
166            shutdown_requested: AtomicBool::new(false),
167        }
168    }
169
170    /// Add an exchange to the manager (infrequent operation, uses lock)
171    pub async fn add_exchange(
172        &self,
173        exchange_name: SmartString,
174        mut client: ExchangeClient,
175    ) -> Result<()> {
176        // Connect the client first
177        match client.connect().await {
178            Ok(()) => {
179                log::info!("Successfully connected to exchange: {exchange_name}");
180
181                // Create metadata
182                let metadata = Arc::new(ExchangeMetadata::new());
183                metadata.state.set_connected(true);
184                *metadata.client.write() = Some(client);
185
186                // Add to exchanges map
187                {
188                    let mut exchanges = self.exchanges.write();
189                    exchanges.insert(exchange_name.clone(), metadata);
190                    self.total_exchanges.fetch_add(1, Ordering::Relaxed);
191                }
192
193                Ok(())
194            }
195            Err(e) => {
196                log::error!("Failed to connect to exchange {exchange_name}: {e}");
197
198                // Still add to map but mark as disconnected for retry
199                let metadata = Arc::new(ExchangeMetadata::new());
200                metadata.state.record_error();
201                *metadata.client.write() = Some(client);
202
203                {
204                    let mut exchanges = self.exchanges.write();
205                    exchanges.insert(exchange_name, metadata);
206                    self.total_exchanges.fetch_add(1, Ordering::Relaxed);
207                }
208
209                self.total_errors.fetch_add(1, Ordering::Relaxed);
210                Err(e)
211            }
212        }
213    }
214
215    /// Remove an exchange from the manager (infrequent operation)
216    pub async fn remove_exchange(&self, exchange_name: &str) -> Result<()> {
217        let exchange_key: SmartString = exchange_name.into();
218
219        let metadata = {
220            let exchanges = self.exchanges.read();
221            exchanges.get(&exchange_key).cloned()
222        };
223
224        if let Some(metadata) = metadata {
225            // Disconnect client if present
226            let client_opt = {
227                let mut client_lock = metadata.client.write();
228                client_lock.take()
229            }; // Lock is dropped here
230
231            if let Some(mut client) = client_opt
232                && let Err(e) = client.disconnect().await
233            {
234                log::error!("Error disconnecting exchange {exchange_name}: {e}");
235            }
236
237            // Mark as disconnected
238            metadata.state.set_connected(false);
239
240            // Remove from map
241            {
242                let mut exchanges = self.exchanges.write();
243                exchanges.remove(&exchange_key);
244                self.total_exchanges.fetch_sub(1, Ordering::Relaxed);
245            }
246
247            log::info!("Successfully removed exchange: {exchange_name}");
248        }
249
250        Ok(())
251    }
252
253    /// Start collection for an exchange (mostly lock-free)
254    pub async fn start_collection(
255        &self,
256        exchange_name: &str,
257        symbols: Vec<SmartString>,
258        data_types: Vec<DataType>,
259        event_sender: Sender<MarketDataEvent>,
260    ) -> Result<()> {
261        let exchange_key: SmartString = exchange_name.into();
262
263        let metadata = {
264            let exchanges = self.exchanges.read();
265            exchanges.get(&exchange_key).cloned()
266        };
267
268        let metadata = metadata.ok_or_else(|| {
269            Box::new(std::io::Error::new(
270                std::io::ErrorKind::NotFound,
271                format!("Exchange {exchange_name} not found"),
272            )) as Box<dyn std::error::Error + Send + Sync>
273        })?;
274
275        // Check if connected (lock-free)
276        if !metadata.state.is_connected() {
277            return Err(Box::new(std::io::Error::new(
278                std::io::ErrorKind::NotConnected,
279                format!("Exchange {exchange_name} is not connected"),
280            )) as Box<dyn std::error::Error + Send + Sync>)?;
281        }
282
283        // Start collection on client with panic safety
284        {
285            let has_client = metadata.client.read().is_some();
286            if !has_client {
287                return Err(Box::new(std::io::Error::new(
288                    std::io::ErrorKind::NotFound,
289                    format!("No client for exchange {exchange_name}"),
290                ))
291                    as Box<dyn std::error::Error + Send + Sync>)?;
292            }
293        }
294
295        // PANIC SAFETY: Use a guard to ensure client is always put back
296        struct ClientGuard<'a> {
297            client: Option<ExchangeClient>,
298            storage: &'a RwLock<Option<ExchangeClient>>,
299        }
300
301        impl<'a> Drop for ClientGuard<'a> {
302            fn drop(&mut self) {
303                if let Some(client) = self.client.take() {
304                    *self.storage.write() = Some(client);
305                }
306            }
307        }
308
309        // Take client out with guard protection
310        let mut guard = ClientGuard {
311            client: metadata.client.write().take(),
312            storage: &metadata.client,
313        };
314
315        if let Some(client) = &mut guard.client {
316            client
317                .start_collection(symbols.clone(), data_types.clone(), event_sender)
318                .await?;
319        }
320
321        // Add tasks to tracking
322        {
323            let mut active_tasks = metadata.active_tasks.write();
324            let mut task_count = 0;
325
326            for symbol in &symbols {
327                for data_type in &data_types {
328                    let task_id = format!("{exchange_name}:{symbol}:{data_type:?}");
329                    active_tasks.insert(
330                        task_id,
331                        CollectionTask {
332                            exchange: exchange_key.clone(),
333                            symbol: symbol.clone(),
334                            data_types: vec![*data_type],
335                            enabled: true,
336                            retry_count: 0,
337                            max_retries: 3,
338                            backoff_ms: 1000,
339                        },
340                    );
341                    task_count += 1;
342                }
343            }
344
345            self.total_active_tasks
346                .fetch_add(task_count, Ordering::Relaxed);
347        }
348
349        log::info!(
350            "Started collection for exchange {} with {} symbols and {} data types",
351            exchange_name,
352            symbols.len(),
353            data_types.len()
354        );
355
356        Ok(())
357    }
358
359    /// Record a message for an exchange (lock-free)
360    #[inline(always)]
361    pub fn record_message(&self, exchange_name: &str, bytes: u64) {
362        let exchange_key: SmartString = exchange_name.into();
363
364        // Always update global counters first
365        self.total_messages_received.fetch_add(1, Ordering::Relaxed);
366        self.total_bytes_received
367            .fetch_add(bytes, Ordering::Relaxed);
368
369        // Fast path: get metadata without lock
370        let metadata = {
371            let exchanges = self.exchanges.read();
372            exchanges.get(&exchange_key).cloned()
373        };
374
375        if let Some(metadata) = metadata {
376            metadata.state.record_message(bytes);
377        }
378    }
379
380    /// Record an error for an exchange (lock-free)
381    #[inline(always)]
382    pub fn record_error(&self, exchange_name: &str) {
383        let exchange_key: SmartString = exchange_name.into();
384
385        // Always update global counter first
386        self.total_errors.fetch_add(1, Ordering::Relaxed);
387
388        let metadata = {
389            let exchanges = self.exchanges.read();
390            exchanges.get(&exchange_key).cloned()
391        };
392
393        if let Some(metadata) = metadata {
394            metadata.state.record_error();
395        }
396    }
397
398    /// Get connection status for all exchanges (mostly lock-free reads)
399    pub fn get_connection_status(&self) -> FxHashMap<SmartString, bool> {
400        let mut status_map = FxHashMap::default();
401
402        let exchanges = self.exchanges.read();
403        for (exchange_name, metadata) in exchanges.iter() {
404            status_map.insert(exchange_name.clone(), metadata.state.is_connected());
405        }
406
407        status_map
408    }
409
410    /// Perform health check on all exchanges (lock-free coordination)
411    pub async fn health_check(&self) -> FxHashMap<SmartString, bool> {
412        let mut health_status = FxHashMap::default();
413
414        let exchanges = {
415            let exchanges = self.exchanges.read();
416            exchanges.clone()
417        };
418
419        for (exchange_name, metadata) in exchanges {
420            // Try to start health check (lock-free)
421            if metadata.state.try_start_health_check() {
422                let is_healthy = {
423                    // For now, just use the atomic connection state to avoid
424                    // complex async handling in this health check
425                    // A more sophisticated health check can be implemented later
426                    metadata.state.is_connected()
427                };
428
429                // Update connection state if health check failed
430                if !is_healthy {
431                    metadata.state.record_error();
432                }
433
434                metadata
435                    .last_health_check
436                    .store(Clock::new().raw(), Ordering::Relaxed);
437                metadata.state.finish_health_check();
438
439                health_status.insert(exchange_name, is_healthy);
440            } else {
441                // Health check already in progress, use cached connection status
442                health_status.insert(exchange_name, metadata.state.is_connected());
443            }
444        }
445
446        health_status
447    }
448
449    /// Get comprehensive statistics (mostly lock-free reads)
450    pub fn get_statistics(&self) -> ExchangeManagerStats {
451        let uptime_seconds = self.manager_start_time.elapsed().as_secs_f64();
452
453        // Global atomic reads
454        let total_messages = self.total_messages_received.load(Ordering::Relaxed);
455        let total_bytes = self.total_bytes_received.load(Ordering::Relaxed);
456
457        // Calculate rates
458        let messages_per_second = if uptime_seconds > 0.0 {
459            total_messages as f64 / uptime_seconds
460        } else {
461            0.0
462        };
463
464        let bytes_per_second = if uptime_seconds > 0.0 {
465            total_bytes as f64 / uptime_seconds
466        } else {
467            0.0
468        };
469
470        // Per-exchange statistics
471        let mut exchange_stats = FxHashMap::default();
472        {
473            let exchanges = self.exchanges.read();
474            for (exchange_name, metadata) in exchanges.iter() {
475                let exchange_messages = metadata.state.messages_received.load(Ordering::Relaxed);
476                let exchange_bytes = metadata.state.bytes_received.load(Ordering::Relaxed);
477                let exchange_rate = if uptime_seconds > 0.0 {
478                    exchange_messages as f64 / uptime_seconds
479                } else {
480                    0.0
481                };
482
483                exchange_stats.insert(
484                    exchange_name.clone(),
485                    ExchangeStats {
486                        connected: metadata.state.is_connected(),
487                        messages_received: exchange_messages,
488                        bytes_received: exchange_bytes,
489                        messages_per_second: exchange_rate,
490                        reconnection_attempts: metadata
491                            .state
492                            .reconnection_attempts
493                            .load(Ordering::Relaxed),
494                        last_message_timestamp: metadata
495                            .state
496                            .last_message_timestamp
497                            .load(Ordering::Relaxed),
498                        connection_start_time: metadata
499                            .state
500                            .connection_start_time
501                            .load(Ordering::Relaxed),
502                    },
503                );
504            }
505        }
506
507        ExchangeManagerStats {
508            total_exchanges: self.total_exchanges.load(Ordering::Relaxed),
509            total_active_tasks: self.total_active_tasks.load(Ordering::Relaxed),
510            total_messages_received: total_messages,
511            total_bytes_received: total_bytes,
512            total_errors: self.total_errors.load(Ordering::Relaxed),
513            messages_per_second,
514            bytes_per_second,
515            uptime_seconds,
516            exchange_stats,
517        }
518    }
519
520    /// Request shutdown (lock-free)
521    pub fn request_shutdown(&self) {
522        self.shutdown_requested.store(true, Ordering::Release);
523    }
524
525    /// Check if shutdown was requested (lock-free)
526    pub fn is_shutdown_requested(&self) -> bool {
527        self.shutdown_requested.load(Ordering::Acquire)
528    }
529
530    /// Shutdown all exchanges
531    pub async fn shutdown(&self) -> Result<()> {
532        self.request_shutdown();
533
534        let exchanges = {
535            let mut exchanges = self.exchanges.write();
536            std::mem::take(&mut *exchanges)
537        };
538
539        for (exchange_name, metadata) in exchanges {
540            // Take client out of lock to avoid holding across await
541            let client_opt = {
542                let mut client_lock = metadata.client.write();
543                client_lock.take()
544            };
545
546            if let Some(mut client) = client_opt
547                && let Err(e) = client.disconnect().await
548            {
549                log::error!("Error disconnecting exchange {exchange_name} during shutdown: {e}");
550            }
551            metadata.state.set_connected(false);
552        }
553
554        // Reset counters
555        self.total_exchanges.store(0, Ordering::Relaxed);
556        self.total_active_tasks.store(0, Ordering::Relaxed);
557
558        log::info!("Exchange manager shutdown completed");
559        Ok(())
560    }
561}
562
563/// Statistics for the exchange manager
564///
565/// Provides comprehensive performance metrics and operational statistics
566/// for the lock-free exchange manager, including global counters and
567/// per-exchange breakdown.
568#[derive(Debug, Clone)]
569pub struct ExchangeManagerStats {
570    /// Total number of exchanges currently registered
571    pub total_exchanges: usize,
572    /// Total number of active collection tasks across all exchanges
573    pub total_active_tasks: usize,
574    /// Total messages received from all exchanges
575    pub total_messages_received: u64,
576    /// Total bytes received from all exchanges
577    pub total_bytes_received: u64,
578    /// Total errors encountered across all exchanges
579    pub total_errors: u64,
580    /// Average messages per second across all exchanges
581    pub messages_per_second: f64,
582    /// Average bytes per second across all exchanges
583    pub bytes_per_second: f64,
584    /// Manager uptime in seconds since initialization
585    pub uptime_seconds: f64,
586    /// Per-exchange statistics breakdown
587    pub exchange_stats: FxHashMap<SmartString, ExchangeStats>,
588}
589
/// Statistics for a single registered exchange.
///
/// Reports connection status, message/byte throughput and reconnection attempts. Per-exchange
/// error counts are not tracked here; see `ExchangeManagerStats::total_errors`.
///
/// Timestamp fields are raw `quanta::Clock::raw()` readings. Quanta does not guarantee these
/// are nanoseconds, so compare them only with other raw readings from the same process.
#[derive(Debug, Clone)]
pub struct ExchangeStats {
    /// Whether the exchange is currently marked connected.
    pub connected: bool,
    /// Total messages recorded for this exchange.
    pub messages_received: u64,
    /// Total bytes recorded for this exchange.
    pub bytes_received: u64,
    /// `messages_received` divided by the *manager's* uptime in seconds, not by this
    /// exchange's connection time.
    pub messages_per_second: f64,
    /// Current value of the reconnection-attempt counter.
    pub reconnection_attempts: usize,
    /// Raw clock reading taken when the last message was recorded (0 if none yet).
    pub last_message_timestamp: u64,
    /// Raw clock reading taken when the exchange was last marked connected (0 if never).
    pub connection_start_time: u64,
}
612
613impl Default for LockFreeExchangeManager {
614    fn default() -> Self {
615        Self::new()
616    }
617}
618
619// Thread safety
620unsafe impl Send for LockFreeExchangeManager {}
621unsafe impl Sync for LockFreeExchangeManager {}
622
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;
    use std::time::Duration;

    #[test]
    fn test_lockfree_exchange_manager_basic_operations() {
        let manager = LockFreeExchangeManager::new();

        // Initial state
        let stats = manager.get_statistics();
        assert_eq!(stats.total_exchanges, 0);
        assert_eq!(stats.total_messages_received, 0);
        assert!(stats.exchange_stats.is_empty());

        // Shutdown flag
        assert!(!manager.is_shutdown_requested());
        manager.request_shutdown();
        assert!(manager.is_shutdown_requested());
    }

    #[test]
    fn test_lockfree_exchange_manager_message_recording() {
        const THREADS: u64 = 10;
        const MESSAGES_PER_THREAD: u64 = 100;

        let manager = Arc::new(LockFreeExchangeManager::new());

        // Concurrent message recording from several threads.
        let handles: Vec<_> = (0..THREADS)
            .map(|i| {
                let manager = Arc::clone(&manager);
                thread::spawn(move || {
                    for j in 0..MESSAGES_PER_THREAD {
                        manager.record_message(&format!("exchange_{i}"), j);
                    }
                })
            })
            .collect();

        for handle in handles {
            handle.join().expect("recording thread panicked");
        }

        let stats = manager.get_statistics();
        assert_eq!(stats.total_messages_received, THREADS * MESSAGES_PER_THREAD);
        // Each thread records 0 + 1 + ... + (MESSAGES_PER_THREAD - 1) bytes.
        let bytes_per_thread = MESSAGES_PER_THREAD * (MESSAGES_PER_THREAD - 1) / 2;
        assert_eq!(stats.total_bytes_received, THREADS * bytes_per_thread);
        // Recording for unregistered names must not register them.
        assert_eq!(stats.total_exchanges, 0);
        assert!(stats.exchange_stats.is_empty());
    }

    #[test]
    fn test_lockfree_exchange_manager_error_recording() {
        let manager = LockFreeExchangeManager::new();

        manager.record_error("test_exchange");
        manager.record_error("test_exchange");

        let stats = manager.get_statistics();
        assert_eq!(stats.total_errors, 2);
    }

    #[test]
    fn test_atomic_exchange_state() {
        let state = AtomicExchangeState::new();

        // Initial state
        assert!(!state.is_connected());
        assert_eq!(state.reconnection_attempts.load(Ordering::Relaxed), 0);

        // Connection state changes
        state.set_connected(true);
        assert!(state.is_connected());

        state.record_error();
        assert!(!state.is_connected());

        // Message recording
        state.record_message(1024);
        assert_eq!(state.messages_received.load(Ordering::Relaxed), 1);
        assert_eq!(state.bytes_received.load(Ordering::Relaxed), 1024);

        // Reconnection attempts: the increment returns the previous value.
        assert_eq!(state.increment_reconnection_attempts(), 0);
        assert_eq!(state.increment_reconnection_attempts(), 1);
        assert_eq!(state.reconnection_attempts.load(Ordering::Relaxed), 2);

        state.reset_reconnection_attempts();
        assert_eq!(state.reconnection_attempts.load(Ordering::Relaxed), 0);

        // Health check coordination
        assert!(state.try_start_health_check());
        assert!(!state.try_start_health_check()); // Already in progress

        state.finish_health_check();
        assert!(state.try_start_health_check()); // Available again
    }

    #[test]
    fn test_lockfree_exchange_manager_statistics() {
        let manager = LockFreeExchangeManager::new();

        manager.record_message("exchange1", 1000);
        manager.record_message("exchange2", 2000);
        manager.record_error("exchange1");

        // Let some time pass so the rate calculations have a non-zero uptime.
        thread::sleep(Duration::from_millis(10));

        let stats = manager.get_statistics();
        assert_eq!(stats.total_messages_received, 2);
        assert_eq!(stats.total_bytes_received, 3000);
        assert_eq!(stats.total_errors, 1);
        assert!(stats.messages_per_second > 0.0);
        assert!(stats.bytes_per_second > 0.0);
        assert!(stats.uptime_seconds > 0.0);
    }

    #[tokio::test]
    async fn test_remove_unknown_exchange_is_noop() {
        let manager = LockFreeExchangeManager::new();

        assert!(manager.remove_exchange("missing").await.is_ok());
        assert_eq!(manager.get_statistics().total_exchanges, 0);
    }

    #[tokio::test]
    async fn test_start_collection_unknown_exchange_fails() {
        let manager = LockFreeExchangeManager::new();
        let (sender, _receiver) = flume::unbounded();

        let result = manager
            .start_collection("missing", Vec::new(), Vec::new(), sender)
            .await;

        assert!(result.is_err());
        assert_eq!(manager.get_statistics().total_active_tasks, 0);
    }
}