1use crate::monitoring::metrics::{AggregatedMetric, Metric, MetricsSnapshot};
7use crate::monitoring::zerocopy_writer::{MonitoringRecordHeader, ZeroCopyMonitoringWriter};
8use anyhow::{Result, anyhow};
9use bincode;
10use lz4;
11use memmap2::{MmapMut, MmapOptions};
12use parking_lot::Mutex;
13use rusty_common::collections::FxHashMap;
14use smartstring::alias::String;
15use std::fs::OpenOptions;
16use std::path::{Path, PathBuf};
17use std::sync::atomic::{AtomicUsize, Ordering};
18
19pub struct MmapWriter {
21 path: PathBuf,
23 mmap: Mutex<MmapMut>,
25 write_pos: AtomicUsize,
27 file_size: usize,
29 enable_compression: bool,
31}
32
33impl MmapWriter {
34 pub fn new(path: impl AsRef<Path>, file_size: usize, enable_compression: bool) -> Result<Self> {
36 let path = path.as_ref().to_path_buf();
37
38 if let Some(parent) = path.parent() {
40 std::fs::create_dir_all(parent)?;
41 }
42
43 let file = OpenOptions::new()
45 .read(true)
46 .write(true)
47 .create(true)
48 .truncate(true)
49 .open(&path)?;
50
51 file.set_len(file_size as u64)?;
53
54 let mmap = unsafe { MmapOptions::new().map_mut(&file)? };
56
57 Ok(Self {
58 path,
59 mmap: Mutex::new(mmap),
60 write_pos: AtomicUsize::new(0),
61 file_size,
62 enable_compression,
63 })
64 }
65
66 pub fn write_snapshot(&self, snapshot: &MetricsSnapshot) -> Result<usize> {
68 let data = if self.enable_compression {
70 let serialized = bincode::encode_to_vec(snapshot, bincode::config::standard())?;
72 lz4::block::compress(&serialized, None, false)?
73 } else {
74 bincode::encode_to_vec(snapshot, bincode::config::standard())?
75 };
76
77 let header_size = 5;
79 let total_size = header_size + data.len();
80
81 let current_pos = self.write_pos.load(Ordering::Acquire);
83 if current_pos + total_size > self.file_size {
84 self.write_pos.store(0, Ordering::Release);
86 return Err(anyhow!("Memory-mapped file is full, wrapping around"));
87 }
88
89 let mut mmap = self.mmap.lock();
91
92 let header = MonitoringRecordHeader::new(data.len(), self.enable_compression);
94 ZeroCopyMonitoringWriter::write_header(&mut mmap, current_pos, &header)
95 .ok_or_else(|| anyhow!("Failed to write header"))?;
96
97 let data_start = current_pos + header_size;
99 let data_end = data_start + data.len();
100 mmap[data_start..data_end].copy_from_slice(&data);
101
102 self.write_pos.store(data_end, Ordering::Release);
104
105 mmap.flush_async()?;
107
108 Ok(data.len())
109 }
110
111 pub fn write_metrics(&self, metrics: &[Metric], timestamp_ns: u64) -> Result<usize> {
113 let mut aggregated: FxHashMap<
115 (String, crate::monitoring::metrics::MetricType),
116 AggregatedMetric,
117 > = FxHashMap::default();
118
119 for metric in metrics {
120 let key = (metric.name().clone(), metric.metric_type());
121
122 aggregated
123 .entry(key)
124 .and_modify(|agg| agg.update(metric))
125 .or_insert_with(|| AggregatedMetric::from_metric(metric));
126 }
127
128 let snapshot = MetricsSnapshot {
130 timestamp_ns,
131 metrics: aggregated.into_values().collect(),
132 };
133
134 self.write_snapshot(&snapshot)
135 }
136
137 pub fn write_position(&self) -> usize {
139 self.write_pos.load(Ordering::Acquire)
140 }
141
142 pub fn reset(&self) -> Result<()> {
144 self.write_pos.store(0, Ordering::Release);
145
146 let mut mmap = self.mmap.lock();
148 for byte in mmap.iter_mut() {
149 *byte = 0;
150 }
151 mmap.flush()?;
152
153 Ok(())
154 }
155
156 pub fn read_snapshots(&self) -> Result<Vec<MetricsSnapshot>> {
158 let mut snapshots = Vec::new();
159 let mmap = self.mmap.lock();
160 let mut pos = 0;
161 let end_pos = self.write_pos.load(Ordering::Acquire);
162
163 while pos + 5 <= end_pos {
164 let length =
166 u32::from_le_bytes([mmap[pos], mmap[pos + 1], mmap[pos + 2], mmap[pos + 3]])
167 as usize;
168 let compressed = mmap[pos + 4] != 0;
169
170 pos += 5;
171
172 if pos + length > end_pos {
173 break; }
175
176 let data = &mmap[pos..pos + length];
178
179 let snapshot = if compressed {
181 let decompressed = lz4::block::decompress(data, None)?;
182 bincode::decode_from_slice(&decompressed, bincode::config::standard())?.0
183 } else {
184 bincode::decode_from_slice(data, bincode::config::standard())?.0
185 };
186
187 snapshots.push(snapshot);
188 pos += length;
189 }
190
191 Ok(snapshots)
192 }
193}
194
195pub struct RotatingMmapWriter {
197 base_path: PathBuf,
199 current_index: AtomicUsize,
201 max_files: usize,
203 file_size: usize,
205 enable_compression: bool,
207 current_writer: Mutex<MmapWriter>,
209}
210
211impl RotatingMmapWriter {
212 pub fn new(
214 base_path: impl AsRef<Path>,
215 max_files: usize,
216 file_size: usize,
217 enable_compression: bool,
218 ) -> Result<Self> {
219 let base_path = base_path.as_ref().to_path_buf();
220 let current_path = Self::get_file_path(&base_path, 0);
221 let current_writer = MmapWriter::new(current_path, file_size, enable_compression)?;
222
223 Ok(Self {
224 base_path,
225 current_index: AtomicUsize::new(0),
226 max_files,
227 file_size,
228 enable_compression,
229 current_writer: Mutex::new(current_writer),
230 })
231 }
232
233 fn get_file_path(base_path: &Path, index: usize) -> PathBuf {
235 let file_name = format!("metrics_{index:04}.bin");
236 base_path.join(file_name)
237 }
238
239 fn rotate(&self) -> Result<()> {
241 let next_index = (self.current_index.load(Ordering::Acquire) + 1) % self.max_files;
242 let next_path = Self::get_file_path(&self.base_path, next_index);
243
244 let new_writer = MmapWriter::new(next_path, self.file_size, self.enable_compression)?;
245
246 let mut current_writer = self.current_writer.lock();
247 *current_writer = new_writer;
248
249 self.current_index.store(next_index, Ordering::Release);
250
251 Ok(())
252 }
253
254 pub fn write_snapshot(&self, snapshot: &MetricsSnapshot) -> Result<usize> {
256 let result = {
258 let writer = self.current_writer.lock();
259 writer.write_snapshot(snapshot)
260 };
261
262 match result {
263 Ok(size) => Ok(size),
264 Err(_) => {
265 self.rotate()?;
267 let writer = self.current_writer.lock();
268 writer.write_snapshot(snapshot)
269 }
270 }
271 }
272
273 pub fn write_metrics(&self, metrics: &[Metric], timestamp_ns: u64) -> Result<usize> {
275 let result = {
277 let writer = self.current_writer.lock();
278 writer.write_metrics(metrics, timestamp_ns)
279 };
280
281 match result {
282 Ok(size) => Ok(size),
283 Err(_) => {
284 self.rotate()?;
286 let writer = self.current_writer.lock();
287 writer.write_metrics(metrics, timestamp_ns)
288 }
289 }
290 }
291}
292
#[cfg(test)]
mod tests {
    use super::*;
    use crate::monitoring::metrics::{Counter, Gauge};
    use tempfile::TempDir;

