rusty_bin/monitor/
system_monitor.rs

1//! System monitoring and health check implementation
2//!
3//! This module provides comprehensive system monitoring including health checks,
4//! application metrics, and system metrics collection.
5
6use std::{sync::Arc, time::Duration};
7
8use flume::{Receiver, Sender};
9use rusty_common::collections::FxHashMap;
10use sysinfo::System;
11use tokio::{sync::RwLock, task::JoinHandle, time::interval};
12
13use crate::monitor::{
14    ApplicationMetrics, HealthCheck, HealthStatus, MonitorConfig, SystemMetrics,
15    metrics::MetricsCollector,
16};
17
18/// System monitor for health checks and metrics collection
19#[derive(Debug)]
20pub struct SystemMonitor {
21    /// Monitor configuration
22    config: MonitorConfig,
23    /// Application metrics
24    app_metrics: Arc<RwLock<ApplicationMetrics>>,
25    /// Metrics collector instance
26    metrics_collector: MetricsCollector,
27    /// Latest system metrics
28    latest_system_metrics: Arc<RwLock<SystemMetrics>>,
29    /// Health check results
30    health_checks: Arc<RwLock<Vec<HealthCheck>>>,
31    /// Shutdown signal sender for monitoring
32    shutdown_monitoring_signal_tx: Arc<RwLock<Sender<()>>>,
33    /// Shutdown signal receiver for monitoring
34    shutdown_monitor_signal_rx: Arc<RwLock<Receiver<()>>>,
35    /// Shutdown signal sender for health checks
36    shutdown_health_signal_tx: Arc<RwLock<Sender<()>>>,
37    /// Shutdown signal receiver for health checks
38    shutdown_health_signal_rx: Arc<RwLock<Receiver<()>>>,
39}
40
41impl SystemMonitor {
42    /// Create a new system monitor
43    #[must_use]
44    pub fn new(config: MonitorConfig) -> Self {
45        let app_metrics = ApplicationMetrics {
46            timestamp: crate::monitor::utils::time::now_nanos(),
47            trades_processed: 0,
48            orderbooks_processed: 0,
49            files_written: 0,
50            bytes_written: 0,
51            compression_ratio: 0.0,
52            active_connections: 0,
53            error_count: 0,
54            latency_p50: 0.0,
55            latency_p95: 0.0,
56            latency_p99: 0.0,
57        };
58
59        let (shutdown_tx, shutdown_rx) = flume::unbounded();
60        let (shutdown_health_tx, shutdown_health_rx) = flume::unbounded();
61
62        let metrics_collector = MetricsCollector::new();
63        let initial_system_metrics = metrics_collector.collect_system_metrics();
64
65        Self {
66            config,
67            app_metrics: Arc::new(RwLock::new(app_metrics)),
68            metrics_collector,
69            latest_system_metrics: Arc::new(RwLock::new(initial_system_metrics)),
70            health_checks: Arc::new(RwLock::new(Vec::new())),
71
72            shutdown_monitoring_signal_tx: Arc::new(RwLock::new(shutdown_tx)),
73            shutdown_monitor_signal_rx: Arc::new(RwLock::new(shutdown_rx)),
74
75            shutdown_health_signal_tx: Arc::new(RwLock::new(shutdown_health_tx)),
76            shutdown_health_signal_rx: Arc::new(RwLock::new(shutdown_health_rx)),
77        }
78    }
79
80    /// Start monitoring (both metrics and health checks)
81    pub async fn start_monitoring(&self) -> JoinHandle<()> {
82        log::info!("Starting system monitoring");
83
84        // Start metrics collection in background
85        let config = self.config.clone();
86        let app_metrics = self.app_metrics.clone();
87        let latest_system_metrics = self.latest_system_metrics.clone();
88        let metrics_collector = self.metrics_collector.clone();
89
90        let shutdown_monitor_signal_rx = self.shutdown_monitor_signal_rx.clone();
91
92        tokio::spawn(async move {
93            let mut interval = interval(Duration::from_secs(
94                config.monitoring.metrics_interval_seconds,
95            ));
96
97            loop {
98                tokio::select! {
99                    _ = interval.tick() => {
100                        // Update application metrics timestamp
101                        {
102                            let mut metrics = app_metrics.write().await;
103                            metrics.timestamp = crate::monitor::utils::time::now_nanos();
104                        }
105
106                        // Collect and update system metrics
107                        let metrics_collector_clone = metrics_collector.clone();
108                        let new_system_metrics =
109                            tokio::task::spawn_blocking(move || metrics_collector_clone.collect_system_metrics())
110                                .await
111                                .unwrap();
112
113                        {
114                            let mut system_metrics = latest_system_metrics.write().await;
115                            *system_metrics = new_system_metrics;
116                        }
117                    }
118                    _ = shutdown_monitor_signal_rx.read() => {
119                        break;
120                    }
121                }
122            }
123        })
124    }
125
126    /// Stop monitoring
127    pub async fn stop_monitoring(&self) {
128        log::info!("Stopping system monitoring");
129        let _ = self.shutdown_monitoring_signal_tx.write().await.send(());
130        log::info!("System monitoring stopped");
131    }
132
133    /// Start health monitoring background task
134    ///
135    /// Returns a JoinHandle to the spawned health monitoring task that periodically checks system health
136    pub async fn start_health_monitoring(&self) -> JoinHandle<()> {
137        log::info!("Starting health monitoring");
138
139        // Start health monitoring in background
140        let config = self.config.clone();
141        let shutdown_health_signal_rx = self.shutdown_health_signal_rx.clone();
142        let health_checks = self.health_checks.clone();
143
144        tokio::spawn(async move {
145            let mut interval = interval(Duration::from_secs(
146                config.monitoring.health_check_interval_seconds,
147            ));
148
149            loop {
150                tokio::select! {
151                    _ = interval.tick() => {
152                        // Perform health checks
153                        let health_check_results = Self::perform_health_checks_static(&config).await;
154
155                        // Store health check results
156                        *health_checks.write().await = health_check_results.clone();
157
158                        // Check for critical issues
159                        for check in &health_check_results {
160                            match check.status {
161                                HealthStatus::Critical => {
162                                    log::error!("Critical health check failed: {} - {}", check.name, check.message);
163                                    // Here we could send alerts (Discord, email, etc.)
164                                }
165                                HealthStatus::Warning => {
166                                    log::warn!("Health check warning: {} - {}", check.name, check.message);
167                                }
168                                _ => {}
169                            }
170                        }
171                    }
172                    _ = shutdown_health_signal_rx.read() => {
173                        break;
174                    }
175                }
176            }
177        })
178    }
179
180    /// Collect current metrics
181    pub async fn get_system_metrics(&self) -> SystemMetrics {
182        self.latest_system_metrics.read().await.clone()
183    }
184
185    /// Check system health
186    pub async fn check_health(&self) -> Vec<HealthCheck> {
187        self.health_checks.read().await.clone()
188    }
189
190    /// Get latest stored health checks
191    pub async fn get_health_checks(&self) -> Vec<HealthCheck> {
192        self.health_checks.read().await.clone()
193    }
194
195    /// Get application metrics
196    pub async fn get_app_metrics(&self) -> ApplicationMetrics {
197        self.app_metrics.read().await.clone()
198    }
199
200    /// Update application metrics
201    pub async fn update_app_metrics<F>(&self, updater: F)
202    where
203        F: FnOnce(&mut ApplicationMetrics),
204    {
205        let mut metrics = self.app_metrics.write().await;
206        updater(&mut metrics);
207    }
208
209    /// Perform health checks (static version)
210    async fn perform_health_checks_static(config: &MonitorConfig) -> Vec<HealthCheck> {
211        let mut checks = Vec::new();
212
213        // Check disk space
214        checks.push(Self::check_disk_space_static(config).await);
215
216        // Check memory usage
217        checks.push(Self::check_memory_usage_static().await);
218
219        // Check data directories
220        checks.push(Self::check_data_directories_static(config).await);
221
222        // Check file permissions
223        checks.push(Self::check_file_permissions_static(config).await);
224
225        checks
226    }
227
228    /// Check disk space (static version)
229    async fn check_disk_space_static(_config: &MonitorConfig) -> HealthCheck {
230        use sysinfo::Disks;
231
232        let disks = Disks::new_with_refreshed_list();
233
234        let mut status = HealthStatus::Healthy;
235        let mut message = "Disk space is adequate".to_string();
236        let mut details = FxHashMap::default();
237
238        for disk in &disks {
239            let total = disk.total_space();
240            let available = disk.available_space();
241            let usage_percent = ((total - available) as f64 / total as f64) * 100.0;
242
243            let mount_point = disk.mount_point().to_string_lossy().to_string();
244            details.insert(
245                format!("disk_{}", mount_point.replace('/', "_")),
246                format!("{usage_percent:.1}% used"),
247            );
248
249            if usage_percent > 90.0 {
250                status = HealthStatus::Critical;
251                message = format!("Disk space critical: {mount_point} is {usage_percent:.1}% full");
252            } else if usage_percent > 80.0 && status == HealthStatus::Healthy {
253                status = HealthStatus::Warning;
254                message = format!("Disk space warning: {mount_point} is {usage_percent:.1}% full");
255            }
256        }
257
258        HealthCheck {
259            name: "disk_space".to_string(),
260            status,
261            message,
262            timestamp: crate::monitor::utils::time::now_nanos(),
263            details,
264        }
265    }
266
267    /// Check memory usage (static version)
268    async fn check_memory_usage_static() -> HealthCheck {
269        let mut system = System::new_all();
270        system.refresh_memory();
271
272        let memory_total = system.total_memory();
273        let memory_used = system.used_memory();
274        let usage_percent = (memory_used as f64 / memory_total as f64) * 100.0;
275
276        let (status, message) = if usage_percent > 90.0 {
277            (
278                HealthStatus::Critical,
279                format!("Memory usage critical: {usage_percent:.1}%"),
280            )
281        } else if usage_percent > 80.0 {
282            (
283                HealthStatus::Warning,
284                format!("Memory usage high: {usage_percent:.1}%"),
285            )
286        } else {
287            (
288                HealthStatus::Healthy,
289                format!("Memory usage normal: {usage_percent:.1}%"),
290            )
291        };
292
293        let mut details = FxHashMap::default();
294        details.insert("usage_percent".to_string(), format!("{usage_percent:.1}%"));
295        details.insert(
296            "used_mb".to_string(),
297            format!("{}", memory_used / 1024 / 1024),
298        );
299        details.insert(
300            "total_mb".to_string(),
301            format!("{}", memory_total / 1024 / 1024),
302        );
303
304        HealthCheck {
305            name: "memory_usage".to_string(),
306            status,
307            message,
308            timestamp: crate::monitor::utils::time::now_nanos(),
309            details,
310        }
311    }
312
313    /// Check data directories (static version)
314    async fn check_data_directories_static(config: &MonitorConfig) -> HealthCheck {
315        let directories = [&config.storage.base_path, &config.storage.market_data_path];
316
317        let mut status = HealthStatus::Healthy;
318        let mut message = "All data directories accessible".to_string();
319        let mut details = FxHashMap::default();
320
321        for (i, dir) in directories.iter().enumerate() {
322            let dir_name = format!("directory_{i}");
323
324            if !dir.exists() {
325                status = HealthStatus::Critical;
326                message = format!("Data directory missing: {}", dir.display());
327                details.insert(dir_name, "missing".to_string());
328            } else if !dir.is_dir() {
329                status = HealthStatus::Critical;
330                message = format!("Data path is not a directory: {}", dir.display());
331                details.insert(dir_name, "not_directory".to_string());
332            } else {
333                details.insert(dir_name, "ok".to_string());
334            }
335        }
336
337        HealthCheck {
338            name: "data_directories".to_string(),
339            status,
340            message,
341            timestamp: crate::monitor::utils::time::now_nanos(),
342            details,
343        }
344    }
345
346    /// Check file permissions (static version)
347    async fn check_file_permissions_static(config: &MonitorConfig) -> HealthCheck {
348        use std::fs::OpenOptions;
349
350        let test_file = config.storage.market_data_path.join(".permission_test");
351
352        let (status, message) = match OpenOptions::new()
353            .create(true)
354            .write(true)
355            .truncate(true)
356            .open(&test_file)
357        {
358            Ok(_) => {
359                // Clean up test file
360                let _ = std::fs::remove_file(&test_file);
361                (
362                    HealthStatus::Healthy,
363                    "File permissions are adequate".to_string(),
364                )
365            }
366            Err(e) => (
367                HealthStatus::Critical,
368                format!("Cannot write to data directory: {e}"),
369            ),
370        };
371
372        HealthCheck {
373            name: "file_permissions".to_string(),
374            status,
375            message,
376            timestamp: crate::monitor::utils::time::now_nanos(),
377            details: FxHashMap::default(),
378        }
379    }
380}
381
#[cfg(test)]
mod tests {
    use super::*;

