rusty_bin/monitor/collector/
manager.rs1use crate::monitor::collector::{
7 CollectionStatus, CollectionTask, DataType, MarketDataEvent, Result,
8 exchange_client::ExchangeClient,
9};
10use crate::monitor::config::MonitorConfig;
11use flume::Sender;
12use futures::future;
13use quanta::Clock;
14use rusty_common::collections::FxHashMap;
15use smartstring::alias::String as SmartString;
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18use tokio::sync::{RwLock, oneshot};
19use tokio::task::JoinHandle;
20
21#[derive(Debug, Clone)]
23struct RetryState {
24 failure_time: Instant,
25 retry_count: u32,
26 next_retry_time: Instant,
27 last_error: String,
28}
29
30impl RetryState {
31 fn new(error: String) -> Self {
32 let now = Instant::now();
33 Self {
34 failure_time: now,
35 retry_count: 0,
36 next_retry_time: now + Duration::from_secs(5), last_error: error,
38 }
39 }
40
41 fn calculate_next_retry(&mut self) {
42 self.retry_count += 1;
43 let base_delay =
45 Duration::from_secs(5).min(Duration::from_secs(5 * (1 << self.retry_count)));
46 let max_delay = Duration::from_secs(300);
47 let delay = base_delay.min(max_delay);
48
49 let jitter_factor = 0.8 + (rand::random::<f64>() * 0.4);
52 let final_delay = delay.mul_f64(jitter_factor);
53
54 self.next_retry_time = Instant::now() + final_delay;
55 }
56
57 fn should_retry(&self) -> bool {
58 self.retry_count < 10 && Instant::now() >= self.next_retry_time
60 }
61}
62
63pub struct CollectionManager {
65 _config: MonitorConfig,
66 exchange_clients: Arc<RwLock<FxHashMap<SmartString, ExchangeClient>>>,
67 active_tasks: Arc<RwLock<FxHashMap<String, CollectionTask>>>,
68 collection_handles: Arc<RwLock<Vec<JoinHandle<()>>>>,
69 shutdown_tx: Arc<RwLock<Option<oneshot::Sender<()>>>>,
70 failed_exchanges: Arc<RwLock<FxHashMap<SmartString, RetryState>>>,
71}
72
73impl CollectionManager {
74 pub async fn new(config: MonitorConfig) -> Result<Self> {
76 Ok(Self {
77 _config: config,
78 exchange_clients: Arc::new(RwLock::new(FxHashMap::default())),
79 active_tasks: Arc::new(RwLock::new(FxHashMap::default())),
80 collection_handles: Arc::new(RwLock::new(Vec::new())),
81 shutdown_tx: Arc::new(RwLock::new(None)),
82 failed_exchanges: Arc::new(RwLock::new(FxHashMap::default())),
83 })
84 }
85
86 pub async fn start_collection(
88 &self,
89 exchange_filter: Option<Vec<SmartString>>,
90 symbol_filter: Option<Vec<SmartString>>,
91 event_sender: Sender<MarketDataEvent>,
92 ) -> Result<()> {
93 log::info!(
94 "Starting data collection with filters: exchanges={exchange_filter:?}, symbols={symbol_filter:?}"
95 );
96
97 let exchanges = match exchange_filter {
99 Some(exchanges) => exchanges,
100 None => vec![
101 "binance".into(),
102 ],
107 };
108
109 let symbols = match symbol_filter {
111 Some(symbols) => symbols,
112 None => vec![
113 "BTCUSDT".into(), "ETHUSDT".into(), "ADAUSDT".into(), "SOLUSDT".into(), "AVAXUSDT".into(), "DOTUSDT".into(), "KRW-BTC".into(), "BTC-USD".into(), "BTC_KRW".into(), ],
123 };
124
125 for exchange_name in exchanges {
127 let mut client = ExchangeClient::new(exchange_name.clone());
128
129 match client.connect().await {
130 Ok(()) => {
131 log::info!("Successfully connected to {exchange_name}");
132 self.failed_exchanges.write().await.remove(&exchange_name);
134 }
135 Err(e) => {
136 log::error!("Failed to connect to exchange {exchange_name}: {e}");
137 self.failed_exchanges
139 .write()
140 .await
141 .insert(exchange_name.clone(), RetryState::new(e.to_string()));
142 continue; }
144 }
145
146 log::info!("Successfully connected to exchange: {exchange_name}");
147
148 let exchange_symbols = self.filter_symbols_for_exchange(&exchange_name, &symbols);
150
151 if exchange_symbols.is_empty() {
152 log::warn!("No symbols to collect for exchange: {exchange_name}");
153 let _ = client.disconnect().await;
154 continue;
155 }
156
157 let data_types = vec![DataType::Trades, DataType::OrderBook];
159
160 client
161 .start_collection(
162 exchange_symbols.clone(),
163 data_types.clone(),
164 event_sender.clone(),
165 )
166 .await?;
167 log::info!(
168 "Started collection for exchange {exchange_name} with symbols: {exchange_symbols:?}"
169 );
170
171 let mut active_tasks = self.active_tasks.write().await;
173
174 for symbol in &exchange_symbols {
175 for data_type in &data_types {
176 let task_id = format!("{exchange_name}:{symbol}:{data_type}");
177 active_tasks.insert(
178 task_id,
179 CollectionTask {
180 exchange: exchange_name.clone(),
181 symbol: symbol.clone(),
182 data_types: vec![*data_type],
183 enabled: true,
184 retry_count: 0,
185 max_retries: 3,
186 backoff_ms: 1000,
187 },
188 );
189 }
190 }
191
192 self.exchange_clients
194 .write()
195 .await
196 .insert(exchange_name, client);
197 }
198
199 log::info!("All exchange connections initialized. Starting management loop...");
200
201 let (tx, mut rx) = oneshot::channel();
203 *self.shutdown_tx.write().await = Some(tx);
204 let mut health_check_interval = tokio::time::interval(std::time::Duration::from_secs(30));
205 let mut retry_interval = tokio::time::interval(std::time::Duration::from_secs(60));
206
207 loop {
208 tokio::select! {
209 _ = &mut rx => {
210 log::info!("CollectionManager received shutdown signal. Exiting management loop.");
211 break;
212 }
213 _ = health_check_interval.tick() => {
214 let health_status = self.health_check().await;
216
217 health_status
218 .into_iter()
219 .filter(|(_, healthy)| !healthy)
220 .for_each(|(exchange, _)| {
221 log::warn!("Exchange {exchange} connection unhealthy");
222 let failed_exchanges = self.failed_exchanges.clone();
224 let exchange = exchange.clone();
225 tokio::spawn(async move {
226 let mut failed = failed_exchanges.write().await;
227 failed.entry(exchange.clone())
228 .or_insert_with(|| RetryState::new("Health check failed".to_string()));
229 });
230 });
231 }
232 _ = retry_interval.tick() => {
233 self.retry_failed_exchanges(&event_sender).await;
235 }
236 }
237 }
238 log::info!("Collection management loop has stopped.");
239 Ok(())
240 }
241
242 pub async fn stop_all(&self) -> Result<()> {
244 log::info!("Stopping all collection tasks");
245
246 if let Some(tx) = self.shutdown_tx.write().await.take()
247 && tx.send(()).is_err()
248 {
249 log::warn!(
250 "Failed to send shutdown signal to management loop; it might have already stopped."
251 );
252 }
253
254 let mut handles = self.collection_handles.write().await;
256 for handle in handles.drain(..) {
257 handle.abort();
258 }
259
260 let mut clients = self.exchange_clients.write().await;
262 for (exchange_name, mut client) in clients.drain() {
263 if let Err(e) = client.disconnect().await {
264 log::error!("Failed to disconnect from exchange {exchange_name}: {e}");
265 }
266 }
267
268 self.active_tasks.write().await.clear();
270
271 log::info!("All collection tasks stopped");
272
273 Ok(())
274 }
275
276 pub async fn get_active_tasks(&self) -> FxHashMap<String, CollectionTask> {
278 self.active_tasks.read().await.clone()
279 }
280
281 pub async fn get_collection_status(&self) -> FxHashMap<SmartString, Vec<CollectionStatus>> {
283 let mut status_map = FxHashMap::default();
284 let clients = self.exchange_clients.read().await;
285
286 for (exchange_name, client) in clients.iter() {
287 if let Some(status) = client.get_status().await {
288 status_map
289 .entry(exchange_name.clone())
290 .or_insert_with(Vec::new)
291 .push(status);
292 }
293 }
294
295 status_map
296 }
297
298 pub async fn stop_exchange(&self, exchange_name: &str) -> Result<()> {
300 log::info!("Stopping collection for exchange: {exchange_name}");
301
302 let exchange_name_smart: SmartString = exchange_name.into();
303 let mut clients = self.exchange_clients.write().await;
304
305 if let Some(mut client) = clients.remove(&exchange_name_smart) {
306 client.disconnect().await?;
307
308 let mut active_tasks = self.active_tasks.write().await;
310 active_tasks.retain(|task_id, _| !task_id.starts_with(exchange_name));
311
312 log::info!("Successfully stopped collection for exchange: {exchange_name}");
313 } else {
314 log::warn!("Exchange {exchange_name} was not active");
315 }
316
317 Ok(())
318 }
319
320 fn filter_symbols_for_exchange(
322 &self,
323 exchange_name: &str,
324 symbols: &[SmartString],
325 ) -> Vec<SmartString> {
326 symbols
327 .iter()
328 .filter(|symbol| self.is_symbol_valid_for_exchange(exchange_name, symbol))
329 .cloned()
330 .collect()
331 }
332
333 fn is_symbol_valid_for_exchange(&self, exchange_name: &str, symbol: &str) -> bool {
335 match exchange_name.to_lowercase().as_str() {
336 "binance" => {
337 symbol.len() >= 6
339 && symbol.chars().all(|c| c.is_ascii_uppercase())
340 && !symbol.contains('-')
341 && !symbol.contains('_')
342 }
343 "upbit" => {
344 symbol.contains('-') && symbol.starts_with("KRW-")
346 }
347 "bybit" => {
348 symbol.len() >= 6
350 && symbol.chars().all(|c| c.is_ascii_uppercase())
351 && !symbol.contains('-')
352 && !symbol.contains('_')
353 }
354 "coinbase" => {
355 symbol.contains('-') && !symbol.starts_with("KRW-")
357 }
358 "bithumb" => {
359 symbol.contains('_')
361 }
362 _ => false,
363 }
364 }
365
366 pub async fn get_statistics(&self) -> FxHashMap<SmartString, u32> {
368 let mut stats = FxHashMap::default();
369 let clients = self.exchange_clients.read().await;
370
371 for (exchange_name, _) in clients.iter() {
372 let active_tasks = self.active_tasks.read().await;
373 let count = active_tasks
374 .values()
375 .filter(|task| task.exchange == *exchange_name)
376 .count() as u32;
377 stats.insert(exchange_name.clone(), count);
378 }
379
380 stats
381 }
382
383 async fn retry_failed_exchanges(&self, event_sender: &Sender<MarketDataEvent>) {
386 let failed_exchanges = self.failed_exchanges.read().await.clone();
387
388 for (exchange_name, mut retry_state) in failed_exchanges {
389 if !retry_state.should_retry() {
390 if retry_state.retry_count >= 10 {
391 log::error!(
392 "Exchange {} has failed {} times. Giving up. Last error: {}",
393 exchange_name,
394 retry_state.retry_count,
395 retry_state.last_error
396 );
397 self.failed_exchanges.write().await.remove(&exchange_name);
399 }
400 continue;
401 }
402
403 log::info!(
404 "Retrying connection to {} (attempt {} after {}s)",
405 exchange_name,
406 retry_state.retry_count + 1,
407 retry_state
408 .next_retry_time
409 .duration_since(retry_state.failure_time)
410 .as_secs()
411 );
412
413 let mut client = ExchangeClient::new(exchange_name.clone());
415
416 match client.connect().await {
417 Ok(()) => {
418 log::info!("Successfully reconnected to {exchange_name}");
419
420 self.failed_exchanges.write().await.remove(&exchange_name);
422
423 self.exchange_clients
425 .write()
426 .await
427 .insert(exchange_name.clone(), client);
428
429 let _ = event_sender.send(MarketDataEvent::ConnectionStatus {
434 exchange: exchange_name.clone(),
435 symbol: "".into(), connected: true,
437 timestamp: Clock::new().raw(),
438 });
439 }
440 Err(e) => {
441 log::error!(
442 "Failed to reconnect to {} (attempt {}): {}",
443 exchange_name,
444 retry_state.retry_count + 1,
445 e
446 );
447
448 retry_state.last_error = e.to_string();
450 retry_state.calculate_next_retry();
451
452 self.failed_exchanges
454 .write()
455 .await
456 .insert(exchange_name, retry_state);
457 }
458 }
459 }
460 }
461
462 pub async fn health_check(&self) -> FxHashMap<SmartString, bool> {
466 let clients = self.exchange_clients.read().await;
467
468 let health_futures = clients.iter().map(|(name, client)| async move {
469 let is_healthy = client
470 .get_status()
471 .await
472 .map(|status| status.connected)
473 .unwrap_or(false);
474 (name.clone(), is_healthy)
475 });
476
477 let health_results = future::join_all(health_futures).await;
478
479 health_results.into_iter().collect()
480 }
481}