rusty_bin/monitor/collector/
manager.rs

1//! Collection manager for coordinating data collection from multiple exchanges
2//!
3//! This module manages the collection of market data from various exchanges and symbols
4//! using the ExchangeClient infrastructure.
5
6use crate::monitor::collector::{
7    CollectionStatus, CollectionTask, DataType, MarketDataEvent, Result,
8    exchange_client::ExchangeClient,
9};
10use crate::monitor::config::MonitorConfig;
11use flume::Sender;
12use futures::future;
13use quanta::Clock;
14use rusty_common::collections::FxHashMap;
15use smartstring::alias::String as SmartString;
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18use tokio::sync::{RwLock, oneshot};
19use tokio::task::JoinHandle;
20
/// Bookkeeping for one exchange whose connection failed and is waiting to be retried.
#[derive(Clone, Debug)]
struct RetryState {
    /// Moment the failure was first recorded.
    failure_time: Instant,
    /// Number of retries that have been scheduled so far.
    retry_count: u32,
    /// Earliest moment at which the next attempt may be made.
    next_retry_time: Instant,
    /// Text of the most recent error.
    last_error: String,
}
29
30impl RetryState {
31    fn new(error: String) -> Self {
32        let now = Instant::now();
33        Self {
34            failure_time: now,
35            retry_count: 0,
36            next_retry_time: now + Duration::from_secs(5), // Initial retry after 5 seconds
37            last_error: error,
38        }
39    }
40
41    fn calculate_next_retry(&mut self) {
42        self.retry_count += 1;
43        // Exponential backoff with jitter: 5s, 10s, 20s, 40s, 80s, 160s, max 300s (5 min)
44        let base_delay =
45            Duration::from_secs(5).min(Duration::from_secs(5 * (1 << self.retry_count)));
46        let max_delay = Duration::from_secs(300);
47        let delay = base_delay.min(max_delay);
48
49        // Add jitter (±20%) to avoid thundering herd
50        // Add jitter factor between 0.8 and 1.2
51        let jitter_factor = 0.8 + (rand::random::<f64>() * 0.4);
52        let final_delay = delay.mul_f64(jitter_factor);
53
54        self.next_retry_time = Instant::now() + final_delay;
55    }
56
57    fn should_retry(&self) -> bool {
58        // Give up after 10 retries
59        self.retry_count < 10 && Instant::now() >= self.next_retry_time
60    }
61}
62
63/// Collection manager that coordinates data collection from multiple exchanges
64pub struct CollectionManager {
65    _config: MonitorConfig,
66    exchange_clients: Arc<RwLock<FxHashMap<SmartString, ExchangeClient>>>,
67    active_tasks: Arc<RwLock<FxHashMap<String, CollectionTask>>>,
68    collection_handles: Arc<RwLock<Vec<JoinHandle<()>>>>,
69    shutdown_tx: Arc<RwLock<Option<oneshot::Sender<()>>>>,
70    failed_exchanges: Arc<RwLock<FxHashMap<SmartString, RetryState>>>,
71}
72
73impl CollectionManager {
74    /// Create a new collection manager
75    pub async fn new(config: MonitorConfig) -> Result<Self> {
76        Ok(Self {
77            _config: config,
78            exchange_clients: Arc::new(RwLock::new(FxHashMap::default())),
79            active_tasks: Arc::new(RwLock::new(FxHashMap::default())),
80            collection_handles: Arc::new(RwLock::new(Vec::new())),
81            shutdown_tx: Arc::new(RwLock::new(None)),
82            failed_exchanges: Arc::new(RwLock::new(FxHashMap::default())),
83        })
84    }
85
86    /// Start data collection for specified exchanges and symbols
87    pub async fn start_collection(
88        &self,
89        exchange_filter: Option<Vec<SmartString>>,
90        symbol_filter: Option<Vec<SmartString>>,
91        event_sender: Sender<MarketDataEvent>,
92    ) -> Result<()> {
93        log::info!(
94            "Starting data collection with filters: exchanges={exchange_filter:?}, symbols={symbol_filter:?}"
95        );
96
97        // Determine which exchanges to use
98        let exchanges = match exchange_filter {
99            Some(exchanges) => exchanges,
100            None => vec![
101                "binance".into(),
102                // "upbit".into(),
103                // "bybit".into(),
104                // "coinbase".into(),
105                // "bithumb".into(),
106            ],
107        };
108
109        // Determine which symbols to collect (default popular symbols if none specified)
110        let symbols = match symbol_filter {
111            Some(symbols) => symbols,
112            None => vec![
113                "BTCUSDT".into(),  // Binance Futures perpetual
114                "ETHUSDT".into(),  // Binance Futures perpetual
115                "ADAUSDT".into(),  // Binance Futures perpetual
116                "SOLUSDT".into(),  // Binance Futures perpetual
117                "AVAXUSDT".into(), // Binance Futures perpetual
118                "DOTUSDT".into(),  // Binance Futures perpetual
119                "KRW-BTC".into(),  // Upbit format
120                "BTC-USD".into(),  // Coinbase format
121                "BTC_KRW".into(),  // Bithumb format
122            ],
123        };
124
125        // Initialize exchange clients
126        for exchange_name in exchanges {
127            let mut client = ExchangeClient::new(exchange_name.clone());
128
129            match client.connect().await {
130                Ok(()) => {
131                    log::info!("Successfully connected to {exchange_name}");
132                    // Remove from failed exchanges if it was there
133                    self.failed_exchanges.write().await.remove(&exchange_name);
134                }
135                Err(e) => {
136                    log::error!("Failed to connect to exchange {exchange_name}: {e}");
137                    // Add to failed exchanges for retry
138                    self.failed_exchanges
139                        .write()
140                        .await
141                        .insert(exchange_name.clone(), RetryState::new(e.to_string()));
142                    continue; // Skip this exchange for now
143                }
144            }
145
146            log::info!("Successfully connected to exchange: {exchange_name}");
147
148            // Filter symbols appropriate for this exchange
149            let exchange_symbols = self.filter_symbols_for_exchange(&exchange_name, &symbols);
150
151            if exchange_symbols.is_empty() {
152                log::warn!("No symbols to collect for exchange: {exchange_name}");
153                let _ = client.disconnect().await;
154                continue;
155            }
156
157            // Start collection for both trades and orderbook
158            let data_types = vec![DataType::Trades, DataType::OrderBook];
159
160            client
161                .start_collection(
162                    exchange_symbols.clone(),
163                    data_types.clone(),
164                    event_sender.clone(),
165                )
166                .await?;
167            log::info!(
168                "Started collection for exchange {exchange_name} with symbols: {exchange_symbols:?}"
169            );
170
171            // Add tasks to tracking
172            let mut active_tasks = self.active_tasks.write().await;
173
174            for symbol in &exchange_symbols {
175                for data_type in &data_types {
176                    let task_id = format!("{exchange_name}:{symbol}:{data_type}");
177                    active_tasks.insert(
178                        task_id,
179                        CollectionTask {
180                            exchange: exchange_name.clone(),
181                            symbol: symbol.clone(),
182                            data_types: vec![*data_type],
183                            enabled: true,
184                            retry_count: 0,
185                            max_retries: 3,
186                            backoff_ms: 1000,
187                        },
188                    );
189                }
190            }
191
192            // Store the client
193            self.exchange_clients
194                .write()
195                .await
196                .insert(exchange_name, client);
197        }
198
199        log::info!("All exchange connections initialized. Starting management loop...");
200
201        // Management loop - keep the collection manager running
202        let (tx, mut rx) = oneshot::channel();
203        *self.shutdown_tx.write().await = Some(tx);
204        let mut health_check_interval = tokio::time::interval(std::time::Duration::from_secs(30));
205        let mut retry_interval = tokio::time::interval(std::time::Duration::from_secs(60));
206
207        loop {
208            tokio::select! {
209                _ = &mut rx => {
210                    log::info!("CollectionManager received shutdown signal. Exiting management loop.");
211                    break;
212                }
213                _ = health_check_interval.tick() => {
214                    // Perform health checks on all connections
215                    let health_status = self.health_check().await;
216
217                    health_status
218                        .into_iter()
219                        .filter(|(_, healthy)| !healthy)
220                        .for_each(|(exchange, _)| {
221                            log::warn!("Exchange {exchange} connection unhealthy");
222                            // Mark unhealthy exchanges for retry
223                            let failed_exchanges = self.failed_exchanges.clone();
224                            let exchange = exchange.clone();
225                            tokio::spawn(async move {
226                                let mut failed = failed_exchanges.write().await;
227                                failed.entry(exchange.clone())
228                                    .or_insert_with(|| RetryState::new("Health check failed".to_string()));
229                            });
230                        });
231                }
232                _ = retry_interval.tick() => {
233                    // Retry failed connections
234                    self.retry_failed_exchanges(&event_sender).await;
235                }
236            }
237        }
238        log::info!("Collection management loop has stopped.");
239        Ok(())
240    }
241
242    /// Stop all collection tasks
243    pub async fn stop_all(&self) -> Result<()> {
244        log::info!("Stopping all collection tasks");
245
246        if let Some(tx) = self.shutdown_tx.write().await.take()
247            && tx.send(()).is_err()
248        {
249            log::warn!(
250                "Failed to send shutdown signal to management loop; it might have already stopped."
251            );
252        }
253
254        // Stop all collection handles
255        let mut handles = self.collection_handles.write().await;
256        for handle in handles.drain(..) {
257            handle.abort();
258        }
259
260        // Disconnect all exchange clients
261        let mut clients = self.exchange_clients.write().await;
262        for (exchange_name, mut client) in clients.drain() {
263            if let Err(e) = client.disconnect().await {
264                log::error!("Failed to disconnect from exchange {exchange_name}: {e}");
265            }
266        }
267
268        // Clear active tasks
269        self.active_tasks.write().await.clear();
270
271        log::info!("All collection tasks stopped");
272
273        Ok(())
274    }
275
276    /// Get active tasks
277    pub async fn get_active_tasks(&self) -> FxHashMap<String, CollectionTask> {
278        self.active_tasks.read().await.clone()
279    }
280
281    /// Get collection status for all exchanges
282    pub async fn get_collection_status(&self) -> FxHashMap<SmartString, Vec<CollectionStatus>> {
283        let mut status_map = FxHashMap::default();
284        let clients = self.exchange_clients.read().await;
285
286        for (exchange_name, client) in clients.iter() {
287            if let Some(status) = client.get_status().await {
288                status_map
289                    .entry(exchange_name.clone())
290                    .or_insert_with(Vec::new)
291                    .push(status);
292            }
293        }
294
295        status_map
296    }
297
298    /// Stop collection for a specific exchange
299    pub async fn stop_exchange(&self, exchange_name: &str) -> Result<()> {
300        log::info!("Stopping collection for exchange: {exchange_name}");
301
302        let exchange_name_smart: SmartString = exchange_name.into();
303        let mut clients = self.exchange_clients.write().await;
304
305        if let Some(mut client) = clients.remove(&exchange_name_smart) {
306            client.disconnect().await?;
307
308            // Remove related tasks
309            let mut active_tasks = self.active_tasks.write().await;
310            active_tasks.retain(|task_id, _| !task_id.starts_with(exchange_name));
311
312            log::info!("Successfully stopped collection for exchange: {exchange_name}");
313        } else {
314            log::warn!("Exchange {exchange_name} was not active");
315        }
316
317        Ok(())
318    }
319
320    /// Filter symbols to only include those appropriate for the given exchange
321    fn filter_symbols_for_exchange(
322        &self,
323        exchange_name: &str,
324        symbols: &[SmartString],
325    ) -> Vec<SmartString> {
326        symbols
327            .iter()
328            .filter(|symbol| self.is_symbol_valid_for_exchange(exchange_name, symbol))
329            .cloned()
330            .collect()
331    }
332
333    /// Check if a symbol is valid for a specific exchange based on naming conventions
334    fn is_symbol_valid_for_exchange(&self, exchange_name: &str, symbol: &str) -> bool {
335        match exchange_name.to_lowercase().as_str() {
336            "binance" => {
337                // Binance uses format like BTCUSDT
338                symbol.len() >= 6
339                    && symbol.chars().all(|c| c.is_ascii_uppercase())
340                    && !symbol.contains('-')
341                    && !symbol.contains('_')
342            }
343            "upbit" => {
344                // Upbit uses format like KRW-BTC
345                symbol.contains('-') && symbol.starts_with("KRW-")
346            }
347            "bybit" => {
348                // Bybit uses format like BTCUSDT (similar to Binance)
349                symbol.len() >= 6
350                    && symbol.chars().all(|c| c.is_ascii_uppercase())
351                    && !symbol.contains('-')
352                    && !symbol.contains('_')
353            }
354            "coinbase" => {
355                // Coinbase uses format like BTC-USD
356                symbol.contains('-') && !symbol.starts_with("KRW-")
357            }
358            "bithumb" => {
359                // Bithumb uses format like BTC_KRW
360                symbol.contains('_')
361            }
362            _ => false,
363        }
364    }
365
366    /// Get statistics for all active collections
367    pub async fn get_statistics(&self) -> FxHashMap<SmartString, u32> {
368        let mut stats = FxHashMap::default();
369        let clients = self.exchange_clients.read().await;
370
371        for (exchange_name, _) in clients.iter() {
372            let active_tasks = self.active_tasks.read().await;
373            let count = active_tasks
374                .values()
375                .filter(|task| task.exchange == *exchange_name)
376                .count() as u32;
377            stats.insert(exchange_name.clone(), count);
378        }
379
380        stats
381    }
382
383    /// Health check for all exchange connections
384    /// Retry failed exchange connections
385    async fn retry_failed_exchanges(&self, event_sender: &Sender<MarketDataEvent>) {
386        let failed_exchanges = self.failed_exchanges.read().await.clone();
387
388        for (exchange_name, mut retry_state) in failed_exchanges {
389            if !retry_state.should_retry() {
390                if retry_state.retry_count >= 10 {
391                    log::error!(
392                        "Exchange {} has failed {} times. Giving up. Last error: {}",
393                        exchange_name,
394                        retry_state.retry_count,
395                        retry_state.last_error
396                    );
397                    // Remove from retry list
398                    self.failed_exchanges.write().await.remove(&exchange_name);
399                }
400                continue;
401            }
402
403            log::info!(
404                "Retrying connection to {} (attempt {} after {}s)",
405                exchange_name,
406                retry_state.retry_count + 1,
407                retry_state
408                    .next_retry_time
409                    .duration_since(retry_state.failure_time)
410                    .as_secs()
411            );
412
413            // Create a new client and try to connect
414            let mut client = ExchangeClient::new(exchange_name.clone());
415
416            match client.connect().await {
417                Ok(()) => {
418                    log::info!("Successfully reconnected to {exchange_name}");
419
420                    // Remove from failed list
421                    self.failed_exchanges.write().await.remove(&exchange_name);
422
423                    // Store the reconnected client
424                    self.exchange_clients
425                        .write()
426                        .await
427                        .insert(exchange_name.clone(), client);
428
429                    // TODO: Re-subscribe to previously active symbols for this exchange
430                    // For now, we'll just log that reconnection succeeded
431
432                    // Send a connection status event
433                    let _ = event_sender.send(MarketDataEvent::ConnectionStatus {
434                        exchange: exchange_name.clone(),
435                        symbol: "".into(), // No specific symbol for overall connection
436                        connected: true,
437                        timestamp: Clock::new().raw(),
438                    });
439                }
440                Err(e) => {
441                    log::error!(
442                        "Failed to reconnect to {} (attempt {}): {}",
443                        exchange_name,
444                        retry_state.retry_count + 1,
445                        e
446                    );
447
448                    // Update retry state
449                    retry_state.last_error = e.to_string();
450                    retry_state.calculate_next_retry();
451
452                    // Update in the map
453                    self.failed_exchanges
454                        .write()
455                        .await
456                        .insert(exchange_name, retry_state);
457                }
458            }
459        }
460    }
461
462    /// Perform health check on all registered exchanges
463    ///
464    /// Returns a map of exchange names to their health status (true if connected and healthy, false otherwise)
465    pub async fn health_check(&self) -> FxHashMap<SmartString, bool> {
466        let clients = self.exchange_clients.read().await;
467
468        let health_futures = clients.iter().map(|(name, client)| async move {
469            let is_healthy = client
470                .get_status()
471                .await
472                .map(|status| status.connected)
473                .unwrap_or(false);
474            (name.clone(), is_healthy)
475        });
476
477        let health_results = future::join_all(health_futures).await;
478
479        health_results.into_iter().collect()
480    }
481}