rusty_monitor/
main.rs

1//! Rusty Monitor - Real-time Market Data Collection System
2//!
3//! A high-performance market data collection system that gathers real-time
4//! trade and orderbook data from multiple cryptocurrency exchanges and stores
5//! them in `FlatBuffers` format with compression.
6use mimalloc::MiMalloc;
7
8#[global_allocator]
9static GLOBAL: MiMalloc = MiMalloc;
10
11mod cli;
12
13use anyhow::{Context, Result};
14use clap::Parser;
15use cli::{Cli, Commands, ConfigArgs, ConfigCommands, StartArgs};
16use flume::{Receiver, Sender};
17use rusty_bin::monitor::system_monitor::SystemMonitor;
18use std::path::{Path, PathBuf};
19use std::sync::Arc;
20use tokio::signal;
21use tokio::task::JoinHandle;
22
23use rusty_bin::monitor::collector::{
24    CollectionManager, DataPipeline, MarketDataEvent, PipelineConfig,
25};
26use rusty_bin::monitor::config::MonitorConfig;
27use rusty_bin::monitor::config::storage::StorageConfig;
28use rusty_bin::monitor::storage::{CompressionMode, manager::StorageManager};
29
30struct MonitorServices {
31    collection_manager: Arc<CollectionManager>,
32    data_pipeline: DataPipeline,
33    system_monitor: Arc<SystemMonitor>,
34    event_sender: Sender<MarketDataEvent>,
35    event_receiver: Receiver<MarketDataEvent>,
36    config: MonitorConfig,
37}
38
39#[tokio::main]
40async fn main() -> Result<()> {
41    // Initialize logging early
42    env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info"))
43        .format_timestamp_nanos()
44        .init();
45
46    // Parse command line arguments using clap's derive API
47    let cli = Cli::parse();
48
49    // Handle different commands
50    // If no subcommand is specified, default to the `start` command.
51    match cli
52        .command
53        .as_ref()
54        .unwrap_or(&Commands::Start(StartArgs {}))
55    {
56        Commands::Start(_) => start_monitor(&cli).await,
57        Commands::Config(config_args) => handle_config_command(config_args, &cli.config).await,
58        Commands::Status(_) => handle_status_command(&cli).await,
59    }
60}
61
62/// Start the market data monitor
63async fn start_monitor(cli: &Cli) -> Result<()> {
64    log::info!("🚀 Starting Rusty Market Data Monitor");
65
66    let mut services = initialize_services(cli).await?;
67    run_and_shutdown_services(&mut services, cli).await?;
68
69    log::info!("✅ Rusty Market Data Monitor shut down gracefully.");
70    Ok(())
71}
72
73async fn initialize_services(cli: &Cli) -> Result<MonitorServices> {
74    // 1. Load configuration
75    let config = match load_config(&cli.config) {
76        Ok(config) => config,
77        Err(e) => {
78            log::error!("Failed to load configuration: {e}");
79            log::error!(
80                "If you are running for the first time, please create default configuration file with `config generate`"
81            );
82            return Err(e);
83        }
84    };
85
86    // 2. Validate and setup
87    config
88        .validate()
89        .context("Configuration validation failed")?;
90    config
91        .create_directories()
92        .context("Failed to create directories")?;
93
94    // 3. Create component configurations
95    let storage_config = StorageConfig {
96        market_data_path: config.storage.market_data_path.clone(),
97        rotation_check_interval_s: 1,
98        max_file_size_mb: 1,
99        max_records_per_file: 1000,
100        ..Default::default()
101    };
102    let compression_mode = if config.storage.enable_compression {
103        CompressionMode::Realtime
104    } else {
105        CompressionMode::None
106    };
107    let pipeline_config = PipelineConfig {
108        batch_size: 100,
109        batch_timeout_ms: 100,
110        worker_count: 4,
111        compression_mode,
112        rotation_check_interval_seconds: 5,
113    };
114
115    // 4. Create communication channel
116    let (event_sender, event_receiver) = flume::unbounded();
117
118    // 5. Initialize managers and services
119    log::info!("📊 Initializing components...");
120    let collection_manager = Arc::new(CollectionManager::new(config.clone()).await?);
121    let storage_manager = Arc::new(StorageManager::new(storage_config).await?);
122    let data_pipeline = DataPipeline::new(pipeline_config, storage_manager);
123    let system_monitor = Arc::new(SystemMonitor::new(config.clone()));
124    log::info!("✅ Components initialized.");
125
126    Ok(MonitorServices {
127        collection_manager,
128        data_pipeline,
129        system_monitor,
130        event_sender,
131        event_receiver,
132        config,
133    })
134}
135
136async fn run_and_shutdown_services(services: &mut MonitorServices, cli: &Cli) -> Result<()> {
137    // Extract command line filters
138    let exchange_filter = cli.exchanges.clone().map(|exchanges| {
139        exchanges
140            .into_iter()
141            .map(std::convert::Into::into)
142            .collect()
143    });
144    let symbol_filter = cli
145        .symbols
146        .clone()
147        .map(|symbols| symbols.into_iter().map(std::convert::Into::into).collect());
148
149    // Start background tasks
150    log::info!("🔄 Starting background services...");
151    services
152        .data_pipeline
153        .start(services.event_receiver.clone())
154        .await?;
155
156    let collection_handle = {
157        let manager = services.collection_manager.clone();
158        let sender = services.event_sender.clone();
159        tokio::spawn(async move {
160            if let Err(e) = manager
161                .start_collection(exchange_filter, symbol_filter, sender)
162                .await
163            {
164                log::error!("Collection manager failed: {e}");
165            } else {
166                log::info!("Collection manager completed successfully");
167            }
168        })
169    };
170
171    log::info!("Starting health monitoring...");
172    let health_handle = services.system_monitor.start_health_monitoring().await;
173
174    let mut metrics_handle: Option<JoinHandle<()>> = None;
175    if services.config.monitoring.enable_metrics {
176        log::info!("Starting metrics monitoring...");
177        metrics_handle = Some(services.system_monitor.start_monitoring().await);
178        log::info!("✅ Monitoring services started.");
179    }
180
181    // Wait for shutdown signal (Ctrl+C)
182    wait_for_shutdown_signal().await;
183
184    // Graceful shutdown
185    log::info!("🔌 Shutting down services...");
186    log::info!("Stopping collection manager...");
187    services.collection_manager.stop_all().await?;
188    collection_handle.await?;
189    log::info!("Collection manager stopped.");
190
191    if services.config.monitoring.enable_metrics
192        && let Some(handle) = metrics_handle
193    {
194        log::info!("Stopping metrics monitoring...");
195        services.system_monitor.stop_monitoring().await;
196        handle.await?;
197        log::info!("Metrics monitoring stopped.");
198    }
199
200    log::info!("Stopping health monitoring...");
201    health_handle.abort();
202    log::info!("Health monitoring stopped.");
203
204    log::info!("Stopping data pipeline...");
205    services.data_pipeline.stop().await?;
206    log::info!("Data pipeline stopped.");
207
208    Ok(())
209}
210
211/// Load configuration from file
212fn load_config(path: &PathBuf) -> Result<MonitorConfig> {
213    log::info!("Loading configuration from: {path:?}");
214    let config =
215        MonitorConfig::from_file(path).context(format!("Failed to load config file: {path:?}"))?;
216    Ok(config)
217}
218
219/// Handle the `config` subcommand
220async fn handle_config_command(args: &ConfigArgs, config_path: &PathBuf) -> Result<()> {
221    match &args.command {
222        ConfigCommands::Validate => {
223            log::info!("Validating configuration file: {config_path:?}");
224            match load_config(config_path) {
225                Ok(config) => match config.validate() {
226                    Ok(()) => log::info!("✅ Configuration is valid."),
227                    Err(e) => log::error!("❌ Configuration validation failed: {e}"),
228                },
229                Err(e) => log::error!("❌ Failed to load configuration: {e}"),
230            }
231        }
232        ConfigCommands::Generate { output } => {
233            log::info!("Generating default configuration file at: {output:?}");
234            let config = MonitorConfig::default();
235            let toml_content = toml::to_string_pretty(&config)?;
236
237            // create directories if they don't exist
238            let dir = Path::new(output)
239                .parent()
240                .ok_or(anyhow::anyhow!("Invalid output path"))?;
241            if !dir.exists() {
242                tokio::fs::create_dir_all(dir).await?;
243            }
244
245            tokio::fs::write(output, toml_content).await?;
246            log::info!("✅ Default configuration file generated successfully.");
247        }
248    }
249    Ok(())
250}
251
252/// Handle the `status` subcommand
253async fn handle_status_command(cli: &Cli) -> Result<()> {
254    log::info!("Showing system status...");
255
256    let config = load_config(&cli.config)?;
257    let collection_manager = Arc::new(CollectionManager::new(config.clone()).await?);
258    let system_monitor = Arc::new(SystemMonitor::new(config.clone()));
259
260    let status = collection_manager.get_collection_status().await;
261    let stats = collection_manager.get_statistics().await;
262    let system_metrics = system_monitor.get_system_metrics().await;
263    let health_checks = system_monitor.get_health_checks().await;
264
265    println!("--- System Status ---");
266    display_system_status(&system_metrics, &health_checks);
267
268    println!("\n--- Collection Status ---");
269    display_collection_status(&status);
270
271    println!("\n--- Collection Statistics ---");
272    display_collection_statistics(&stats);
273
274    Ok(())
275}
276
277/// Display system status in a user-friendly format
278fn display_system_status(
279    metrics: &rusty_bin::monitor::SystemMetrics,
280    health_checks: &[rusty_bin::monitor::HealthCheck],
281) {
282    // System uptime
283    let uptime_seconds = metrics.uptime;
284    let uptime_hours = uptime_seconds / 3600;
285    let uptime_minutes = (uptime_seconds % 3600) / 60;
286    let uptime_secs = uptime_seconds % 60;
287
288    println!("  Uptime: {uptime_hours}h {uptime_minutes}m {uptime_secs}s");
289
290    // CPU usage
291    println!("  CPU Usage: {:.1}%", metrics.cpu_usage);
292
293    // Memory usage
294    let memory_gb = metrics.memory_total as f64 / (1024.0 * 1024.0 * 1024.0);
295    let memory_used_gb = metrics.memory_used as f64 / (1024.0 * 1024.0 * 1024.0);
296    println!(
297        "  Memory: {:.2}GB / {:.2}GB ({:.1}%)",
298        memory_used_gb, memory_gb, metrics.memory_usage
299    );
300
301    // Disk usage
302    if !metrics.disk_usage.is_empty() {
303        println!("  Disk Usage:");
304        for (mount, disk) in &metrics.disk_usage {
305            let total_gb = disk.total_space as f64 / (1024.0 * 1024.0 * 1024.0);
306            let used_gb = disk.used_space as f64 / (1024.0 * 1024.0 * 1024.0);
307            println!(
308                "    {}: {:.2}GB / {:.2}GB ({:.1}%)",
309                mount, used_gb, total_gb, disk.usage_percent
310            );
311        }
312    }
313
314    // Network stats
315    let net_rx_mb = metrics.network_stats.bytes_received as f64 / (1024.0 * 1024.0);
316    let net_tx_mb = metrics.network_stats.bytes_transmitted as f64 / (1024.0 * 1024.0);
317    println!("  Network: RX {net_rx_mb:.2}MB, TX {net_tx_mb:.2}MB");
318
319    // Health status
320    println!("  Health Status:");
321    for check in health_checks {
322        let status_icon = match check.status {
323            rusty_bin::monitor::HealthStatus::Healthy => "✅",
324            rusty_bin::monitor::HealthStatus::Warning => "⚠️",
325            rusty_bin::monitor::HealthStatus::Critical => "❌",
326            rusty_bin::monitor::HealthStatus::Unknown => "❓",
327        };
328        println!("    {} {}: {}", status_icon, check.name, check.message);
329    }
330}
331
332/// Display collection status in a user-friendly format
333fn display_collection_status(
334    status: &rusty_common::collections::FxHashMap<
335        smartstring::alias::String,
336        Vec<rusty_bin::monitor::collector::CollectionStatus>,
337    >,
338) {
339    if status.is_empty() {
340        println!("  No active collections");
341        return;
342    }
343
344    for (exchange, statuses) in status {
345        println!("  Exchange: {exchange}");
346        for status in statuses {
347            let status_icon = if status.connected { "✅" } else { "❌" };
348            let data_types: Vec<String> = status
349                .data_types
350                .iter()
351                .map(std::string::ToString::to_string)
352                .collect();
353            println!(
354                "    {} {} [{}] - {}",
355                status_icon,
356                status.symbol,
357                data_types.join(", "),
358                if status.connected {
359                    "connected"
360                } else {
361                    "disconnected"
362                }
363            );
364        }
365    }
366}
367
368/// Display collection statistics in a user-friendly format
369fn display_collection_statistics(
370    stats: &rusty_common::collections::FxHashMap<smartstring::alias::String, u32>,
371) {
372    if stats.is_empty() {
373        println!("  No statistics available");
374        return;
375    }
376
377    let total_tasks: u32 = stats.values().sum();
378    println!("  Total active tasks: {total_tasks}");
379
380    for (exchange, count) in stats {
381        println!("  {exchange}: {count} tasks");
382    }
383}
384
385/// Wait for a shutdown signal (e.g., Ctrl+C)
386async fn wait_for_shutdown_signal() {
387    let ctrl_c = async {
388        signal::ctrl_c()
389            .await
390            .expect("Failed to install Ctrl+C handler");
391    };
392
393    #[cfg(unix)]
394    let terminate = async {
395        signal::unix::signal(signal::unix::SignalKind::terminate())
396            .expect("Failed to install signal handler")
397            .recv()
398            .await;
399    };
400
401    #[cfg(not(unix))]
402    let terminate = std::future::pending::<()>();
403
404    tokio::select! {
405        () = ctrl_c => {log::info!("Received Ctrl+C, initiating shutdown...")},
406        () = terminate => {log::info!("Received terminate signal, initiating shutdown...")},
407    }
408}