    /// An uncompressed snapshot can be written and read back, and the write
    /// position advances by payload + 5-byte header.
    #[test]
    fn test_mmap_writer_basic() {
        let temp_dir = TempDir::new().unwrap();
        let file_path = temp_dir.path().join("test_metrics.bin");

        let writer = MmapWriter::new(&file_path, 1024 * 1024, false).unwrap();

        let snapshot = MetricsSnapshot {
            timestamp_ns: 1234567890,
            metrics: vec![AggregatedMetric {
                name: "test_counter".into(),
                metric_type: crate::monitoring::metrics::MetricType::Counter,
                count: 10,
                sum: 100.0,
                min: 5.0,
                max: 15.0,
                last_value: 12.0,
                start_timestamp_ns: 1000,
                end_timestamp_ns: 2000,
            }],
        };

        let size = writer.write_snapshot(&snapshot).unwrap();
        assert!(size > 0);
        // `write_snapshot` reports the payload size only; the record also
        // carries a 5-byte header.
        assert_eq!(writer.write_position(), size + 5);

        let snapshots = writer.read_snapshots().unwrap();
        assert_eq!(snapshots.len(), 1);
        assert_eq!(snapshots[0].timestamp_ns, snapshot.timestamp_ns);
        assert_eq!(snapshots[0].metrics.len(), 1);
    }

