rusty_engine/monitoring/
collector.rs

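//! Metric collector implementation using lock-free ring buffers
//!
//! This module provides the [`MetricCollector`], which pushes metrics into a
//! lock-free ring buffer for low-overhead recording in the hot path, and the
//! [`CollectorRegistry`], which flushes a group of collectors together.
//! Per-collector statistics are kept behind a lock and updated on every
//! `record` call.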
5
6use crate::monitoring::MonitoringConfig;
7use crate::monitoring::metrics::Metric;
8use crate::monitoring::ring_buffer::SharedRingBuffer;
9use crate::timing::TimingUpkeep;
10use anyhow::{Result, anyhow};
11use flume::Sender;
12use parking_lot::RwLock;
13use std::sync::Arc;
14use std::thread;
15use std::time::Duration;
16
/// A named metric collector backed by a shared ring buffer.
///
/// Metrics are pushed into `buffer` by `record` and leave it either through a
/// manual `flush` or through the background thread started by
/// `start_flusher`, which forwards drained batches on `sender`.
// NOTE(review): `stats` is an `RwLock` taken for writing on every `record`,
// so recording is not lock-free end to end even if the ring buffer is.
pub struct MetricCollector {
    /// Name of this collector
    name: String,
    /// Ring buffer for metric storage; a clone of it is moved into the flusher thread
    buffer: SharedRingBuffer<Metric>,
    /// Channel sender for async processing; `None` for collectors built with `new`,
    /// in which case `start_flusher` returns an error
    sender: Option<Sender<Vec<Metric>>>,
    /// Configuration (ring buffer size and flush interval are read from it)
    config: MonitoringConfig,
    /// Collection statistics, shared with the flusher thread through the `Arc`
    stats: Arc<RwLock<CollectorStats>>,
}
30
/// Statistics for a metric collector.
///
/// `MetricCollector::stats` returns a copy of these values. The live values are
/// updated by `record`, by a manual `flush`, and by the background flusher.
#[derive(Debug, Default, PartialEq, Eq)]
pub struct CollectorStats {
    /// Number of metrics accepted into the ring buffer by `record`.
    pub metrics_recorded: u64,
    /// Number of metrics drained from the ring buffer, whether sent over the
    /// channel by the background flusher or returned by a manual `flush`.
    pub metrics_sent: u64,
    /// Number of metrics discarded by `record` because the ring buffer
    /// rejected the push (buffer overflow, not channel backpressure).
    pub metrics_dropped: u64,
    /// Latest value of the ring buffer's own overflow counter, refreshed each
    /// time `record` drops a metric.
    pub buffer_overflows: u64,
    /// Timestamp of the last non-empty flush in nanoseconds, as returned by
    /// `TimingUpkeep::now_ns`; 0 until the first flush.
    pub last_flush_ns: u64,
    /// Number of non-empty flushes performed (manual or background).
    pub flush_count: u64,
}
47
48impl MetricCollector {
49    /// Create a new metric collector
50    #[must_use]
51    pub fn new(name: String, config: MonitoringConfig) -> Self {
52        Self {
53            name,
54            buffer: SharedRingBuffer::new(config.ring_buffer_size),
55            sender: None,
56            config,
57            stats: Arc::new(RwLock::new(CollectorStats::default())),
58        }
59    }
60
61    /// Create a collector connected to a processing channel
62    pub fn with_channel(
63        name: String,
64        config: MonitoringConfig,
65        sender: Sender<Vec<Metric>>,
66    ) -> Self {
67        Self {
68            name,
69            buffer: SharedRingBuffer::new(config.ring_buffer_size),
70            sender: Some(sender),
71            config,
72            stats: Arc::new(RwLock::new(CollectorStats::default())),
73        }
74    }
75
76    /// Record a metric with minimal overhead
77    #[inline(always)]
78    pub fn record(&self, metric: Metric) {
79        // Fast path - push to ring buffer
80        if self.buffer.push(metric) {
81            // Use relaxed ordering for stats update to minimize overhead
82            let mut stats = self.stats.write();
83            stats.metrics_recorded += 1;
84        } else {
85            // Buffer overflow - update stats
86            let mut stats = self.stats.write();
87            stats.metrics_dropped += 1;
88            stats.buffer_overflows = self.buffer.overflow_count() as u64;
89        }
90    }
91
92    /// Start the background flushing thread
93    pub fn start_flusher(&self) -> Result<thread::JoinHandle<()>> {
94        if self.sender.is_none() {
95            return Err(anyhow!("No channel configured for flushing"));
96        }
97
98        let buffer = self.buffer.clone();
99        let sender = self.sender.as_ref().unwrap().clone();
100        let flush_interval = Duration::from_millis(self.config.flush_interval_ms);
101        let stats = self.stats.clone();
102
103        let handle = thread::spawn(move || {
104            loop {
105                thread::sleep(flush_interval);
106
107                // Drain metrics from ring buffer
108                let metrics = buffer.drain(1024); // Drain up to 1024 metrics at a time
109
110                if !metrics.is_empty() {
111                    let count = metrics.len() as u64;
112
113                    // Send to processing channel
114                    if sender.send(metrics).is_ok() {
115                        let mut stats = stats.write();
116                        stats.metrics_sent += count;
117                        stats.last_flush_ns = TimingUpkeep::now_ns();
118                        stats.flush_count += 1;
119                    } else {
120                        // Channel closed, exit
121                        break;
122                    }
123                }
124            }
125        });
126
127        Ok(handle)
128    }
129
130    /// Manually flush metrics from the buffer
131    #[must_use]
132    pub fn flush(&self) -> Vec<Metric> {
133        let metrics = self.buffer.drain(usize::MAX);
134
135        if !metrics.is_empty() {
136            let mut stats = self.stats.write();
137            stats.metrics_sent += metrics.len() as u64;
138            stats.last_flush_ns = TimingUpkeep::now_ns();
139            stats.flush_count += 1;
140        }
141
142        metrics
143    }
144
145    /// Get collector statistics
146    #[must_use]
147    pub fn stats(&self) -> CollectorStats {
148        self.stats.read().clone()
149    }
150
151    /// Get the collector name
152    #[must_use]
153    pub fn name(&self) -> &str {
154        &self.name
155    }
156
157    /// Get the current buffer size
158    #[must_use]
159    pub fn buffer_size(&self) -> usize {
160        self.buffer.len()
161    }
162}
163
/// A collection of metric collectors that can be flushed together.
///
/// The collector list is guarded by an `RwLock`, so the registry can be used
/// through a shared reference from several threads.
pub struct CollectorRegistry {
    /// Registered collectors, in registration order.
    // NOTE(review): the outer `Arc` is never shared in this file (the registry
    // is not `Clone`); a plain `RwLock` would do unless a handle is meant to be
    // cloned elsewhere — confirm.
    collectors: Arc<RwLock<Vec<Arc<MetricCollector>>>>,
}
168
169impl CollectorRegistry {
170    /// Creates a new collector registry for managing multiple metric collectors
171    ///
172    /// The registry provides thread-safe storage and retrieval of metric collectors,
173    /// allowing different components of the system to register their own collectors.
174    ///
175    /// # Example
176    ///
177    /// ```ignore
178    /// let registry = CollectorRegistry::new();
179    /// let collector = Arc::new(MetricCollector::new("orders".into(), config));
180    /// registry.register(collector);
181    /// ```
182    #[must_use]
183    pub fn new() -> Self {
184        Self {
185            collectors: Arc::new(RwLock::new(Vec::new())),
186        }
187    }
188
189    /// Register a collector
190    pub fn register(&self, collector: Arc<MetricCollector>) {
191        self.collectors.write().push(collector);
192    }
193
194    /// Get all collectors
195    #[must_use]
196    pub fn collectors(&self) -> Vec<Arc<MetricCollector>> {
197        self.collectors.read().clone()
198    }
199
200    /// Flush all collectors
201    #[must_use]
202    pub fn flush_all(&self) -> Vec<Metric> {
203        let mut all_metrics = Vec::new();
204
205        for collector in self.collectors.read().iter() {
206            all_metrics.extend(collector.flush());
207        }
208
209        all_metrics
210    }
211}
212
213impl Default for CollectorRegistry {
214    fn default() -> Self {
215        Self::new()
216    }
217}
218
219impl Clone for CollectorStats {
220    fn clone(&self) -> Self {
221        Self {
222            metrics_recorded: self.metrics_recorded,
223            metrics_sent: self.metrics_sent,
224            metrics_dropped: self.metrics_dropped,
225            buffer_overflows: self.buffer_overflows,
226            last_flush_ns: self.last_flush_ns,
227            flush_count: self.flush_count,
228        }
229    }
230}
231
#[cfg(test)]
mod tests {
    use super::*;
    use crate::monitoring::metrics::{Counter, Gauge};

    #[test]
    fn test_metric_collector_basic() {
        let config = MonitoringConfig {
            ring_buffer_size: 64,
            ..Default::default()
        };

        let collector = MetricCollector::new("test".into(), config);

        // Record some metrics
        let counter = Metric::Counter(Counter {
            name: "test_counter".into(),
            value: 1,
            timestamp_ns: 1000,
        });

        collector.record(counter.clone());
        collector.record(counter.clone());

        assert_eq!(collector.buffer_size(), 2);

        // Flush metrics
        let metrics = collector.flush();
        assert_eq!(metrics.len(), 2);
        assert_eq!(collector.buffer_size(), 0);

        // Check stats
        let stats = collector.stats();
        assert_eq!(stats.metrics_recorded, 2);
        assert_eq!(stats.metrics_sent, 2);
        assert_eq!(stats.metrics_dropped, 0);
    }

    #[test]
    fn test_metric_collector_with_channel() {
        let config = MonitoringConfig {
            ring_buffer_size: 64,
            flush_interval_ms: 10,
            ..Default::default()
        };

        let (tx, rx) = flume::unbounded();
        let collector = MetricCollector::with_channel("test".into(), config, tx);

        // Start flusher
        let handle = collector.start_flusher().unwrap();

        // Record metrics
        for i in 0..10 {
            let gauge = Metric::Gauge(Gauge {
                name: "test_gauge".into(),
                value: i as f64,
                timestamp_ns: i as u64,
            });
            collector.record(gauge);
        }

        // Wait for delivery instead of sleeping a fixed time. The flusher may
        // split the metrics across ticks, so receive until all ten have arrived.
        let mut received = 0;
        while received < 10 {
            let batch = rx
                .recv_timeout(Duration::from_secs(5))
                .expect("flusher did not deliver metrics in time");
            received += batch.len();
        }
        assert_eq!(received, 10);

        // Shut the flusher down: close the channel, then record one more metric
        // so the flusher has something to send. It must then observe the closed
        // channel even if it only notices disconnection when a send fails.
        drop(rx);
        collector.record(Metric::Gauge(Gauge {
            name: "test_gauge".into(),
            value: 0.0,
            timestamp_ns: 0,
        }));
        handle.join().expect("flusher thread panicked");
    }

    #[test]
    fn test_collector_registry() {
        let registry = CollectorRegistry::new();

        let config = MonitoringConfig::default();
        let collector1 = Arc::new(MetricCollector::new("collector1".into(), config.clone()));
        let collector2 = Arc::new(MetricCollector::new("collector2".into(), config));

        registry.register(collector1.clone());
        registry.register(collector2.clone());

        // Record metrics in both collectors
        let counter = Metric::Counter(Counter {
            name: "test".into(),
            value: 1,
            timestamp_ns: 1000,
        });

        collector1.record(counter.clone());
        collector2.record(counter.clone());
        collector2.record(counter);

        // Flush all
        let all_metrics = registry.flush_all();
        assert_eq!(all_metrics.len(), 3);
    }

    #[test]
    fn test_buffer_overflow() {
        let config = MonitoringConfig {
            ring_buffer_size: 4, // Very small buffer
            ..Default::default()
        };

        let collector = MetricCollector::new("test".into(), config);

        // Push more metrics than the 4-slot buffer can hold so some are rejected.
        for i in 0..5 {
            let counter = Metric::Counter(Counter {
                name: "test".into(),
                value: i,
                timestamp_ns: i,
            });
            collector.record(counter);
        }

        let stats = collector.stats();
        assert!(stats.metrics_dropped > 0);
        assert!(stats.buffer_overflows > 0);
    }
}