rusty_engine/monitoring/
engine.rs

1//! Monitoring engine for high-performance metrics collection and processing
2//!
3//! This module provides the core monitoring engine that coordinates metric collection,
4//! aggregation, and persistence for the HFT trading system.
5
6use crate::monitoring::{
7    MonitoringConfig, collector::MetricCollector, metrics::MetricsSnapshot, writer::MmapWriter,
8};
9use anyhow::Result;
10use flume::{Receiver, Sender};
11use parking_lot::RwLock;
12use std::sync::{
13    Arc,
14    atomic::{AtomicBool, AtomicU64, Ordering},
15};
16use tokio::time::{Duration, interval};
17
/// Statistics for the monitoring engine.
///
/// All fields are plain `u64` counters, so a stats snapshot can be cloned and
/// compared directly with `==` (e.g. in tests or change detection).
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct EngineStats {
    /// Total number of metrics recorded across all registered collectors.
    ///
    /// `MonitoringEngine::stats` recomputes this from the collectors on every call
    /// rather than accumulating it in the engine.
    pub metrics_processed: u64,
    /// Number of metric snapshots successfully written to disk.
    pub snapshots_written: u64,
    /// Total bytes written to memory-mapped files.
    ///
    /// NOTE(review): nothing in this module increments this counter — confirm
    /// whether the writer should report bytes written per snapshot.
    pub bytes_written: u64,
    /// Number of snapshot write errors encountered.
    pub write_errors: u64,
    /// Time spent writing snapshots through the memory-mapped writer, in
    /// nanoseconds (measured around each snapshot write).
    pub aggregation_time_ns: u64,
}
32
33/// High-performance monitoring engine for HFT systems
34pub struct MonitoringEngine {
35    /// Configuration for the monitoring system
36    config: MonitoringConfig,
37    /// Metric collectors for different components
38    collectors: Arc<RwLock<Vec<Arc<MetricCollector>>>>,
39    /// Memory-mapped writer for persistence
40    writer: Arc<MmapWriter>,
41    /// Channel for metric snapshots
42    snapshot_sender: Sender<MetricsSnapshot>,
43    snapshot_receiver: Receiver<MetricsSnapshot>,
44    /// Shutdown flag
45    shutdown: Arc<AtomicBool>,
46    /// Statistics counters
47    stats: Arc<RwLock<EngineStats>>,
48    /// Collector name counter for auto-generated names
49    collector_counter: AtomicU64,
50}
51
52impl MonitoringEngine {
53    /// Create a new monitoring engine
54    pub fn new(config: MonitoringConfig) -> Result<Arc<Self>> {
55        // Create channel for metric snapshots
56        let (snapshot_sender, snapshot_receiver) = flume::bounded(config.channel_buffer_size);
57
58        // Create memory-mapped writer
59        let writer = Arc::new(MmapWriter::new(
60            &config.mmap_path,
61            config.mmap_file_size,
62            config.enable_compression,
63        )?);
64
65        let engine = Arc::new(Self {
66            config: config.clone(),
67            collectors: Arc::new(RwLock::new(Vec::new())),
68            writer,
69            snapshot_sender,
70            snapshot_receiver,
71            shutdown: Arc::new(AtomicBool::new(false)),
72            stats: Arc::new(RwLock::new(EngineStats::default())),
73            collector_counter: AtomicU64::new(0),
74        });
75
76        // Start background processing
77        let engine_clone = Arc::clone(&engine);
78        tokio::spawn(async move {
79            engine_clone.run_processing_loop().await;
80        });
81
82        Ok(engine)
83    }
84
85    /// Create a new metric collector with a custom name
86    #[must_use]
87    pub fn create_collector(&self, name: String) -> Arc<MetricCollector> {
88        let collector = Arc::new(MetricCollector::new(name, self.config.clone()));
89        self.collectors.write().push(Arc::clone(&collector));
90        collector
91    }
92
93    /// Register a new metric collector with auto-generated name
94    #[must_use]
95    pub fn register_collector(&self) -> Arc<MetricCollector> {
96        let counter = self.collector_counter.fetch_add(1, Ordering::Relaxed);
97        let collector = Arc::new(MetricCollector::new(
98            format!("collector_{counter}"),
99            self.config.clone(),
100        ));
101        self.collectors.write().push(Arc::clone(&collector));
102        collector
103    }
104
105    /// Get all registered collectors
106    #[must_use]
107    pub fn collectors(&self) -> Vec<Arc<MetricCollector>> {
108        self.collectors.read().clone()
109    }
110
111    /// Get the number of registered collectors
112    #[must_use]
113    pub fn collector_count(&self) -> usize {
114        self.collectors.read().len()
115    }
116
117    /// Get engine statistics
118    #[must_use]
119    pub fn stats(&self) -> EngineStats {
120        let mut stats = self.stats.read().clone();
121
122        // Add additional runtime stats
123        let total_metrics: u64 = self
124            .collectors()
125            .iter()
126            .map(|c| c.stats().metrics_recorded)
127            .sum();
128
129        stats.metrics_processed = total_metrics;
130        stats
131    }
132
133    /// Flush all pending metrics to storage
134    pub async fn flush(&self) -> Result<()> {
135        // Collect metrics from all collectors
136        let collectors = self.collectors.read().clone();
137        let mut all_metrics = Vec::new();
138
139        for collector in collectors {
140            all_metrics.extend(collector.flush());
141        }
142
143        if !all_metrics.is_empty() {
144            // Create snapshot
145            let snapshot = MetricsSnapshot {
146                timestamp_ns: std::time::SystemTime::now()
147                    .duration_since(std::time::UNIX_EPOCH)
148                    .unwrap()
149                    .as_nanos() as u64,
150                metrics: all_metrics
151                    .iter()
152                    .map(crate::monitoring::metrics::AggregatedMetric::from_metric)
153                    .collect(),
154            };
155
156            // Send for async processing
157            let _ = self.snapshot_sender.try_send(snapshot);
158        }
159
160        Ok(())
161    }
162
163    /// Flush all collectors synchronously (convenience method)
164    pub fn flush_all(&self) -> Result<()> {
165        let collectors = self.collectors.read().clone();
166        let mut all_metrics = Vec::new();
167
168        for collector in collectors {
169            all_metrics.extend(collector.flush());
170        }
171
172        if !all_metrics.is_empty() {
173            let snapshot = MetricsSnapshot {
174                timestamp_ns: std::time::SystemTime::now()
175                    .duration_since(std::time::UNIX_EPOCH)
176                    .unwrap()
177                    .as_nanos() as u64,
178                metrics: all_metrics
179                    .iter()
180                    .map(crate::monitoring::metrics::AggregatedMetric::from_metric)
181                    .collect(),
182            };
183
184            // Try to write directly (blocks if needed)
185            if let Err(e) = self.writer.write_snapshot(&snapshot) {
186                eprintln!("Failed to write metrics snapshot: {e}");
187                self.stats.write().write_errors += 1;
188            } else {
189                self.stats.write().snapshots_written += 1;
190            }
191        }
192
193        Ok(())
194    }
195
196    /// Shutdown the monitoring engine
197    pub async fn shutdown(&self) -> Result<()> {
198        self.shutdown.store(true, Ordering::Release);
199        self.flush().await?;
200        Ok(())
201    }
202
203    /// Main processing loop
204    async fn run_processing_loop(&self) {
205        let mut flush_interval = interval(Duration::from_millis(self.config.flush_interval_ms));
206
207        loop {
208            tokio::select! {
209                // Process incoming metric snapshots
210                Ok(snapshot) = self.snapshot_receiver.recv_async() => {
211                    let start = std::time::Instant::now();
212
213                    if let Err(_e) = self.writer.write_snapshot(&snapshot) {
214                        self.stats.write().write_errors += 1;
215                    } else {
216                        let mut stats = self.stats.write();
217                        stats.snapshots_written += 1;
218                        stats.aggregation_time_ns = start.elapsed().as_nanos() as u64;
219                    }
220                }
221
222                // Periodic flush
223                _ = flush_interval.tick() => {
224                    if let Err(_e) = self.flush().await {
225                        // Flush errors are logged in the flush method
226                    }
227                }
228
229                // Check for shutdown
230                _ = tokio::time::sleep(Duration::from_millis(100)) => {
231                    if self.shutdown.load(Ordering::Acquire) {
232                        break;
233                    }
234                }
235            }
236        }
237    }
238}
239
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly created engine has no collectors.
    #[tokio::test]
    async fn test_monitoring_engine_creation() {
        let config = MonitoringConfig::default();
        let engine = MonitoringEngine::new(config).unwrap();
        assert_eq!(engine.collector_count(), 0);
        assert!(engine.collectors().is_empty());
    }

    /// Each auto-named registration adds one collector.
    #[tokio::test]
    async fn test_collector_registration() {
        let config = MonitoringConfig::default();
        let engine = MonitoringEngine::new(config).unwrap();

        let _collector1 = engine.register_collector();
        let _collector2 = engine.register_collector();

        assert_eq!(engine.collector_count(), 2);
    }

    /// Named and auto-named collectors share one registry, in registration order,
    /// and the engine holds the same handles that were returned to the caller.
    #[tokio::test]
    async fn test_named_and_auto_collectors_share_registry() {
        let engine = MonitoringEngine::new(MonitoringConfig::default()).unwrap();

        let named = engine.create_collector("orders".to_string());
        let auto = engine.register_collector();

        let registered = engine.collectors();
        assert_eq!(registered.len(), 2);
        assert!(Arc::ptr_eq(&registered[0], &named));
        assert!(Arc::ptr_eq(&registered[1], &auto));
    }

    /// Shutting down with nothing pending succeeds and records no write errors.
    #[tokio::test]
    async fn test_shutdown_without_metrics_succeeds() {
        let engine = MonitoringEngine::new(MonitoringConfig::default()).unwrap();
        engine.shutdown().await.unwrap();
        assert_eq!(engine.stats().write_errors, 0);
    }
}