    /// Metrics written with compression enabled can be read back.
    #[test]
    fn test_mmap_writer_compression() {
        let temp_dir = TempDir::new().unwrap();
        let file_path = temp_dir.path().join("test_metrics_compressed.bin");

        let writer = MmapWriter::new(&file_path, 1024 * 1024, true).unwrap();

        let metrics = vec![
            Metric::Counter(Counter {
                name: "orders".into(),
                value: 100,
                timestamp_ns: 1000,
            }),
            Metric::Gauge(Gauge {
                name: "pnl".into(),
                value: 1234.56,
                timestamp_ns: 2000,
            }),
        ];

        let size = writer.write_metrics(&metrics, 3000).unwrap();
        assert!(size > 0);

        // Two distinct (name, type) keys must yield two aggregated metrics.
        let snapshots = writer.read_snapshots().unwrap();
        assert_eq!(snapshots.len(), 1);
        assert_eq!(snapshots[0].metrics.len(), 2);
    }

    /// Writing more data than one file can hold rotates to further files.
    #[test]
    fn test_rotating_writer() {
        let temp_dir = TempDir::new().unwrap();
        let base_path = temp_dir.path();

        // Files are deliberately tiny: each record needs at least 37 bytes
        // (four `f64` fields plus the 5-byte header), so five records cannot
        // fit into a single 128-byte file and at least one rotation must occur.
        let writer = RotatingMmapWriter::new(base_path, 3, 128, false).unwrap();

        for i in 0..5 {
            let snapshot = MetricsSnapshot {
                timestamp_ns: i as u64,
                metrics: vec![AggregatedMetric {
                    name: format!("metric_{i}"),
                    metric_type: crate::monitoring::metrics::MetricType::Counter,
                    count: 1,
                    sum: i as f64,
                    min: i as f64,
                    max: i as f64,
                    last_value: i as f64,
                    start_timestamp_ns: i as u64,
                    end_timestamp_ns: i as u64,
                }],
            };

            writer.write_snapshot(&snapshot).unwrap();
        }

        let entries: Vec<_> = std::fs::read_dir(base_path)
            .unwrap()
            .filter_map(|e| e.ok())
            .filter(|e| e.path().extension().is_some_and(|ext| ext == "bin"))
            .collect();

        // More than one file proves that rotation actually happened.
        assert!(entries.len() >= 2);
    }

    /// `reset` rewinds the write position and leaves nothing to read back.
    #[test]
    fn test_writer_reset() {
        let temp_dir = TempDir::new().unwrap();
        let file_path = temp_dir.path().join("test_reset.bin");

        let writer = MmapWriter::new(&file_path, 1024, false).unwrap();

        let metrics = vec![Metric::Counter(Counter {
            name: "test".into(),
            value: 1,
            timestamp_ns: 1000,
        })];

        writer.write_metrics(&metrics, 2000).unwrap();
        assert!(writer.write_position() > 0);

        writer.reset().unwrap();
        assert_eq!(writer.write_position(), 0);

        let snapshots = writer.read_snapshots().unwrap();
        assert_eq!(snapshots.len(), 0);
    }
}