1use mimalloc::MiMalloc;
7
// Install mimalloc as the process-wide allocator for this binary.
#[global_allocator]
static GLOBAL: MiMalloc = MiMalloc;
10
11mod cli;
12
13use anyhow::{Context, Result};
14use clap::Parser;
15use cli::{Cli, Commands, ConfigArgs, ConfigCommands, StartArgs};
16use flume::{Receiver, Sender};
17use rusty_bin::monitor::system_monitor::SystemMonitor;
18use std::path::{Path, PathBuf};
19use std::sync::Arc;
20use tokio::signal;
21use tokio::task::JoinHandle;
22
23use rusty_bin::monitor::collector::{
24 CollectionManager, DataPipeline, MarketDataEvent, PipelineConfig,
25};
26use rusty_bin::monitor::config::MonitorConfig;
27use rusty_bin::monitor::config::storage::StorageConfig;
28use rusty_bin::monitor::storage::{CompressionMode, manager::StorageManager};
29
/// Bundle of long-lived components built by `initialize_services` and driven
/// by `run_and_shutdown_services`.
struct MonitorServices {
    /// Runs the per-exchange collection and is asked to `stop_all` on shutdown.
    collection_manager: Arc<CollectionManager>,
    /// Consumes events from `event_receiver`; started and stopped explicitly.
    data_pipeline: DataPipeline,
    /// Source of the health and (optional) metrics monitoring tasks.
    system_monitor: Arc<SystemMonitor>,
    /// Sending half of the unbounded event channel, handed to the collector.
    event_sender: Sender<MarketDataEvent>,
    /// Receiving half of the unbounded event channel, handed to the pipeline.
    event_receiver: Receiver<MarketDataEvent>,
    /// Loaded configuration; read at runtime for `monitoring.enable_metrics`.
    config: MonitorConfig,
}
38
39#[tokio::main]
40async fn main() -> Result<()> {
41 env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info"))
43 .format_timestamp_nanos()
44 .init();
45
46 let cli = Cli::parse();
48
49 match cli
52 .command
53 .as_ref()
54 .unwrap_or(&Commands::Start(StartArgs {}))
55 {
56 Commands::Start(_) => start_monitor(&cli).await,
57 Commands::Config(config_args) => handle_config_command(config_args, &cli.config).await,
58 Commands::Status(_) => handle_status_command(&cli).await,
59 }
60}
61
62async fn start_monitor(cli: &Cli) -> Result<()> {
64 log::info!("🚀 Starting Rusty Market Data Monitor");
65
66 let mut services = initialize_services(cli).await?;
67 run_and_shutdown_services(&mut services, cli).await?;
68
69 log::info!("✅ Rusty Market Data Monitor shut down gracefully.");
70 Ok(())
71}
72
73async fn initialize_services(cli: &Cli) -> Result<MonitorServices> {
74 let config = match load_config(&cli.config) {
76 Ok(config) => config,
77 Err(e) => {
78 log::error!("Failed to load configuration: {e}");
79 log::error!(
80 "If you are running for the first time, please create default configuration file with `config generate`"
81 );
82 return Err(e);
83 }
84 };
85
86 config
88 .validate()
89 .context("Configuration validation failed")?;
90 config
91 .create_directories()
92 .context("Failed to create directories")?;
93
94 let storage_config = StorageConfig {
96 market_data_path: config.storage.market_data_path.clone(),
97 rotation_check_interval_s: 1,
98 max_file_size_mb: 1,
99 max_records_per_file: 1000,
100 ..Default::default()
101 };
102 let compression_mode = if config.storage.enable_compression {
103 CompressionMode::Realtime
104 } else {
105 CompressionMode::None
106 };
107 let pipeline_config = PipelineConfig {
108 batch_size: 100,
109 batch_timeout_ms: 100,
110 worker_count: 4,
111 compression_mode,
112 rotation_check_interval_seconds: 5,
113 };
114
115 let (event_sender, event_receiver) = flume::unbounded();
117
118 log::info!("📊 Initializing components...");
120 let collection_manager = Arc::new(CollectionManager::new(config.clone()).await?);
121 let storage_manager = Arc::new(StorageManager::new(storage_config).await?);
122 let data_pipeline = DataPipeline::new(pipeline_config, storage_manager);
123 let system_monitor = Arc::new(SystemMonitor::new(config.clone()));
124 log::info!("✅ Components initialized.");
125
126 Ok(MonitorServices {
127 collection_manager,
128 data_pipeline,
129 system_monitor,
130 event_sender,
131 event_receiver,
132 config,
133 })
134}
135
136async fn run_and_shutdown_services(services: &mut MonitorServices, cli: &Cli) -> Result<()> {
137 let exchange_filter = cli.exchanges.clone().map(|exchanges| {
139 exchanges
140 .into_iter()
141 .map(std::convert::Into::into)
142 .collect()
143 });
144 let symbol_filter = cli
145 .symbols
146 .clone()
147 .map(|symbols| symbols.into_iter().map(std::convert::Into::into).collect());
148
149 log::info!("🔄 Starting background services...");
151 services
152 .data_pipeline
153 .start(services.event_receiver.clone())
154 .await?;
155
156 let collection_handle = {
157 let manager = services.collection_manager.clone();
158 let sender = services.event_sender.clone();
159 tokio::spawn(async move {
160 if let Err(e) = manager
161 .start_collection(exchange_filter, symbol_filter, sender)
162 .await
163 {
164 log::error!("Collection manager failed: {e}");
165 } else {
166 log::info!("Collection manager completed successfully");
167 }
168 })
169 };
170
171 log::info!("Starting health monitoring...");
172 let health_handle = services.system_monitor.start_health_monitoring().await;
173
174 let mut metrics_handle: Option<JoinHandle<()>> = None;
175 if services.config.monitoring.enable_metrics {
176 log::info!("Starting metrics monitoring...");
177 metrics_handle = Some(services.system_monitor.start_monitoring().await);
178 log::info!("✅ Monitoring services started.");
179 }
180
181 wait_for_shutdown_signal().await;
183
184 log::info!("🔌 Shutting down services...");
186 log::info!("Stopping collection manager...");
187 services.collection_manager.stop_all().await?;
188 collection_handle.await?;
189 log::info!("Collection manager stopped.");
190
191 if services.config.monitoring.enable_metrics
192 && let Some(handle) = metrics_handle
193 {
194 log::info!("Stopping metrics monitoring...");
195 services.system_monitor.stop_monitoring().await;
196 handle.await?;
197 log::info!("Metrics monitoring stopped.");
198 }
199
200 log::info!("Stopping health monitoring...");
201 health_handle.abort();
202 log::info!("Health monitoring stopped.");
203
204 log::info!("Stopping data pipeline...");
205 services.data_pipeline.stop().await?;
206 log::info!("Data pipeline stopped.");
207
208 Ok(())
209}
210
211fn load_config(path: &PathBuf) -> Result<MonitorConfig> {
213 log::info!("Loading configuration from: {path:?}");
214 let config =
215 MonitorConfig::from_file(path).context(format!("Failed to load config file: {path:?}"))?;
216 Ok(config)
217}
218
219async fn handle_config_command(args: &ConfigArgs, config_path: &PathBuf) -> Result<()> {
221 match &args.command {
222 ConfigCommands::Validate => {
223 log::info!("Validating configuration file: {config_path:?}");
224 match load_config(config_path) {
225 Ok(config) => match config.validate() {
226 Ok(()) => log::info!("✅ Configuration is valid."),
227 Err(e) => log::error!("❌ Configuration validation failed: {e}"),
228 },
229 Err(e) => log::error!("❌ Failed to load configuration: {e}"),
230 }
231 }
232 ConfigCommands::Generate { output } => {
233 log::info!("Generating default configuration file at: {output:?}");
234 let config = MonitorConfig::default();
235 let toml_content = toml::to_string_pretty(&config)?;
236
237 let dir = Path::new(output)
239 .parent()
240 .ok_or(anyhow::anyhow!("Invalid output path"))?;
241 if !dir.exists() {
242 tokio::fs::create_dir_all(dir).await?;
243 }
244
245 tokio::fs::write(output, toml_content).await?;
246 log::info!("✅ Default configuration file generated successfully.");
247 }
248 }
249 Ok(())
250}
251
252async fn handle_status_command(cli: &Cli) -> Result<()> {
254 log::info!("Showing system status...");
255
256 let config = load_config(&cli.config)?;
257 let collection_manager = Arc::new(CollectionManager::new(config.clone()).await?);
258 let system_monitor = Arc::new(SystemMonitor::new(config.clone()));
259
260 let status = collection_manager.get_collection_status().await;
261 let stats = collection_manager.get_statistics().await;
262 let system_metrics = system_monitor.get_system_metrics().await;
263 let health_checks = system_monitor.get_health_checks().await;
264
265 println!("--- System Status ---");
266 display_system_status(&system_metrics, &health_checks);
267
268 println!("\n--- Collection Status ---");
269 display_collection_status(&status);
270
271 println!("\n--- Collection Statistics ---");
272 display_collection_statistics(&stats);
273
274 Ok(())
275}
276
277fn display_system_status(
279 metrics: &rusty_bin::monitor::SystemMetrics,
280 health_checks: &[rusty_bin::monitor::HealthCheck],
281) {
282 let uptime_seconds = metrics.uptime;
284 let uptime_hours = uptime_seconds / 3600;
285 let uptime_minutes = (uptime_seconds % 3600) / 60;
286 let uptime_secs = uptime_seconds % 60;
287
288 println!(" Uptime: {uptime_hours}h {uptime_minutes}m {uptime_secs}s");
289
290 println!(" CPU Usage: {:.1}%", metrics.cpu_usage);
292
293 let memory_gb = metrics.memory_total as f64 / (1024.0 * 1024.0 * 1024.0);
295 let memory_used_gb = metrics.memory_used as f64 / (1024.0 * 1024.0 * 1024.0);
296 println!(
297 " Memory: {:.2}GB / {:.2}GB ({:.1}%)",
298 memory_used_gb, memory_gb, metrics.memory_usage
299 );
300
301 if !metrics.disk_usage.is_empty() {
303 println!(" Disk Usage:");
304 for (mount, disk) in &metrics.disk_usage {
305 let total_gb = disk.total_space as f64 / (1024.0 * 1024.0 * 1024.0);
306 let used_gb = disk.used_space as f64 / (1024.0 * 1024.0 * 1024.0);
307 println!(
308 " {}: {:.2}GB / {:.2}GB ({:.1}%)",
309 mount, used_gb, total_gb, disk.usage_percent
310 );
311 }
312 }
313
314 let net_rx_mb = metrics.network_stats.bytes_received as f64 / (1024.0 * 1024.0);
316 let net_tx_mb = metrics.network_stats.bytes_transmitted as f64 / (1024.0 * 1024.0);
317 println!(" Network: RX {net_rx_mb:.2}MB, TX {net_tx_mb:.2}MB");
318
319 println!(" Health Status:");
321 for check in health_checks {
322 let status_icon = match check.status {
323 rusty_bin::monitor::HealthStatus::Healthy => "✅",
324 rusty_bin::monitor::HealthStatus::Warning => "⚠️",
325 rusty_bin::monitor::HealthStatus::Critical => "❌",
326 rusty_bin::monitor::HealthStatus::Unknown => "❓",
327 };
328 println!(" {} {}: {}", status_icon, check.name, check.message);
329 }
330}
331
332fn display_collection_status(
334 status: &rusty_common::collections::FxHashMap<
335 smartstring::alias::String,
336 Vec<rusty_bin::monitor::collector::CollectionStatus>,
337 >,
338) {
339 if status.is_empty() {
340 println!(" No active collections");
341 return;
342 }
343
344 for (exchange, statuses) in status {
345 println!(" Exchange: {exchange}");
346 for status in statuses {
347 let status_icon = if status.connected { "✅" } else { "❌" };
348 let data_types: Vec<String> = status
349 .data_types
350 .iter()
351 .map(std::string::ToString::to_string)
352 .collect();
353 println!(
354 " {} {} [{}] - {}",
355 status_icon,
356 status.symbol,
357 data_types.join(", "),
358 if status.connected {
359 "connected"
360 } else {
361 "disconnected"
362 }
363 );
364 }
365 }
366}
367
368fn display_collection_statistics(
370 stats: &rusty_common::collections::FxHashMap<smartstring::alias::String, u32>,
371) {
372 if stats.is_empty() {
373 println!(" No statistics available");
374 return;
375 }
376
377 let total_tasks: u32 = stats.values().sum();
378 println!(" Total active tasks: {total_tasks}");
379
380 for (exchange, count) in stats {
381 println!(" {exchange}: {count} tasks");
382 }
383}
384
385async fn wait_for_shutdown_signal() {
387 let ctrl_c = async {
388 signal::ctrl_c()
389 .await
390 .expect("Failed to install Ctrl+C handler");
391 };
392
393 #[cfg(unix)]
394 let terminate = async {
395 signal::unix::signal(signal::unix::SignalKind::terminate())
396 .expect("Failed to install signal handler")
397 .recv()
398 .await;
399 };
400
401 #[cfg(not(unix))]
402 let terminate = std::future::pending::<()>();
403
404 tokio::select! {
405 () = ctrl_c => {log::info!("Received Ctrl+C, initiating shutdown...")},
406 () = terminate => {log::info!("Received terminate signal, initiating shutdown...")},
407 }
408}