// rusty_engine/monitoring/engine.rs
use crate::monitoring::{
    MonitoringConfig,
    collector::MetricCollector,
    metrics::{AggregatedMetric, MetricsSnapshot},
    writer::MmapWriter,
};
use anyhow::Result;
use flume::{Receiver, Sender, TrySendError};
use parking_lot::RwLock;
use std::sync::{
    Arc,
    atomic::{AtomicBool, AtomicU64, Ordering},
};
use std::time::{Instant, SystemTime, UNIX_EPOCH};
use tokio::time::{Duration, interval};
/// Counters describing the work done by a [`MonitoringEngine`].
///
/// Values are cumulative since the engine was created, except `aggregation_time_ns`,
/// which only reflects the most recent timed write.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct EngineStats {
    /// Total metrics recorded across all registered collectors.
    ///
    /// Not tracked incrementally: [`MonitoringEngine::stats`] recomputes it on each call.
    pub metrics_processed: u64,
    /// Number of snapshots successfully written.
    pub snapshots_written: u64,
    /// Bytes written to the snapshot file.
    ///
    /// NOTE(review): nothing in the engine currently updates this field, so it stays 0.
    pub bytes_written: u64,
    /// Number of snapshot writes that failed.
    pub write_errors: u64,
    /// Duration, in nanoseconds, of the most recent timed snapshot write.
    pub aggregation_time_ns: u64,
}
33pub struct MonitoringEngine {
35 config: MonitoringConfig,
37 collectors: Arc<RwLock<Vec<Arc<MetricCollector>>>>,
39 writer: Arc<MmapWriter>,
41 snapshot_sender: Sender<MetricsSnapshot>,
43 snapshot_receiver: Receiver<MetricsSnapshot>,
44 shutdown: Arc<AtomicBool>,
46 stats: Arc<RwLock<EngineStats>>,
48 collector_counter: AtomicU64,
50}
52impl MonitoringEngine {
53 pub fn new(config: MonitoringConfig) -> Result<Arc<Self>> {
55 let (snapshot_sender, snapshot_receiver) = flume::bounded(config.channel_buffer_size);
57
58 let writer = Arc::new(MmapWriter::new(
60 &config.mmap_path,
61 config.mmap_file_size,
62 config.enable_compression,
63 )?);
64
65 let engine = Arc::new(Self {
66 config: config.clone(),
67 collectors: Arc::new(RwLock::new(Vec::new())),
68 writer,
69 snapshot_sender,
70 snapshot_receiver,
71 shutdown: Arc::new(AtomicBool::new(false)),
72 stats: Arc::new(RwLock::new(EngineStats::default())),
73 collector_counter: AtomicU64::new(0),
74 });
75
76 let engine_clone = Arc::clone(&engine);
78 tokio::spawn(async move {
79 engine_clone.run_processing_loop().await;
80 });
81
82 Ok(engine)
83 }
84
85 #[must_use]
87 pub fn create_collector(&self, name: String) -> Arc<MetricCollector> {
88 let collector = Arc::new(MetricCollector::new(name, self.config.clone()));
89 self.collectors.write().push(Arc::clone(&collector));
90 collector
91 }
92
93 #[must_use]
95 pub fn register_collector(&self) -> Arc<MetricCollector> {
96 let counter = self.collector_counter.fetch_add(1, Ordering::Relaxed);
97 let collector = Arc::new(MetricCollector::new(
98 format!("collector_{counter}"),
99 self.config.clone(),
100 ));
101 self.collectors.write().push(Arc::clone(&collector));
102 collector
103 }
104
105 #[must_use]
107 pub fn collectors(&self) -> Vec<Arc<MetricCollector>> {
108 self.collectors.read().clone()
109 }
110
111 #[must_use]
113 pub fn collector_count(&self) -> usize {
114 self.collectors.read().len()
115 }
116
117 #[must_use]
119 pub fn stats(&self) -> EngineStats {
120 let mut stats = self.stats.read().clone();
121
122 let total_metrics: u64 = self
124 .collectors()
125 .iter()
126 .map(|c| c.stats().metrics_recorded)
127 .sum();
128
129 stats.metrics_processed = total_metrics;
130 stats
131 }
132
133 pub async fn flush(&self) -> Result<()> {
135 let collectors = self.collectors.read().clone();
137 let mut all_metrics = Vec::new();
138
139 for collector in collectors {
140 all_metrics.extend(collector.flush());
141 }
142
143 if !all_metrics.is_empty() {
144 let snapshot = MetricsSnapshot {
146 timestamp_ns: std::time::SystemTime::now()
147 .duration_since(std::time::UNIX_EPOCH)
148 .unwrap()
149 .as_nanos() as u64,
150 metrics: all_metrics
151 .iter()
152 .map(crate::monitoring::metrics::AggregatedMetric::from_metric)
153 .collect(),
154 };
155
156 let _ = self.snapshot_sender.try_send(snapshot);
158 }
159
160 Ok(())
161 }
162
163 pub fn flush_all(&self) -> Result<()> {
165 let collectors = self.collectors.read().clone();
166 let mut all_metrics = Vec::new();
167
168 for collector in collectors {
169 all_metrics.extend(collector.flush());
170 }
171
172 if !all_metrics.is_empty() {
173 let snapshot = MetricsSnapshot {
174 timestamp_ns: std::time::SystemTime::now()
175 .duration_since(std::time::UNIX_EPOCH)
176 .unwrap()
177 .as_nanos() as u64,
178 metrics: all_metrics
179 .iter()
180 .map(crate::monitoring::metrics::AggregatedMetric::from_metric)
181 .collect(),
182 };
183
184 if let Err(e) = self.writer.write_snapshot(&snapshot) {
186 eprintln!("Failed to write metrics snapshot: {e}");
187 self.stats.write().write_errors += 1;
188 } else {
189 self.stats.write().snapshots_written += 1;
190 }
191 }
192
193 Ok(())
194 }
195
196 pub async fn shutdown(&self) -> Result<()> {
198 self.shutdown.store(true, Ordering::Release);
199 self.flush().await?;
200 Ok(())
201 }
202
203 async fn run_processing_loop(&self) {
205 let mut flush_interval = interval(Duration::from_millis(self.config.flush_interval_ms));
206
207 loop {
208 tokio::select! {
209 Ok(snapshot) = self.snapshot_receiver.recv_async() => {
211 let start = std::time::Instant::now();
212
213 if let Err(_e) = self.writer.write_snapshot(&snapshot) {
214 self.stats.write().write_errors += 1;
215 } else {
216 let mut stats = self.stats.write();
217 stats.snapshots_written += 1;
218 stats.aggregation_time_ns = start.elapsed().as_nanos() as u64;
219 }
220 }
221
222 _ = flush_interval.tick() => {
224 if let Err(_e) = self.flush().await {
225 }
227 }
228
229 _ = tokio::time::sleep(Duration::from_millis(100)) => {
231 if self.shutdown.load(Ordering::Acquire) {
232 break;
233 }
234 }
235 }
236 }
237 }
238}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an engine from the default configuration; must run inside a Tokio runtime.
    fn default_engine() -> Arc<MonitoringEngine> {
        MonitoringEngine::new(MonitoringConfig::default()).unwrap()
    }

    #[tokio::test]
    async fn test_monitoring_engine_creation() {
        let engine = default_engine();
        assert_eq!(engine.collector_count(), 0);
    }

    #[tokio::test]
    async fn test_collector_registration() {
        let engine = default_engine();

        // Keep the handles alive for the duration of the test, like the named bindings did.
        let _handles: Vec<_> = (0..2).map(|_| engine.register_collector()).collect();

        assert_eq!(engine.collector_count(), 2);
    }
}