1use std::{sync::Arc, time::Duration};
7
8use flume::{Receiver, Sender};
9use rusty_common::collections::FxHashMap;
10use sysinfo::System;
11use tokio::{sync::RwLock, task::JoinHandle, time::interval};
12
13use crate::monitor::{
14 ApplicationMetrics, HealthCheck, HealthStatus, MonitorConfig, SystemMetrics,
15 metrics::MetricsCollector,
16};
17
/// Samples system/application metrics and runs health checks in background
/// tokio tasks, caching the latest results behind async `RwLock`s so callers
/// can read them cheaply.
///
/// NOTE(review): flume `Sender`/`Receiver` are already `Clone + Sync`; the
/// `Arc<RwLock<..>>` wrappers around the shutdown channel halves look
/// unnecessary and could be replaced by the bare channel handles.
#[derive(Debug)]
pub struct SystemMonitor {
    /// Monitoring configuration (tick intervals, storage paths); cloned into
    /// each background task.
    config: MonitorConfig,
    /// Application-level counters, mutated through `update_app_metrics`; the
    /// metrics task refreshes its `timestamp` on every tick.
    app_metrics: Arc<RwLock<ApplicationMetrics>>,
    /// Collector used to take system-metrics snapshots.
    metrics_collector: MetricsCollector,
    /// Most recent system-metrics snapshot (initialised in `new`).
    latest_system_metrics: Arc<RwLock<SystemMetrics>>,
    /// Results of the most recent round of health checks (empty until the
    /// health task has run once).
    health_checks: Arc<RwLock<Vec<HealthCheck>>>,
    /// Sending half of the metrics-monitoring shutdown channel.
    shutdown_monitoring_signal_tx: Arc<RwLock<Sender<()>>>,
    /// Receiving half of the metrics-monitoring shutdown channel.
    shutdown_monitor_signal_rx: Arc<RwLock<Receiver<()>>>,
    /// Sending half of the health-monitoring shutdown channel.
    shutdown_health_signal_tx: Arc<RwLock<Sender<()>>>,
    /// Receiving half of the health-monitoring shutdown channel.
    shutdown_health_signal_rx: Arc<RwLock<Receiver<()>>>,
}
40
41impl SystemMonitor {
42 #[must_use]
44 pub fn new(config: MonitorConfig) -> Self {
45 let app_metrics = ApplicationMetrics {
46 timestamp: crate::monitor::utils::time::now_nanos(),
47 trades_processed: 0,
48 orderbooks_processed: 0,
49 files_written: 0,
50 bytes_written: 0,
51 compression_ratio: 0.0,
52 active_connections: 0,
53 error_count: 0,
54 latency_p50: 0.0,
55 latency_p95: 0.0,
56 latency_p99: 0.0,
57 };
58
59 let (shutdown_tx, shutdown_rx) = flume::unbounded();
60 let (shutdown_health_tx, shutdown_health_rx) = flume::unbounded();
61
62 let metrics_collector = MetricsCollector::new();
63 let initial_system_metrics = metrics_collector.collect_system_metrics();
64
65 Self {
66 config,
67 app_metrics: Arc::new(RwLock::new(app_metrics)),
68 metrics_collector,
69 latest_system_metrics: Arc::new(RwLock::new(initial_system_metrics)),
70 health_checks: Arc::new(RwLock::new(Vec::new())),
71
72 shutdown_monitoring_signal_tx: Arc::new(RwLock::new(shutdown_tx)),
73 shutdown_monitor_signal_rx: Arc::new(RwLock::new(shutdown_rx)),
74
75 shutdown_health_signal_tx: Arc::new(RwLock::new(shutdown_health_tx)),
76 shutdown_health_signal_rx: Arc::new(RwLock::new(shutdown_health_rx)),
77 }
78 }
79
80 pub async fn start_monitoring(&self) -> JoinHandle<()> {
82 log::info!("Starting system monitoring");
83
84 let config = self.config.clone();
86 let app_metrics = self.app_metrics.clone();
87 let latest_system_metrics = self.latest_system_metrics.clone();
88 let metrics_collector = self.metrics_collector.clone();
89
90 let shutdown_monitor_signal_rx = self.shutdown_monitor_signal_rx.clone();
91
92 tokio::spawn(async move {
93 let mut interval = interval(Duration::from_secs(
94 config.monitoring.metrics_interval_seconds,
95 ));
96
97 loop {
98 tokio::select! {
99 _ = interval.tick() => {
100 {
102 let mut metrics = app_metrics.write().await;
103 metrics.timestamp = crate::monitor::utils::time::now_nanos();
104 }
105
106 let metrics_collector_clone = metrics_collector.clone();
108 let new_system_metrics =
109 tokio::task::spawn_blocking(move || metrics_collector_clone.collect_system_metrics())
110 .await
111 .unwrap();
112
113 {
114 let mut system_metrics = latest_system_metrics.write().await;
115 *system_metrics = new_system_metrics;
116 }
117 }
118 _ = shutdown_monitor_signal_rx.read() => {
119 break;
120 }
121 }
122 }
123 })
124 }
125
126 pub async fn stop_monitoring(&self) {
128 log::info!("Stopping system monitoring");
129 let _ = self.shutdown_monitoring_signal_tx.write().await.send(());
130 log::info!("System monitoring stopped");
131 }
132
133 pub async fn start_health_monitoring(&self) -> JoinHandle<()> {
137 log::info!("Starting health monitoring");
138
139 let config = self.config.clone();
141 let shutdown_health_signal_rx = self.shutdown_health_signal_rx.clone();
142 let health_checks = self.health_checks.clone();
143
144 tokio::spawn(async move {
145 let mut interval = interval(Duration::from_secs(
146 config.monitoring.health_check_interval_seconds,
147 ));
148
149 loop {
150 tokio::select! {
151 _ = interval.tick() => {
152 let health_check_results = Self::perform_health_checks_static(&config).await;
154
155 *health_checks.write().await = health_check_results.clone();
157
158 for check in &health_check_results {
160 match check.status {
161 HealthStatus::Critical => {
162 log::error!("Critical health check failed: {} - {}", check.name, check.message);
163 }
165 HealthStatus::Warning => {
166 log::warn!("Health check warning: {} - {}", check.name, check.message);
167 }
168 _ => {}
169 }
170 }
171 }
172 _ = shutdown_health_signal_rx.read() => {
173 break;
174 }
175 }
176 }
177 })
178 }
179
180 pub async fn get_system_metrics(&self) -> SystemMetrics {
182 self.latest_system_metrics.read().await.clone()
183 }
184
185 pub async fn check_health(&self) -> Vec<HealthCheck> {
187 self.health_checks.read().await.clone()
188 }
189
190 pub async fn get_health_checks(&self) -> Vec<HealthCheck> {
192 self.health_checks.read().await.clone()
193 }
194
195 pub async fn get_app_metrics(&self) -> ApplicationMetrics {
197 self.app_metrics.read().await.clone()
198 }
199
200 pub async fn update_app_metrics<F>(&self, updater: F)
202 where
203 F: FnOnce(&mut ApplicationMetrics),
204 {
205 let mut metrics = self.app_metrics.write().await;
206 updater(&mut metrics);
207 }
208
209 async fn perform_health_checks_static(config: &MonitorConfig) -> Vec<HealthCheck> {
211 let mut checks = Vec::new();
212
213 checks.push(Self::check_disk_space_static(config).await);
215
216 checks.push(Self::check_memory_usage_static().await);
218
219 checks.push(Self::check_data_directories_static(config).await);
221
222 checks.push(Self::check_file_permissions_static(config).await);
224
225 checks
226 }
227
228 async fn check_disk_space_static(_config: &MonitorConfig) -> HealthCheck {
230 use sysinfo::Disks;
231
232 let disks = Disks::new_with_refreshed_list();
233
234 let mut status = HealthStatus::Healthy;
235 let mut message = "Disk space is adequate".to_string();
236 let mut details = FxHashMap::default();
237
238 for disk in &disks {
239 let total = disk.total_space();
240 let available = disk.available_space();
241 let usage_percent = ((total - available) as f64 / total as f64) * 100.0;
242
243 let mount_point = disk.mount_point().to_string_lossy().to_string();
244 details.insert(
245 format!("disk_{}", mount_point.replace('/', "_")),
246 format!("{usage_percent:.1}% used"),
247 );
248
249 if usage_percent > 90.0 {
250 status = HealthStatus::Critical;
251 message = format!("Disk space critical: {mount_point} is {usage_percent:.1}% full");
252 } else if usage_percent > 80.0 && status == HealthStatus::Healthy {
253 status = HealthStatus::Warning;
254 message = format!("Disk space warning: {mount_point} is {usage_percent:.1}% full");
255 }
256 }
257
258 HealthCheck {
259 name: "disk_space".to_string(),
260 status,
261 message,
262 timestamp: crate::monitor::utils::time::now_nanos(),
263 details,
264 }
265 }
266
267 async fn check_memory_usage_static() -> HealthCheck {
269 let mut system = System::new_all();
270 system.refresh_memory();
271
272 let memory_total = system.total_memory();
273 let memory_used = system.used_memory();
274 let usage_percent = (memory_used as f64 / memory_total as f64) * 100.0;
275
276 let (status, message) = if usage_percent > 90.0 {
277 (
278 HealthStatus::Critical,
279 format!("Memory usage critical: {usage_percent:.1}%"),
280 )
281 } else if usage_percent > 80.0 {
282 (
283 HealthStatus::Warning,
284 format!("Memory usage high: {usage_percent:.1}%"),
285 )
286 } else {
287 (
288 HealthStatus::Healthy,
289 format!("Memory usage normal: {usage_percent:.1}%"),
290 )
291 };
292
293 let mut details = FxHashMap::default();
294 details.insert("usage_percent".to_string(), format!("{usage_percent:.1}%"));
295 details.insert(
296 "used_mb".to_string(),
297 format!("{}", memory_used / 1024 / 1024),
298 );
299 details.insert(
300 "total_mb".to_string(),
301 format!("{}", memory_total / 1024 / 1024),
302 );
303
304 HealthCheck {
305 name: "memory_usage".to_string(),
306 status,
307 message,
308 timestamp: crate::monitor::utils::time::now_nanos(),
309 details,
310 }
311 }
312
313 async fn check_data_directories_static(config: &MonitorConfig) -> HealthCheck {
315 let directories = [&config.storage.base_path, &config.storage.market_data_path];
316
317 let mut status = HealthStatus::Healthy;
318 let mut message = "All data directories accessible".to_string();
319 let mut details = FxHashMap::default();
320
321 for (i, dir) in directories.iter().enumerate() {
322 let dir_name = format!("directory_{i}");
323
324 if !dir.exists() {
325 status = HealthStatus::Critical;
326 message = format!("Data directory missing: {}", dir.display());
327 details.insert(dir_name, "missing".to_string());
328 } else if !dir.is_dir() {
329 status = HealthStatus::Critical;
330 message = format!("Data path is not a directory: {}", dir.display());
331 details.insert(dir_name, "not_directory".to_string());
332 } else {
333 details.insert(dir_name, "ok".to_string());
334 }
335 }
336
337 HealthCheck {
338 name: "data_directories".to_string(),
339 status,
340 message,
341 timestamp: crate::monitor::utils::time::now_nanos(),
342 details,
343 }
344 }
345
346 async fn check_file_permissions_static(config: &MonitorConfig) -> HealthCheck {
348 use std::fs::OpenOptions;
349
350 let test_file = config.storage.market_data_path.join(".permission_test");
351
352 let (status, message) = match OpenOptions::new()
353 .create(true)
354 .write(true)
355 .truncate(true)
356 .open(&test_file)
357 {
358 Ok(_) => {
359 let _ = std::fs::remove_file(&test_file);
361 (
362 HealthStatus::Healthy,
363 "File permissions are adequate".to_string(),
364 )
365 }
366 Err(e) => (
367 HealthStatus::Critical,
368 format!("Cannot write to data directory: {e}"),
369 ),
370 };
371
372 HealthCheck {
373 name: "file_permissions".to_string(),
374 status,
375 message,
376 timestamp: crate::monitor::utils::time::now_nanos(),
377 details: FxHashMap::default(),
378 }
379 }
380}
381
#[cfg(test)]
mod tests {
    use super::*;

    /// Starts metrics monitoring, reads both metric snapshots, then stops the
    /// task and checks it terminates cleanly instead of leaking a detached task.
    #[tokio::test]
    async fn test_system_monitor() {
        let config = MonitorConfig::default();
        let system_monitor = SystemMonitor::new(config);

        let handle = system_monitor.start_monitoring().await;

        let system_metrics = system_monitor.get_system_metrics().await;
        println!("System metrics: {system_metrics:?}");

        let app_metrics = system_monitor.get_app_metrics().await;
        println!("App metrics: {app_metrics:?}");

        system_monitor.stop_monitoring().await;
        tokio::time::timeout(std::time::Duration::from_secs(5), handle)
            .await
            .expect("monitoring task did not stop within 5 seconds")
            .expect("monitoring task panicked");
    }
}