rusty_engine/monitoring/
writer.rs

1//! Memory-mapped file writer for efficient metric persistence
2//!
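//! This module provides a memory-mapped file writer that persists metrics
//! with low latency by copying records directly into mapped memory instead
//! of issuing a write call per record; each write is followed by an
//! asynchronous flush of the mapping.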
5
6use crate::monitoring::metrics::{AggregatedMetric, Metric, MetricsSnapshot};
7use crate::monitoring::zerocopy_writer::{MonitoringRecordHeader, ZeroCopyMonitoringWriter};
8use anyhow::{Result, anyhow};
9use bincode;
10use lz4;
11use memmap2::{MmapMut, MmapOptions};
12use parking_lot::Mutex;
13use rusty_common::collections::FxHashMap;
14use smartstring::alias::String;
15use std::fs::OpenOptions;
16use std::path::{Path, PathBuf};
17use std::sync::atomic::{AtomicUsize, Ordering};
18
/// Memory-mapped file writer for metrics
///
/// Snapshot records are appended one after another into a fixed-size mapped
/// file, starting at offset 0. Each record is a 5-byte header (4-byte length
/// plus a 1-byte compression flag) followed by the bincode-encoded snapshot,
/// which is LZ4-compressed when compression is enabled.
pub struct MmapWriter {
    /// Path to the memory-mapped file
    path: PathBuf,
    /// Mutable mapping of the backing file; the mutex guards access to its bytes
    mmap: Mutex<MmapMut>,
    /// Byte offset at which the next record will be written
    write_pos: AtomicUsize,
    /// Size of the backing file in bytes, as set when the file was created
    file_size: usize,
    /// When true, snapshots are LZ4-compressed before being written
    enable_compression: bool,
}
32
33impl MmapWriter {
34    /// Create a new memory-mapped file writer
35    pub fn new(path: impl AsRef<Path>, file_size: usize, enable_compression: bool) -> Result<Self> {
36        let path = path.as_ref().to_path_buf();
37
38        // Create directory if it doesn't exist
39        if let Some(parent) = path.parent() {
40            std::fs::create_dir_all(parent)?;
41        }
42
43        // Open or create the file
44        let file = OpenOptions::new()
45            .read(true)
46            .write(true)
47            .create(true)
48            .truncate(true)
49            .open(&path)?;
50
51        // Set file size
52        file.set_len(file_size as u64)?;
53
54        // Create memory map
55        let mmap = unsafe { MmapOptions::new().map_mut(&file)? };
56
57        Ok(Self {
58            path,
59            mmap: Mutex::new(mmap),
60            write_pos: AtomicUsize::new(0),
61            file_size,
62            enable_compression,
63        })
64    }
65
66    /// Write a metrics snapshot to the memory-mapped file
67    pub fn write_snapshot(&self, snapshot: &MetricsSnapshot) -> Result<usize> {
68        // Serialize the snapshot
69        let data = if self.enable_compression {
70            // Compress with LZ4
71            let serialized = bincode::encode_to_vec(snapshot, bincode::config::standard())?;
72            lz4::block::compress(&serialized, None, false)?
73        } else {
74            bincode::encode_to_vec(snapshot, bincode::config::standard())?
75        };
76
77        // Calculate header size (4 bytes for length + 1 byte for compression flag)
78        let header_size = 5;
79        let total_size = header_size + data.len();
80
81        // Check if we have enough space
82        let current_pos = self.write_pos.load(Ordering::Acquire);
83        if current_pos + total_size > self.file_size {
84            // Wrap around to the beginning
85            self.write_pos.store(0, Ordering::Release);
86            return Err(anyhow!("Memory-mapped file is full, wrapping around"));
87        }
88
89        // Write to memory-mapped file
90        let mut mmap = self.mmap.lock();
91
92        // Write header using zerocopy
93        let header = MonitoringRecordHeader::new(data.len(), self.enable_compression);
94        ZeroCopyMonitoringWriter::write_header(&mut mmap, current_pos, &header)
95            .ok_or_else(|| anyhow!("Failed to write header"))?;
96
97        // Write data
98        let data_start = current_pos + header_size;
99        let data_end = data_start + data.len();
100        mmap[data_start..data_end].copy_from_slice(&data);
101
102        // Update write position
103        self.write_pos.store(data_end, Ordering::Release);
104
105        // Flush to disk (async in background)
106        mmap.flush_async()?;
107
108        Ok(data.len())
109    }
110
111    /// Write raw metrics (will aggregate them first)
112    pub fn write_metrics(&self, metrics: &[Metric], timestamp_ns: u64) -> Result<usize> {
113        // Aggregate metrics by name and type
114        let mut aggregated: FxHashMap<
115            (String, crate::monitoring::metrics::MetricType),
116            AggregatedMetric,
117        > = FxHashMap::default();
118
119        for metric in metrics {
120            let key = (metric.name().clone(), metric.metric_type());
121
122            aggregated
123                .entry(key)
124                .and_modify(|agg| agg.update(metric))
125                .or_insert_with(|| AggregatedMetric::from_metric(metric));
126        }
127
128        // Create snapshot
129        let snapshot = MetricsSnapshot {
130            timestamp_ns,
131            metrics: aggregated.into_values().collect(),
132        };
133
134        self.write_snapshot(&snapshot)
135    }
136
137    /// Get the current write position
138    pub fn write_position(&self) -> usize {
139        self.write_pos.load(Ordering::Acquire)
140    }
141
142    /// Reset the writer (clear the file)
143    pub fn reset(&self) -> Result<()> {
144        self.write_pos.store(0, Ordering::Release);
145
146        // Clear the memory-mapped region
147        let mut mmap = self.mmap.lock();
148        for byte in mmap.iter_mut() {
149            *byte = 0;
150        }
151        mmap.flush()?;
152
153        Ok(())
154    }
155
156    /// Read snapshots from the memory-mapped file
157    pub fn read_snapshots(&self) -> Result<Vec<MetricsSnapshot>> {
158        let mut snapshots = Vec::new();
159        let mmap = self.mmap.lock();
160        let mut pos = 0;
161        let end_pos = self.write_pos.load(Ordering::Acquire);
162
163        while pos + 5 <= end_pos {
164            // Read header
165            let length =
166                u32::from_le_bytes([mmap[pos], mmap[pos + 1], mmap[pos + 2], mmap[pos + 3]])
167                    as usize;
168            let compressed = mmap[pos + 4] != 0;
169
170            pos += 5;
171
172            if pos + length > end_pos {
173                break; // Incomplete record
174            }
175
176            // Read data
177            let data = &mmap[pos..pos + length];
178
179            // Deserialize
180            let snapshot = if compressed {
181                let decompressed = lz4::block::decompress(data, None)?;
182                bincode::decode_from_slice(&decompressed, bincode::config::standard())?.0
183            } else {
184                bincode::decode_from_slice(data, bincode::config::standard())?.0
185            };
186
187            snapshots.push(snapshot);
188            pos += length;
189        }
190
191        Ok(snapshots)
192    }
193}
194
/// A rotating file writer that manages multiple memory-mapped files
///
/// Writes go to the current `MmapWriter`. When a write fails, the writer
/// switches to the file at the next index (wrapping modulo `max_files`) and
/// retries the write once.
pub struct RotatingMmapWriter {
    /// Directory in which the `metrics_NNNN.bin` files are created
    base_path: PathBuf,
    /// Index of the file currently being written
    current_index: AtomicUsize,
    /// Number of file slots; the index wraps around modulo this value
    max_files: usize,
    /// Size in bytes of each memory-mapped file
    file_size: usize,
    /// Whether snapshots are LZ4-compressed before being written
    enable_compression: bool,
    /// Writer for the file at `current_index`
    current_writer: Mutex<MmapWriter>,
}
210
211impl RotatingMmapWriter {
212    /// Create a new rotating memory-mapped file writer
213    pub fn new(
214        base_path: impl AsRef<Path>,
215        max_files: usize,
216        file_size: usize,
217        enable_compression: bool,
218    ) -> Result<Self> {
219        let base_path = base_path.as_ref().to_path_buf();
220        let current_path = Self::get_file_path(&base_path, 0);
221        let current_writer = MmapWriter::new(current_path, file_size, enable_compression)?;
222
223        Ok(Self {
224            base_path,
225            current_index: AtomicUsize::new(0),
226            max_files,
227            file_size,
228            enable_compression,
229            current_writer: Mutex::new(current_writer),
230        })
231    }
232
233    /// Get the file path for a given index
234    fn get_file_path(base_path: &Path, index: usize) -> PathBuf {
235        let file_name = format!("metrics_{index:04}.bin");
236        base_path.join(file_name)
237    }
238
239    /// Rotate to the next file
240    fn rotate(&self) -> Result<()> {
241        let next_index = (self.current_index.load(Ordering::Acquire) + 1) % self.max_files;
242        let next_path = Self::get_file_path(&self.base_path, next_index);
243
244        let new_writer = MmapWriter::new(next_path, self.file_size, self.enable_compression)?;
245
246        let mut current_writer = self.current_writer.lock();
247        *current_writer = new_writer;
248
249        self.current_index.store(next_index, Ordering::Release);
250
251        Ok(())
252    }
253
254    /// Write a metrics snapshot
255    pub fn write_snapshot(&self, snapshot: &MetricsSnapshot) -> Result<usize> {
256        // Try to write to current file
257        let result = {
258            let writer = self.current_writer.lock();
259            writer.write_snapshot(snapshot)
260        };
261
262        match result {
263            Ok(size) => Ok(size),
264            Err(_) => {
265                // File is full, rotate and retry
266                self.rotate()?;
267                let writer = self.current_writer.lock();
268                writer.write_snapshot(snapshot)
269            }
270        }
271    }
272
273    /// Write raw metrics
274    pub fn write_metrics(&self, metrics: &[Metric], timestamp_ns: u64) -> Result<usize> {
275        // Try to write to current file
276        let result = {
277            let writer = self.current_writer.lock();
278            writer.write_metrics(metrics, timestamp_ns)
279        };
280
281        match result {
282            Ok(size) => Ok(size),
283            Err(_) => {
284                // File is full, rotate and retry
285                self.rotate()?;
286                let writer = self.current_writer.lock();
287                writer.write_metrics(metrics, timestamp_ns)
288            }
289        }
290    }
291}
292
293#[cfg(test)]
294mod tests {
295    use super::*;
296    use crate::monitoring::metrics::{Counter, Gauge};
297    use tempfile::TempDir;
298
299    #[test]
300    fn test_mmap_writer_basic() {
301        let temp_dir = TempDir::new().unwrap();
302        let file_path = temp_dir.path().join("test_metrics.bin");
303
304        let writer = MmapWriter::new(&file_path, 1024 * 1024, false).unwrap();
305
306        // Create a snapshot
307        let snapshot = MetricsSnapshot {
308            timestamp_ns: 1234567890,
309            metrics: vec![AggregatedMetric {
310                name: "test_counter".into(),
311                metric_type: crate::monitoring::metrics::MetricType::Counter,
312                count: 10,
313                sum: 100.0,
314                min: 5.0,
315                max: 15.0,
316                last_value: 12.0,
317                start_timestamp_ns: 1000,
318                end_timestamp_ns: 2000,
319            }],
320        };
321
322        // Write snapshot
323        let size = writer.write_snapshot(&snapshot).unwrap();
324        assert!(size > 0);
325        assert_eq!(writer.write_position(), size + 5); // data + header
326
327        // Read snapshots
328        let snapshots = writer.read_snapshots().unwrap();
329        assert_eq!(snapshots.len(), 1);
330        assert_eq!(snapshots[0].timestamp_ns, snapshot.timestamp_ns);
331        assert_eq!(snapshots[0].metrics.len(), 1);
332    }
333
334    #[test]
335    fn test_mmap_writer_compression() {
336        let temp_dir = TempDir::new().unwrap();
337        let file_path = temp_dir.path().join("test_metrics_compressed.bin");
338
339        let writer = MmapWriter::new(&file_path, 1024 * 1024, true).unwrap();
340
341        // Create metrics
342        let metrics = vec![
343            Metric::Counter(Counter {
344                name: "orders".into(),
345                value: 100,
346                timestamp_ns: 1000,
347            }),
348            Metric::Gauge(Gauge {
349                name: "pnl".into(),
350                value: 1234.56,
351                timestamp_ns: 2000,
352            }),
353        ];
354
355        // Write metrics
356        let size = writer.write_metrics(&metrics, 3000).unwrap();
357        assert!(size > 0);
358
359        // Read back
360        let snapshots = writer.read_snapshots().unwrap();
361        assert_eq!(snapshots.len(), 1);
362        assert_eq!(snapshots[0].metrics.len(), 2);
363    }
364
365    #[test]
366    fn test_rotating_writer() {
367        let temp_dir = TempDir::new().unwrap();
368        let base_path = temp_dir.path();
369
370        let writer = RotatingMmapWriter::new(
371            base_path, 3,     // max 3 files
372            10240, // 10KB per file
373            false,
374        )
375        .unwrap();
376
377        // Write multiple snapshots
378        for i in 0..5 {
379            let snapshot = MetricsSnapshot {
380                timestamp_ns: i as u64,
381                metrics: vec![AggregatedMetric {
382                    name: format!("metric_{i}"),
383                    metric_type: crate::monitoring::metrics::MetricType::Counter,
384                    count: 1,
385                    sum: i as f64,
386                    min: i as f64,
387                    max: i as f64,
388                    last_value: i as f64,
389                    start_timestamp_ns: i as u64,
390                    end_timestamp_ns: i as u64,
391                }],
392            };
393
394            writer.write_snapshot(&snapshot).unwrap();
395        }
396
397        // Check that files were created
398        let entries: Vec<_> = std::fs::read_dir(base_path)
399            .unwrap()
400            .filter_map(|e| e.ok())
401            .filter(|e| e.path().extension().is_some_and(|ext| ext == "bin"))
402            .collect();
403
404        assert!(!entries.is_empty());
405    }
406
407    #[test]
408    fn test_writer_reset() {
409        let temp_dir = TempDir::new().unwrap();
410        let file_path = temp_dir.path().join("test_reset.bin");
411
412        let writer = MmapWriter::new(&file_path, 1024, false).unwrap();
413
414        // Write some data
415        let metrics = vec![Metric::Counter(Counter {
416            name: "test".into(),
417            value: 1,
418            timestamp_ns: 1000,
419        })];
420
421        writer.write_metrics(&metrics, 2000).unwrap();
422        assert!(writer.write_position() > 0);
423
424        // Reset
425        writer.reset().unwrap();
426        assert_eq!(writer.write_position(), 0);
427
428        // Should be able to read empty
429        let snapshots = writer.read_snapshots().unwrap();
430        assert_eq!(snapshots.len(), 0);
431    }
432}