rusty_engine/monitoring/
zerocopy_writer.rs

1//! Zero-copy structures for monitoring data persistence
2//!
3//! This module provides zerocopy-based structures for the monitoring writer's
4//! memory-mapped file format, eliminating allocations when writing/reading data.
5
6use zerocopy::{FromBytes, IntoBytes};
7
8/// Zero-copy header for monitoring data records
9///
10/// Format:
11/// - 4 bytes: data length (little-endian)
12/// - 1 byte: compression flag (0 = uncompressed, 1 = compressed)
13/// - 3 bytes: reserved for alignment
14#[derive(
15    Debug,
16    Clone,
17    Copy,
18    zerocopy_derive::FromBytes,
19    zerocopy_derive::IntoBytes,
20    zerocopy_derive::Immutable,
21    zerocopy_derive::KnownLayout,
22)]
23#[repr(C)]
24pub struct MonitoringRecordHeader {
25    /// Length of the data payload in bytes (little-endian)
26    pub length: u32,
27    /// Compression flag (0 = uncompressed, 1 = compressed)
28    pub compression_flag: u8,
29    /// Reserved bytes for alignment
30    pub _reserved: [u8; 3],
31}
32
33impl MonitoringRecordHeader {
34    /// Header size in bytes
35    pub const SIZE: usize = 8;
36
37    /// Create a new header
38    #[inline]
39    #[must_use]
40    pub const fn new(data_length: usize, compressed: bool) -> Self {
41        Self {
42            length: data_length as u32,
43            compression_flag: if compressed { 1 } else { 0 },
44            _reserved: [0; 3],
45        }
46    }
47
48    /// Check if the data is compressed
49    #[inline]
50    pub const fn is_compressed(&self) -> bool {
51        self.compression_flag != 0
52    }
53
54    /// Get the data length
55    #[inline]
56    pub const fn data_length(&self) -> usize {
57        self.length as usize
58    }
59}
60
61/// Zero-copy timestamp record for aggregated metrics
62#[derive(
63    Debug,
64    Clone,
65    Copy,
66    zerocopy_derive::FromBytes,
67    zerocopy_derive::IntoBytes,
68    zerocopy_derive::Immutable,
69    zerocopy_derive::KnownLayout,
70)]
71#[repr(C)]
72pub struct MetricTimestamp {
73    /// Timestamp in nanoseconds since epoch
74    pub timestamp_ns: u64,
75}
76
77/// Zero-copy aggregated metric header
78#[derive(
79    Debug,
80    Clone,
81    Copy,
82    zerocopy_derive::FromBytes,
83    zerocopy_derive::IntoBytes,
84    zerocopy_derive::Immutable,
85    zerocopy_derive::KnownLayout,
86)]
87#[repr(C)]
88pub struct AggregatedMetricHeader {
89    /// Metric type (0 = Counter, 1 = Gauge, 2 = Histogram)
90    pub metric_type: u8,
91    /// Reserved for alignment
92    pub _reserved: [u8; 7],
93    /// Number of values
94    pub count: u64,
95    /// Sum of all values
96    pub sum: f64,
97    /// Minimum value
98    pub min: f64,
99    /// Maximum value
100    pub max: f64,
101}
102
103impl AggregatedMetricHeader {
104    /// Create a new aggregated metric header
105    #[inline]
106    #[must_use]
107    pub const fn new(metric_type: u8, count: u64, sum: f64, min: f64, max: f64) -> Self {
108        Self {
109            metric_type,
110            _reserved: [0; 7],
111            count,
112            sum,
113            min,
114            max,
115        }
116    }
117}
118
119/// Zero-copy writer for monitoring data
120pub struct ZeroCopyMonitoringWriter;
121
122impl ZeroCopyMonitoringWriter {
123    /// Write a record header directly to memory-mapped buffer
124    ///
125    /// Returns the number of bytes written
126    #[inline]
127    pub fn write_header(
128        buffer: &mut [u8],
129        offset: usize,
130        header: &MonitoringRecordHeader,
131    ) -> Option<usize> {
132        let end = offset + MonitoringRecordHeader::SIZE;
133        if buffer.len() < end {
134            return None;
135        }
136
137        buffer[offset..end].copy_from_slice(header.as_bytes());
138        Some(MonitoringRecordHeader::SIZE)
139    }
140
141    /// Read a record header from memory-mapped buffer
142    #[inline]
143    pub fn read_header(buffer: &[u8], offset: usize) -> Option<&MonitoringRecordHeader> {
144        if buffer.len() < offset + MonitoringRecordHeader::SIZE {
145            return None;
146        }
147
148        MonitoringRecordHeader::ref_from_bytes(
149            &buffer[offset..offset + MonitoringRecordHeader::SIZE],
150        )
151        .ok()
152    }
153
154    /// Write aggregated metric data using zerocopy
155    pub fn write_metric(
156        buffer: &mut [u8],
157        offset: usize,
158        timestamp: u64,
159        metric_header: &AggregatedMetricHeader,
160        metric_name: &str,
161    ) -> Option<usize> {
162        let mut pos = offset;
163
164        // Write timestamp
165        let ts = MetricTimestamp {
166            timestamp_ns: timestamp,
167        };
168        let ts_size = std::mem::size_of::<MetricTimestamp>();
169        if buffer.len() < pos + ts_size {
170            return None;
171        }
172        buffer[pos..pos + ts_size].copy_from_slice(ts.as_bytes());
173        pos += ts_size;
174
175        // Write metric header
176        let header_size = std::mem::size_of::<AggregatedMetricHeader>();
177        if buffer.len() < pos + header_size {
178            return None;
179        }
180        buffer[pos..pos + header_size].copy_from_slice(metric_header.as_bytes());
181        pos += header_size;
182
183        // Write name length and name
184        let name_bytes = metric_name.as_bytes();
185        let name_len = name_bytes.len() as u32;
186        if buffer.len() < pos + 4 + name_bytes.len() {
187            return None;
188        }
189        buffer[pos..pos + 4].copy_from_slice(&name_len.to_le_bytes());
190        pos += 4;
191        buffer[pos..pos + name_bytes.len()].copy_from_slice(name_bytes);
192        pos += name_bytes.len();
193
194        Some(pos - offset)
195    }
196}
197
198/// Zero-copy reader for monitoring data
199pub struct ZeroCopyMonitoringReader;
200
201impl ZeroCopyMonitoringReader {
202    /// Read records from a memory-mapped buffer
203    pub fn read_records<F>(buffer: &[u8], mut callback: F) -> Result<usize, &'static str>
204    where
205        F: FnMut(&MonitoringRecordHeader, &[u8]),
206    {
207        let mut offset = 0;
208        let mut count = 0;
209
210        while offset + MonitoringRecordHeader::SIZE <= buffer.len() {
211            // Read header
212            let header = MonitoringRecordHeader::ref_from_bytes(
213                &buffer[offset..offset + MonitoringRecordHeader::SIZE],
214            )
215            .map_err(|_| "Failed to parse header")?;
216
217            offset += MonitoringRecordHeader::SIZE;
218
219            // Check if we have enough data
220            let data_len = header.data_length();
221            if offset + data_len > buffer.len() {
222                break;
223            }
224
225            // Get data slice
226            let data = &buffer[offset..offset + data_len];
227
228            // Call callback
229            callback(header, data);
230
231            offset += data_len;
232            count += 1;
233        }
234
235        Ok(count)
236    }
237}
238
239#[cfg(test)]
240mod tests {
241    use super::*;
242
243    #[test]
244    fn test_monitoring_record_header() {
245        let header = MonitoringRecordHeader::new(1024, true);
246        assert_eq!(header.data_length(), 1024);
247        assert!(header.is_compressed());
248
249        // Test round-trip
250        let bytes = header.as_bytes();
251        let parsed = MonitoringRecordHeader::ref_from_bytes(bytes).unwrap();
252        assert_eq!(parsed.length, header.length);
253        assert_eq!(parsed.compression_flag, header.compression_flag);
254    }
255
256    #[test]
257    fn test_write_read_header() {
258        let mut buffer = vec![0u8; 1024];
259        let header = MonitoringRecordHeader::new(256, false);
260
261        // Write
262        let written = ZeroCopyMonitoringWriter::write_header(&mut buffer, 0, &header).unwrap();
263        assert_eq!(written, MonitoringRecordHeader::SIZE);
264
265        // Read using zerocopy
266        let read_header =
267            MonitoringRecordHeader::ref_from_bytes(&buffer[0..MonitoringRecordHeader::SIZE])
268                .unwrap();
269        assert_eq!(read_header.data_length(), 256);
270        assert!(!read_header.is_compressed());
271    }
272
273    #[test]
274    fn test_metric_writing() {
275        let mut buffer = vec![0u8; 1024];
276        let metric_header = AggregatedMetricHeader::new(1, 100, 500.0, 1.0, 10.0);
277
278        let written = ZeroCopyMonitoringWriter::write_metric(
279            &mut buffer,
280            0,
281            1_000_000_000,
282            &metric_header,
283            "test_metric",
284        )
285        .unwrap();
286
287        assert!(written > 0);
288
289        // Verify we can read the timestamp back
290        let ts = MetricTimestamp::ref_from_bytes(&buffer[..8]).unwrap();
291        assert_eq!(ts.timestamp_ns, 1_000_000_000);
292    }
293
294    #[test]
295    fn test_reader() {
296        let mut buffer = vec![0u8; 1024];
297
298        // Write a test record
299        let header = MonitoringRecordHeader::new(10, false);
300        let data = b"test_data_";
301
302        ZeroCopyMonitoringWriter::write_header(&mut buffer, 0, &header).unwrap();
303        buffer[8..18].copy_from_slice(data);
304
305        // Read it back
306        let mut count = 0;
307        let result = ZeroCopyMonitoringReader::read_records(&buffer[..18], |header, data| {
308            assert_eq!(header.data_length(), 10);
309            assert!(!header.is_compressed());
310            assert_eq!(data, b"test_data_");
311            count += 1;
312        });
313
314        assert!(result.is_ok());
315        assert_eq!(count, 1);
316    }
317}