// rusty_engine/monitoring/collector.rs
use crate::monitoring::MonitoringConfig;
7use crate::monitoring::metrics::Metric;
8use crate::monitoring::ring_buffer::SharedRingBuffer;
9use crate::timing::TimingUpkeep;
10use anyhow::{Result, anyhow};
11use flume::Sender;
12use parking_lot::RwLock;
13use std::sync::Arc;
14use std::thread;
15use std::time::Duration;
16
17pub struct MetricCollector {
19 name: String,
21 buffer: SharedRingBuffer<Metric>,
23 sender: Option<Sender<Vec<Metric>>>,
25 config: MonitoringConfig,
27 stats: Arc<RwLock<CollectorStats>>,
29}
30
/// Counters describing what a metric collector has recorded, flushed and lost.
///
/// All fields are plain integers, so two snapshots can be compared directly
/// (for example with `assert_eq!`).
#[derive(Debug, Default, PartialEq, Eq)]
pub struct CollectorStats {
    /// Number of metrics accepted into the ring buffer.
    pub metrics_recorded: u64,
    /// Number of metrics handed off by a flush (channel send or manual flush).
    pub metrics_sent: u64,
    /// Number of metrics that were lost instead of being delivered.
    pub metrics_dropped: u64,
    /// Overflow count most recently reported by the ring buffer.
    pub buffer_overflows: u64,
    /// Timestamp, in nanoseconds, of the most recent non-empty flush.
    pub last_flush_ns: u64,
    /// Number of non-empty flushes performed so far.
    pub flush_count: u64,
}
47
48impl MetricCollector {
49 #[must_use]
51 pub fn new(name: String, config: MonitoringConfig) -> Self {
52 Self {
53 name,
54 buffer: SharedRingBuffer::new(config.ring_buffer_size),
55 sender: None,
56 config,
57 stats: Arc::new(RwLock::new(CollectorStats::default())),
58 }
59 }
60
61 pub fn with_channel(
63 name: String,
64 config: MonitoringConfig,
65 sender: Sender<Vec<Metric>>,
66 ) -> Self {
67 Self {
68 name,
69 buffer: SharedRingBuffer::new(config.ring_buffer_size),
70 sender: Some(sender),
71 config,
72 stats: Arc::new(RwLock::new(CollectorStats::default())),
73 }
74 }
75
76 #[inline(always)]
78 pub fn record(&self, metric: Metric) {
79 if self.buffer.push(metric) {
81 let mut stats = self.stats.write();
83 stats.metrics_recorded += 1;
84 } else {
85 let mut stats = self.stats.write();
87 stats.metrics_dropped += 1;
88 stats.buffer_overflows = self.buffer.overflow_count() as u64;
89 }
90 }
91
92 pub fn start_flusher(&self) -> Result<thread::JoinHandle<()>> {
94 if self.sender.is_none() {
95 return Err(anyhow!("No channel configured for flushing"));
96 }
97
98 let buffer = self.buffer.clone();
99 let sender = self.sender.as_ref().unwrap().clone();
100 let flush_interval = Duration::from_millis(self.config.flush_interval_ms);
101 let stats = self.stats.clone();
102
103 let handle = thread::spawn(move || {
104 loop {
105 thread::sleep(flush_interval);
106
107 let metrics = buffer.drain(1024); if !metrics.is_empty() {
111 let count = metrics.len() as u64;
112
113 if sender.send(metrics).is_ok() {
115 let mut stats = stats.write();
116 stats.metrics_sent += count;
117 stats.last_flush_ns = TimingUpkeep::now_ns();
118 stats.flush_count += 1;
119 } else {
120 break;
122 }
123 }
124 }
125 });
126
127 Ok(handle)
128 }
129
130 #[must_use]
132 pub fn flush(&self) -> Vec<Metric> {
133 let metrics = self.buffer.drain(usize::MAX);
134
135 if !metrics.is_empty() {
136 let mut stats = self.stats.write();
137 stats.metrics_sent += metrics.len() as u64;
138 stats.last_flush_ns = TimingUpkeep::now_ns();
139 stats.flush_count += 1;
140 }
141
142 metrics
143 }
144
145 #[must_use]
147 pub fn stats(&self) -> CollectorStats {
148 self.stats.read().clone()
149 }
150
151 #[must_use]
153 pub fn name(&self) -> &str {
154 &self.name
155 }
156
157 #[must_use]
159 pub fn buffer_size(&self) -> usize {
160 self.buffer.len()
161 }
162}
163
164pub struct CollectorRegistry {
166 collectors: Arc<RwLock<Vec<Arc<MetricCollector>>>>,
167}
168
169impl CollectorRegistry {
170 #[must_use]
183 pub fn new() -> Self {
184 Self {
185 collectors: Arc::new(RwLock::new(Vec::new())),
186 }
187 }
188
189 pub fn register(&self, collector: Arc<MetricCollector>) {
191 self.collectors.write().push(collector);
192 }
193
194 #[must_use]
196 pub fn collectors(&self) -> Vec<Arc<MetricCollector>> {
197 self.collectors.read().clone()
198 }
199
200 #[must_use]
202 pub fn flush_all(&self) -> Vec<Metric> {
203 let mut all_metrics = Vec::new();
204
205 for collector in self.collectors.read().iter() {
206 all_metrics.extend(collector.flush());
207 }
208
209 all_metrics
210 }
211}
212
213impl Default for CollectorRegistry {
214 fn default() -> Self {
215 Self::new()
216 }
217}
218
219impl Clone for CollectorStats {
220 fn clone(&self) -> Self {
221 Self {
222 metrics_recorded: self.metrics_recorded,
223 metrics_sent: self.metrics_sent,
224 metrics_dropped: self.metrics_dropped,
225 buffer_overflows: self.buffer_overflows,
226 last_flush_ns: self.last_flush_ns,
227 flush_count: self.flush_count,
228 }
229 }
230}
231
#[cfg(test)]
mod tests {
    use super::*;
    use crate::monitoring::metrics::{Counter, Gauge};

