1use crate::monitor::collector::{
7 CollectionTask, DataType, MarketDataEvent, Result, exchange_client::ExchangeClient,
8};
9use crossbeam_utils::CachePadded;
10use flume::Sender;
11use parking_lot::RwLock;
12use quanta::Clock;
13use rusty_common::collections::FxHashMap;
14use smartstring::alias::String as SmartString;
15use std::sync::Arc;
16use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
17use std::time::Instant;
18
19#[derive(Debug)]
21struct AtomicExchangeState {
22 connected: AtomicBool,
24 last_message_timestamp: AtomicU64,
26 reconnection_attempts: AtomicUsize,
28 connection_start_time: AtomicU64,
30 messages_received: AtomicU64,
32 bytes_received: AtomicU64,
34 last_error_timestamp: AtomicU64,
36 health_check_in_progress: AtomicBool,
38}
39
40impl AtomicExchangeState {
41 const fn new() -> Self {
42 Self {
43 connected: AtomicBool::new(false),
44 last_message_timestamp: AtomicU64::new(0),
45 reconnection_attempts: AtomicUsize::new(0),
46 connection_start_time: AtomicU64::new(0),
47 messages_received: AtomicU64::new(0),
48 bytes_received: AtomicU64::new(0),
49 last_error_timestamp: AtomicU64::new(0),
50 health_check_in_progress: AtomicBool::new(false),
51 }
52 }
53
54 #[inline(always)]
56 fn set_connected(&self, connected: bool) {
57 self.connected.store(connected, Ordering::Release);
58 if connected {
59 self.connection_start_time
60 .store(Clock::new().raw(), Ordering::Relaxed);
61 }
62 }
63
64 #[inline(always)]
66 fn is_connected(&self) -> bool {
67 self.connected.load(Ordering::Acquire)
68 }
69
70 #[inline(always)]
72 fn record_message(&self, bytes: u64) {
73 self.messages_received.fetch_add(1, Ordering::Relaxed);
74 self.bytes_received.fetch_add(bytes, Ordering::Relaxed);
75 self.last_message_timestamp
76 .store(Clock::new().raw(), Ordering::Relaxed);
77 }
78
79 #[inline(always)]
81 fn record_error(&self) {
82 self.last_error_timestamp
83 .store(Clock::new().raw(), Ordering::Relaxed);
84 self.set_connected(false);
85 }
86
87 #[inline(always)]
89 fn increment_reconnection_attempts(&self) -> usize {
90 self.reconnection_attempts.fetch_add(1, Ordering::Relaxed)
91 }
92
93 #[inline(always)]
95 fn reset_reconnection_attempts(&self) {
96 self.reconnection_attempts.store(0, Ordering::Relaxed);
97 }
98
99 #[inline(always)]
101 fn try_start_health_check(&self) -> bool {
102 self.health_check_in_progress
103 .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
104 .is_ok()
105 }
106
107 #[inline(always)]
109 fn finish_health_check(&self) {
110 self.health_check_in_progress
111 .store(false, Ordering::Release);
112 }
113}
114
115struct ExchangeMetadata {
117 client: RwLock<Option<ExchangeClient>>,
119 state: AtomicExchangeState,
121 active_tasks: RwLock<FxHashMap<String, CollectionTask>>,
123 last_health_check: AtomicU64,
125}
126
127impl ExchangeMetadata {
128 fn new() -> Self {
129 Self {
130 client: RwLock::new(None),
131 state: AtomicExchangeState::new(),
132 active_tasks: RwLock::new(FxHashMap::default()),
133 last_health_check: AtomicU64::new(0),
134 }
135 }
136}
137
138pub struct LockFreeExchangeManager {
140 exchanges: RwLock<FxHashMap<SmartString, Arc<ExchangeMetadata>>>,
142
143 total_exchanges: CachePadded<AtomicUsize>,
145 total_active_tasks: CachePadded<AtomicUsize>,
146 total_messages_received: CachePadded<AtomicU64>,
147 total_bytes_received: CachePadded<AtomicU64>,
148 total_errors: CachePadded<AtomicU64>,
149
150 manager_start_time: Instant,
152 shutdown_requested: AtomicBool,
153}
154
155impl LockFreeExchangeManager {
156 pub fn new() -> Self {
158 Self {
159 exchanges: RwLock::new(FxHashMap::default()),
160 total_exchanges: CachePadded::new(AtomicUsize::new(0)),
161 total_active_tasks: CachePadded::new(AtomicUsize::new(0)),
162 total_messages_received: CachePadded::new(AtomicU64::new(0)),
163 total_bytes_received: CachePadded::new(AtomicU64::new(0)),
164 total_errors: CachePadded::new(AtomicU64::new(0)),
165 manager_start_time: Instant::now(),
166 shutdown_requested: AtomicBool::new(false),
167 }
168 }
169
170 pub async fn add_exchange(
172 &self,
173 exchange_name: SmartString,
174 mut client: ExchangeClient,
175 ) -> Result<()> {
176 match client.connect().await {
178 Ok(()) => {
179 log::info!("Successfully connected to exchange: {exchange_name}");
180
181 let metadata = Arc::new(ExchangeMetadata::new());
183 metadata.state.set_connected(true);
184 *metadata.client.write() = Some(client);
185
186 {
188 let mut exchanges = self.exchanges.write();
189 exchanges.insert(exchange_name.clone(), metadata);
190 self.total_exchanges.fetch_add(1, Ordering::Relaxed);
191 }
192
193 Ok(())
194 }
195 Err(e) => {
196 log::error!("Failed to connect to exchange {exchange_name}: {e}");
197
198 let metadata = Arc::new(ExchangeMetadata::new());
200 metadata.state.record_error();
201 *metadata.client.write() = Some(client);
202
203 {
204 let mut exchanges = self.exchanges.write();
205 exchanges.insert(exchange_name, metadata);
206 self.total_exchanges.fetch_add(1, Ordering::Relaxed);
207 }
208
209 self.total_errors.fetch_add(1, Ordering::Relaxed);
210 Err(e)
211 }
212 }
213 }
214
215 pub async fn remove_exchange(&self, exchange_name: &str) -> Result<()> {
217 let exchange_key: SmartString = exchange_name.into();
218
219 let metadata = {
220 let exchanges = self.exchanges.read();
221 exchanges.get(&exchange_key).cloned()
222 };
223
224 if let Some(metadata) = metadata {
225 let client_opt = {
227 let mut client_lock = metadata.client.write();
228 client_lock.take()
229 }; if let Some(mut client) = client_opt
232 && let Err(e) = client.disconnect().await
233 {
234 log::error!("Error disconnecting exchange {exchange_name}: {e}");
235 }
236
237 metadata.state.set_connected(false);
239
240 {
242 let mut exchanges = self.exchanges.write();
243 exchanges.remove(&exchange_key);
244 self.total_exchanges.fetch_sub(1, Ordering::Relaxed);
245 }
246
247 log::info!("Successfully removed exchange: {exchange_name}");
248 }
249
250 Ok(())
251 }
252
253 pub async fn start_collection(
255 &self,
256 exchange_name: &str,
257 symbols: Vec<SmartString>,
258 data_types: Vec<DataType>,
259 event_sender: Sender<MarketDataEvent>,
260 ) -> Result<()> {
261 let exchange_key: SmartString = exchange_name.into();
262
263 let metadata = {
264 let exchanges = self.exchanges.read();
265 exchanges.get(&exchange_key).cloned()
266 };
267
268 let metadata = metadata.ok_or_else(|| {
269 Box::new(std::io::Error::new(
270 std::io::ErrorKind::NotFound,
271 format!("Exchange {exchange_name} not found"),
272 )) as Box<dyn std::error::Error + Send + Sync>
273 })?;
274
275 if !metadata.state.is_connected() {
277 return Err(Box::new(std::io::Error::new(
278 std::io::ErrorKind::NotConnected,
279 format!("Exchange {exchange_name} is not connected"),
280 )) as Box<dyn std::error::Error + Send + Sync>)?;
281 }
282
283 {
285 let has_client = metadata.client.read().is_some();
286 if !has_client {
287 return Err(Box::new(std::io::Error::new(
288 std::io::ErrorKind::NotFound,
289 format!("No client for exchange {exchange_name}"),
290 ))
291 as Box<dyn std::error::Error + Send + Sync>)?;
292 }
293 }
294
295 struct ClientGuard<'a> {
297 client: Option<ExchangeClient>,
298 storage: &'a RwLock<Option<ExchangeClient>>,
299 }
300
301 impl<'a> Drop for ClientGuard<'a> {
302 fn drop(&mut self) {
303 if let Some(client) = self.client.take() {
304 *self.storage.write() = Some(client);
305 }
306 }
307 }
308
309 let mut guard = ClientGuard {
311 client: metadata.client.write().take(),
312 storage: &metadata.client,
313 };
314
315 if let Some(client) = &mut guard.client {
316 client
317 .start_collection(symbols.clone(), data_types.clone(), event_sender)
318 .await?;
319 }
320
321 {
323 let mut active_tasks = metadata.active_tasks.write();
324 let mut task_count = 0;
325
326 for symbol in &symbols {
327 for data_type in &data_types {
328 let task_id = format!("{exchange_name}:{symbol}:{data_type:?}");
329 active_tasks.insert(
330 task_id,
331 CollectionTask {
332 exchange: exchange_key.clone(),
333 symbol: symbol.clone(),
334 data_types: vec![*data_type],
335 enabled: true,
336 retry_count: 0,
337 max_retries: 3,
338 backoff_ms: 1000,
339 },
340 );
341 task_count += 1;
342 }
343 }
344
345 self.total_active_tasks
346 .fetch_add(task_count, Ordering::Relaxed);
347 }
348
349 log::info!(
350 "Started collection for exchange {} with {} symbols and {} data types",
351 exchange_name,
352 symbols.len(),
353 data_types.len()
354 );
355
356 Ok(())
357 }
358
359 #[inline(always)]
361 pub fn record_message(&self, exchange_name: &str, bytes: u64) {
362 let exchange_key: SmartString = exchange_name.into();
363
364 self.total_messages_received.fetch_add(1, Ordering::Relaxed);
366 self.total_bytes_received
367 .fetch_add(bytes, Ordering::Relaxed);
368
369 let metadata = {
371 let exchanges = self.exchanges.read();
372 exchanges.get(&exchange_key).cloned()
373 };
374
375 if let Some(metadata) = metadata {
376 metadata.state.record_message(bytes);
377 }
378 }
379
380 #[inline(always)]
382 pub fn record_error(&self, exchange_name: &str) {
383 let exchange_key: SmartString = exchange_name.into();
384
385 self.total_errors.fetch_add(1, Ordering::Relaxed);
387
388 let metadata = {
389 let exchanges = self.exchanges.read();
390 exchanges.get(&exchange_key).cloned()
391 };
392
393 if let Some(metadata) = metadata {
394 metadata.state.record_error();
395 }
396 }
397
398 pub fn get_connection_status(&self) -> FxHashMap<SmartString, bool> {
400 let mut status_map = FxHashMap::default();
401
402 let exchanges = self.exchanges.read();
403 for (exchange_name, metadata) in exchanges.iter() {
404 status_map.insert(exchange_name.clone(), metadata.state.is_connected());
405 }
406
407 status_map
408 }
409
410 pub async fn health_check(&self) -> FxHashMap<SmartString, bool> {
412 let mut health_status = FxHashMap::default();
413
414 let exchanges = {
415 let exchanges = self.exchanges.read();
416 exchanges.clone()
417 };
418
419 for (exchange_name, metadata) in exchanges {
420 if metadata.state.try_start_health_check() {
422 let is_healthy = {
423 metadata.state.is_connected()
427 };
428
429 if !is_healthy {
431 metadata.state.record_error();
432 }
433
434 metadata
435 .last_health_check
436 .store(Clock::new().raw(), Ordering::Relaxed);
437 metadata.state.finish_health_check();
438
439 health_status.insert(exchange_name, is_healthy);
440 } else {
441 health_status.insert(exchange_name, metadata.state.is_connected());
443 }
444 }
445
446 health_status
447 }
448
449 pub fn get_statistics(&self) -> ExchangeManagerStats {
451 let uptime_seconds = self.manager_start_time.elapsed().as_secs_f64();
452
453 let total_messages = self.total_messages_received.load(Ordering::Relaxed);
455 let total_bytes = self.total_bytes_received.load(Ordering::Relaxed);
456
457 let messages_per_second = if uptime_seconds > 0.0 {
459 total_messages as f64 / uptime_seconds
460 } else {
461 0.0
462 };
463
464 let bytes_per_second = if uptime_seconds > 0.0 {
465 total_bytes as f64 / uptime_seconds
466 } else {
467 0.0
468 };
469
470 let mut exchange_stats = FxHashMap::default();
472 {
473 let exchanges = self.exchanges.read();
474 for (exchange_name, metadata) in exchanges.iter() {
475 let exchange_messages = metadata.state.messages_received.load(Ordering::Relaxed);
476 let exchange_bytes = metadata.state.bytes_received.load(Ordering::Relaxed);
477 let exchange_rate = if uptime_seconds > 0.0 {
478 exchange_messages as f64 / uptime_seconds
479 } else {
480 0.0
481 };
482
483 exchange_stats.insert(
484 exchange_name.clone(),
485 ExchangeStats {
486 connected: metadata.state.is_connected(),
487 messages_received: exchange_messages,
488 bytes_received: exchange_bytes,
489 messages_per_second: exchange_rate,
490 reconnection_attempts: metadata
491 .state
492 .reconnection_attempts
493 .load(Ordering::Relaxed),
494 last_message_timestamp: metadata
495 .state
496 .last_message_timestamp
497 .load(Ordering::Relaxed),
498 connection_start_time: metadata
499 .state
500 .connection_start_time
501 .load(Ordering::Relaxed),
502 },
503 );
504 }
505 }
506
507 ExchangeManagerStats {
508 total_exchanges: self.total_exchanges.load(Ordering::Relaxed),
509 total_active_tasks: self.total_active_tasks.load(Ordering::Relaxed),
510 total_messages_received: total_messages,
511 total_bytes_received: total_bytes,
512 total_errors: self.total_errors.load(Ordering::Relaxed),
513 messages_per_second,
514 bytes_per_second,
515 uptime_seconds,
516 exchange_stats,
517 }
518 }
519
520 pub fn request_shutdown(&self) {
522 self.shutdown_requested.store(true, Ordering::Release);
523 }
524
525 pub fn is_shutdown_requested(&self) -> bool {
527 self.shutdown_requested.load(Ordering::Acquire)
528 }
529
530 pub async fn shutdown(&self) -> Result<()> {
532 self.request_shutdown();
533
534 let exchanges = {
535 let mut exchanges = self.exchanges.write();
536 std::mem::take(&mut *exchanges)
537 };
538
539 for (exchange_name, metadata) in exchanges {
540 let client_opt = {
542 let mut client_lock = metadata.client.write();
543 client_lock.take()
544 };
545
546 if let Some(mut client) = client_opt
547 && let Err(e) = client.disconnect().await
548 {
549 log::error!("Error disconnecting exchange {exchange_name} during shutdown: {e}");
550 }
551 metadata.state.set_connected(false);
552 }
553
554 self.total_exchanges.store(0, Ordering::Relaxed);
556 self.total_active_tasks.store(0, Ordering::Relaxed);
557
558 log::info!("Exchange manager shutdown completed");
559 Ok(())
560 }
561}
562
563#[derive(Debug, Clone)]
569pub struct ExchangeManagerStats {
570 pub total_exchanges: usize,
572 pub total_active_tasks: usize,
574 pub total_messages_received: u64,
576 pub total_bytes_received: u64,
578 pub total_errors: u64,
580 pub messages_per_second: f64,
582 pub bytes_per_second: f64,
584 pub uptime_seconds: f64,
586 pub exchange_stats: FxHashMap<SmartString, ExchangeStats>,
588}
589
590#[derive(Debug, Clone)]
596pub struct ExchangeStats {
597 pub connected: bool,
599 pub messages_received: u64,
601 pub bytes_received: u64,
603 pub messages_per_second: f64,
605 pub reconnection_attempts: usize,
607 pub last_message_timestamp: u64,
609 pub connection_start_time: u64,
611}
612
613impl Default for LockFreeExchangeManager {
614 fn default() -> Self {
615 Self::new()
616 }
617}
618
619unsafe impl Send for LockFreeExchangeManager {}
621unsafe impl Sync for LockFreeExchangeManager {}
622
623#[cfg(test)]
624mod tests {
625 use super::*;
626 use std::thread;
627 use std::time::Duration;
628
629 #[tokio::test]
630 async fn test_lockfree_exchange_manager_basic_operations() {
631 let manager = LockFreeExchangeManager::new();
632
633 let stats = manager.get_statistics();
635 assert_eq!(stats.total_exchanges, 0);
636 assert_eq!(stats.total_messages_received, 0);
637
638 assert!(!manager.is_shutdown_requested());
640 manager.request_shutdown();
641 assert!(manager.is_shutdown_requested());
642 }
643
644 #[tokio::test]
645 async fn test_lockfree_exchange_manager_message_recording() {
646 let manager = Arc::new(LockFreeExchangeManager::new());
647
648 let mut handles = Vec::new();
650 for i in 0..10 {
651 let manager_clone = Arc::clone(&manager);
652 let handle = thread::spawn(move || {
653 for j in 0..100 {
654 manager_clone.record_message(&format!("exchange_{i}"), j as u64);
655 }
656 });
657 handles.push(handle);
658 }
659
660 for handle in handles {
662 handle.join().unwrap();
663 }
664
665 let stats = manager.get_statistics();
666 assert_eq!(stats.total_messages_received, 1000); assert!(stats.total_bytes_received > 0);
668 }
669
670 #[tokio::test]
671 async fn test_lockfree_exchange_manager_error_recording() {
672 let manager = LockFreeExchangeManager::new();
673
674 manager.record_error("test_exchange");
676 manager.record_error("test_exchange");
677
678 let stats = manager.get_statistics();
679 assert_eq!(stats.total_errors, 2);
680 }
681
682 #[test]
683 fn test_atomic_exchange_state() {
684 let state = AtomicExchangeState::new();
685
686 assert!(!state.is_connected());
688 assert_eq!(state.reconnection_attempts.load(Ordering::Relaxed), 0);
689
690 state.set_connected(true);
692 assert!(state.is_connected());
693
694 state.record_error();
695 assert!(!state.is_connected());
696
697 state.record_message(1024);
699 assert_eq!(state.messages_received.load(Ordering::Relaxed), 1);
700 assert_eq!(state.bytes_received.load(Ordering::Relaxed), 1024);
701
702 state.increment_reconnection_attempts();
704 state.increment_reconnection_attempts();
705 assert_eq!(state.reconnection_attempts.load(Ordering::Relaxed), 2);
706
707 state.reset_reconnection_attempts();
708 assert_eq!(state.reconnection_attempts.load(Ordering::Relaxed), 0);
709
710 assert!(state.try_start_health_check());
712 assert!(!state.try_start_health_check()); state.finish_health_check();
715 assert!(state.try_start_health_check()); }
717
718 #[tokio::test]
719 async fn test_lockfree_exchange_manager_statistics() {
720 let manager = LockFreeExchangeManager::new();
721
722 manager.record_message("exchange1", 1000);
724 manager.record_message("exchange2", 2000);
725 manager.record_error("exchange1");
726
727 thread::sleep(Duration::from_millis(100));
729
730 let stats = manager.get_statistics();
731 assert_eq!(stats.total_messages_received, 2);
732 assert_eq!(stats.total_bytes_received, 3000);
733 assert_eq!(stats.total_errors, 1);
734 assert!(stats.messages_per_second > 0.0);
735 assert!(stats.bytes_per_second > 0.0);
736 assert!(stats.uptime_seconds > 0.0);
737 }
738}