rusty_engine/monitoring/mod.rs
1//! Zero-latency monitoring framework for HFT trading systems
2//!
3//! This module provides a high-performance monitoring solution designed for
4//! minimal impact on trading system latency. It uses lock-free data structures,
5//! async processing, and memory-mapped files for efficient metric collection
6//! and persistence.
7//!
8//! # Key Features
9//! - Lock-free ring buffers for zero-allocation metric recording
10//! - MPMC channels (flume) for async metric processing
11//! - Memory-mapped files for low-latency persistence
12//! - Nanosecond precision timing using quanta
13//! - Support for counters, gauges, histograms, and latency measurements
14
15use anyhow::Result;
16use std::sync::Arc;
17
18pub mod collector;
19pub mod engine;
20pub mod metrics;
21pub mod ring_buffer;
22pub mod writer;
23pub mod zerocopy_writer;
24
25pub use collector::MetricCollector;
26pub use engine::MonitoringEngine;
27pub use metrics::{Counter, Gauge, Histogram, LatencyTracker, Metric, MetricType};
28pub use writer::MmapWriter;
29
/// Configuration for the monitoring system.
///
/// Start from [`MonitoringConfig::default`] and override individual fields
/// with struct-update syntax when only a few settings differ.
///
/// The type derives `PartialEq`/`Eq` so configurations can be compared
/// directly (for example in tests or when detecting a changed configuration).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MonitoringConfig {
    /// Size of the ring buffer for each metric collector.
    ///
    /// Must be a power of 2.
    pub ring_buffer_size: usize,
    /// Channel buffer size for async processing.
    pub channel_buffer_size: usize,
    /// Path for memory-mapped files.
    pub mmap_path: String,
    /// File size for memory-mapped files.
    ///
    /// Presumably in bytes: the default is written as `100 * 1024 * 1024`
    /// and annotated as 100 MB.
    pub mmap_file_size: usize,
    /// Number of worker threads for metric aggregation.
    pub num_workers: usize,
    /// Flush interval in milliseconds.
    pub flush_interval_ms: u64,
    /// Enable compression for persisted metrics.
    pub enable_compression: bool,
}
48
49impl Default for MonitoringConfig {
50 fn default() -> Self {
51 Self {
52 ring_buffer_size: 65536, // 64K entries
53 channel_buffer_size: 8192,
54 mmap_path: "/tmp/rusty_metrics".into(),
55 mmap_file_size: 100 * 1024 * 1024, // 100 MB
56 num_workers: 2,
57 flush_interval_ms: 1000, // 1 second
58 enable_compression: false,
59 }
60 }
61}
62
63/// Initialize the global monitoring system
64pub fn init_monitoring(config: MonitoringConfig) -> Result<Arc<MonitoringEngine>> {
65 MonitoringEngine::new(config)
66}
67
68/// Record a metric with zero overhead
69#[inline(always)]
70pub fn record_metric(collector: &MetricCollector, metric: Metric) {
71 collector.record(metric);
72}
73
/// Records a counter metric with the current timestamp
///
/// Expands to a call to `$crate::monitoring::record_metric` with a
/// `Metric::Counter` built from the arguments. The timestamp is read from
/// `TimingUpkeep::now_ns()` after `$name` and `$value` have been evaluated.
///
/// # Arguments
///
/// * `$collector` - The [`MetricCollector`] reference to record to
/// * `$name` - The metric name; converted with `.into()` into the type of the
///   counter's `name` field
/// * `$value` - The counter value; must have the type of the counter's
///   `value` field
///
/// A trailing comma after the last argument is accepted.
///
/// # Example
///
/// ```ignore
/// record_counter!(collector, "orders_sent", sent_count);
/// record_counter!(
///     collector,
///     "orders_rejected",
///     rejected_count,
/// );
/// ```
#[macro_export]
macro_rules! record_counter {
    ($collector:expr, $name:expr, $value:expr $(,)?) => {
        $crate::monitoring::record_metric(
            $collector,
            $crate::monitoring::Metric::Counter($crate::monitoring::Counter {
                name: $name.into(),
                value: $value,
                timestamp_ns: $crate::timing::TimingUpkeep::now_ns(),
            }),
        )
    };
}
88
/// Records a gauge metric with the current timestamp
///
/// A gauge represents a value that can go up or down over time,
/// such as current memory usage, active connections, or queue depth.
///
/// Expands to a call to `$crate::monitoring::record_metric` with a
/// `Metric::Gauge` built from the arguments. The timestamp is read from
/// `TimingUpkeep::now_ns()` after `$name` and `$value` have been evaluated.
///
/// # Arguments
///
/// * `$collector` - The [`MetricCollector`] instance to record to
/// * `$name` - The metric name as a string or string slice
/// * `$value` - The gauge value as an f64
///
/// A trailing comma after the last argument is accepted.
///
/// # Example
///
/// ```ignore
/// record_gauge!(collector, "active_orders", 42.0);
/// record_gauge!(collector, "memory_usage_mb", 1024.5);
/// record_gauge!(
///     collector,
///     "queue_depth",
///     17.0,
/// );
/// ```
#[macro_export]
macro_rules! record_gauge {
    ($collector:expr, $name:expr, $value:expr $(,)?) => {
        $crate::monitoring::record_metric(
            $collector,
            $crate::monitoring::Metric::Gauge($crate::monitoring::Gauge {
                name: $name.into(),
                value: $value,
                timestamp_ns: $crate::timing::TimingUpkeep::now_ns(),
            }),
        )
    };
}
119
/// Records a latency measurement metric
///
/// This macro records the latency of an operation in nanoseconds.
///
/// Expands to a call to `$crate::monitoring::record_metric` with a
/// `Metric::Latency` built from the arguments. The metric's own timestamp is
/// read from `TimingUpkeep::now_ns()` after `$name` and `$latency_ns` have
/// been evaluated, so it marks when the measurement was recorded, not when
/// the measured operation started.
///
/// # Arguments
///
/// * `$collector` - The [`MetricCollector`] instance to record to
/// * `$name` - The metric name as a string or string slice
/// * `$latency_ns` - The latency value in nanoseconds as a u64
///
/// A trailing comma after the last argument is accepted.
///
/// # Example
///
/// ```ignore
/// let start = TimingUpkeep::now_ns();
/// // ... perform operation ...
/// let latency = TimingUpkeep::now_ns() - start;
/// record_latency!(collector, "order_processing", latency);
/// ```
#[macro_export]
macro_rules! record_latency {
    ($collector:expr, $name:expr, $latency_ns:expr $(,)?) => {
        $crate::monitoring::record_metric(
            $collector,
            $crate::monitoring::Metric::Latency($crate::monitoring::LatencyTracker {
                name: $name.into(),
                latency_ns: $latency_ns,
                timestamp_ns: $crate::timing::TimingUpkeep::now_ns(),
            }),
        )
    };
}
152
#[cfg(test)]
mod tests {
    use super::*;

    /// Pins every field of `MonitoringConfig::default()`.
    ///
    /// The destructuring pattern is deliberately exhaustive (no `..`): adding
    /// a field to `MonitoringConfig` makes this test fail to compile until a
    /// check for the new field's default is written.
    #[test]
    fn test_monitoring_config_default() {
        let MonitoringConfig {
            ring_buffer_size,
            channel_buffer_size,
            mmap_path,
            mmap_file_size,
            num_workers,
            flush_interval_ms,
            enable_compression,
        } = MonitoringConfig::default();

        assert_eq!(ring_buffer_size, 65536);
        // The field documentation requires the ring buffer size to be a power of 2.
        assert!(ring_buffer_size.is_power_of_two());
        assert_eq!(channel_buffer_size, 8192);
        assert_eq!(mmap_path, "/tmp/rusty_metrics");
        assert_eq!(mmap_file_size, 100 * 1024 * 1024);
        assert_eq!(num_workers, 2);
        assert_eq!(flush_interval_ms, 1000);
        assert!(!enable_compression);
    }
}
165}