    /// Recording then manually flushing updates the buffer size and counters.
    #[test]
    fn test_metric_collector_basic() {
        let config = MonitoringConfig {
            ring_buffer_size: 64,
            ..Default::default()
        };

        let collector = MetricCollector::new("test".into(), config);

        let counter = Metric::Counter(Counter {
            name: "test_counter".into(),
            value: 1,
            timestamp_ns: 1000,
        });

        collector.record(counter.clone());
        collector.record(counter.clone());

        assert_eq!(collector.buffer_size(), 2);

        let metrics = collector.flush();
        assert_eq!(metrics.len(), 2);
        assert_eq!(collector.buffer_size(), 0);

        let stats = collector.stats();
        assert_eq!(stats.metrics_recorded, 2);
        assert_eq!(stats.metrics_sent, 2);
        assert_eq!(stats.metrics_dropped, 0);
    }

    /// The background flusher delivers recorded metrics over the channel and
    /// shuts down once the receiver is gone.
    #[test]
    fn test_metric_collector_with_channel() {
        let config = MonitoringConfig {
            ring_buffer_size: 64,
            flush_interval_ms: 100,
            ..Default::default()
        };

        let (tx, rx) = flume::unbounded();
        let collector = Arc::new(MetricCollector::with_channel("test".into(), config, tx));

        let handle = collector.start_flusher().unwrap();

        for i in 0..10 {
            let gauge = Metric::Gauge(Gauge {
                name: "test_gauge".into(),
                value: i as f64,
                timestamp_ns: i as u64,
            });
            collector.record(gauge);
        }

        // Wait for the batch itself instead of sleeping a fixed time and
        // hoping the flusher has already run; the timeout only bounds failure.
        let received = rx
            .recv_timeout(Duration::from_secs(5))
            .expect("flusher should deliver a batch");
        assert_eq!(received.len(), 10);

        drop(rx);

        // Queue one more metric so the flusher has a batch to send on its next
        // tick and is guaranteed to observe the closed channel. Joining with an
        // empty buffer could otherwise block forever.
        collector.record(Metric::Gauge(Gauge {
            name: "test_gauge".into(),
            value: 0.0,
            timestamp_ns: 0,
        }));

        handle.join().expect("flusher thread panicked");
    }

    /// `flush_all` gathers the metrics of every registered collector.
    #[test]
    fn test_collector_registry() {
        let registry = CollectorRegistry::new();

        let config = MonitoringConfig::default();
        let collector1 = Arc::new(MetricCollector::new("collector1".into(), config.clone()));
        let collector2 = Arc::new(MetricCollector::new("collector2".into(), config));

        registry.register(collector1.clone());
        registry.register(collector2.clone());

        let counter = Metric::Counter(Counter {
            name: "test".into(),
            value: 1,
            timestamp_ns: 1000,
        });

        collector1.record(counter.clone());
        collector2.record(counter.clone());
        collector2.record(counter);

        let all_metrics = registry.flush_all();
        assert_eq!(all_metrics.len(), 3);
    }

    /// Recording more metrics than the buffer holds is reflected in the
    /// drop and overflow counters.
    #[test]
    fn test_buffer_overflow() {
        let config = MonitoringConfig {
            ring_buffer_size: 4,
            ..Default::default()
        };

        let collector = MetricCollector::new("test".into(), config);

        for i in 0..5 {
            let counter = Metric::Counter(Counter {
                name: "test".into(),
                value: i,
                timestamp_ns: i,
            });
            collector.record(counter);
        }

        let stats = collector.stats();
        assert!(stats.metrics_dropped > 0);
        assert!(stats.buffer_overflows > 0);
    }
}