    /// Smoke test for the monitor lifecycle: construct, start the metrics
    /// task, read and update state, then signal shutdown and join the task so
    /// it is not left detached when the test ends.
    #[tokio::test]
    async fn test_system_monitor() {
        let system_monitor = SystemMonitor::new(MonitorConfig::default());

        let monitoring_task = system_monitor.start_monitoring().await;

        let system_metrics = system_monitor.get_system_metrics().await;
        println!("System metrics: {system_metrics:?}");

        // A fresh monitor starts with zeroed counters and no health results.
        let app_metrics = system_monitor.get_app_metrics().await;
        println!("App metrics: {app_metrics:?}");
        assert_eq!(app_metrics.trades_processed, 0);
        assert_eq!(app_metrics.error_count, 0);
        assert!(system_monitor.get_health_checks().await.is_empty());

        // Updates made through the updater closure must be visible afterwards.
        system_monitor
            .update_app_metrics(|metrics| metrics.trades_processed += 1)
            .await;
        assert_eq!(system_monitor.get_app_metrics().await.trades_processed, 1);

        // Ask the task to stop and make sure it exits without panicking.
        system_monitor.stop_monitoring().await;
        let joined = tokio::time::timeout(std::time::Duration::from_secs(10), monitoring_task)
            .await
            .expect("monitoring task should stop after the shutdown signal");
        assert!(joined.is_ok(), "monitoring task panicked: {joined:?}");
    }
}