1use zerocopy::{FromBytes, IntoBytes};
7
/// Fixed-size header that precedes the payload of every monitoring record.
///
/// The struct is `#[repr(C)]` and contains no padding (4 + 1 + 3 bytes), so it
/// can be viewed as, or reconstructed from, a raw byte slice through the
/// `zerocopy` traits derived below. Fields are stored in the host's native
/// byte order.
#[derive(
    Debug,
    Clone,
    Copy,
    zerocopy_derive::FromBytes,
    zerocopy_derive::IntoBytes,
    zerocopy_derive::Immutable,
    zerocopy_derive::KnownLayout,
)]
#[repr(C)]
pub struct MonitoringRecordHeader {
    /// Length in bytes of the payload that immediately follows this header.
    pub length: u32,
    /// Compression marker: any non-zero value means the payload is compressed.
    pub compression_flag: u8,
    /// Reserved bytes that round the header up to 8 bytes; the constructor
    /// writes them as zero.
    pub _reserved: [u8; 3],
}
32
33impl MonitoringRecordHeader {
34 pub const SIZE: usize = 8;
36
37 #[inline]
39 #[must_use]
40 pub const fn new(data_length: usize, compressed: bool) -> Self {
41 Self {
42 length: data_length as u32,
43 compression_flag: if compressed { 1 } else { 0 },
44 _reserved: [0; 3],
45 }
46 }
47
48 #[inline]
50 pub const fn is_compressed(&self) -> bool {
51 self.compression_flag != 0
52 }
53
54 #[inline]
56 pub const fn data_length(&self) -> usize {
57 self.length as usize
58 }
59}
60
/// Timestamp prefix written at the start of a serialized metric.
///
/// `#[repr(C)]` with a single `u64`, so it occupies exactly 8 bytes and can be
/// converted to and from raw bytes through the derived `zerocopy` traits. The
/// value is stored in the host's native byte order.
#[derive(
    Debug,
    Clone,
    Copy,
    zerocopy_derive::FromBytes,
    zerocopy_derive::IntoBytes,
    zerocopy_derive::Immutable,
    zerocopy_derive::KnownLayout,
)]
#[repr(C)]
pub struct MetricTimestamp {
    /// Timestamp in nanoseconds, going by the field name.
    /// NOTE(review): the epoch/clock source is not established in this file — confirm with callers.
    pub timestamp_ns: u64,
}
76
/// Fixed-size block of aggregate statistics written after the timestamp of a
/// serialized metric.
///
/// `#[repr(C)]` with explicit reserved bytes after `metric_type`, so the
/// 8-byte fields start at offset 8 and the struct has no implicit padding
/// (1 + 7 + 4 * 8 = 40 bytes). Fields are stored in the host's native byte
/// order.
#[derive(
    Debug,
    Clone,
    Copy,
    zerocopy_derive::FromBytes,
    zerocopy_derive::IntoBytes,
    zerocopy_derive::Immutable,
    zerocopy_derive::KnownLayout,
)]
#[repr(C)]
pub struct AggregatedMetricHeader {
    /// Numeric metric-type tag.
    /// NOTE(review): the meaning of each value is not defined in this file — confirm against the producer.
    pub metric_type: u8,
    /// Reserved bytes that keep the following 8-byte fields aligned; the
    /// constructor writes them as zero.
    pub _reserved: [u8; 7],
    /// Presumably the number of samples folded into this aggregate.
    pub count: u64,
    /// Presumably the sum of the aggregated sample values.
    pub sum: f64,
    /// Presumably the smallest aggregated sample value.
    pub min: f64,
    /// Presumably the largest aggregated sample value.
    pub max: f64,
}
102
103impl AggregatedMetricHeader {
104 #[inline]
106 #[must_use]
107 pub const fn new(metric_type: u8, count: u64, sum: f64, min: f64, max: f64) -> Self {
108 Self {
109 metric_type,
110 _reserved: [0; 7],
111 count,
112 sum,
113 min,
114 max,
115 }
116 }
117}
118
/// Stateless namespace for the functions that serialize monitoring headers and
/// metrics into caller-provided byte buffers.
pub struct ZeroCopyMonitoringWriter;
121
122impl ZeroCopyMonitoringWriter {
123 #[inline]
127 pub fn write_header(
128 buffer: &mut [u8],
129 offset: usize,
130 header: &MonitoringRecordHeader,
131 ) -> Option<usize> {
132 let end = offset + MonitoringRecordHeader::SIZE;
133 if buffer.len() < end {
134 return None;
135 }
136
137 buffer[offset..end].copy_from_slice(header.as_bytes());
138 Some(MonitoringRecordHeader::SIZE)
139 }
140
141 #[inline]
143 pub fn read_header(buffer: &[u8], offset: usize) -> Option<&MonitoringRecordHeader> {
144 if buffer.len() < offset + MonitoringRecordHeader::SIZE {
145 return None;
146 }
147
148 MonitoringRecordHeader::ref_from_bytes(
149 &buffer[offset..offset + MonitoringRecordHeader::SIZE],
150 )
151 .ok()
152 }
153
154 pub fn write_metric(
156 buffer: &mut [u8],
157 offset: usize,
158 timestamp: u64,
159 metric_header: &AggregatedMetricHeader,
160 metric_name: &str,
161 ) -> Option<usize> {
162 let mut pos = offset;
163
164 let ts = MetricTimestamp {
166 timestamp_ns: timestamp,
167 };
168 let ts_size = std::mem::size_of::<MetricTimestamp>();
169 if buffer.len() < pos + ts_size {
170 return None;
171 }
172 buffer[pos..pos + ts_size].copy_from_slice(ts.as_bytes());
173 pos += ts_size;
174
175 let header_size = std::mem::size_of::<AggregatedMetricHeader>();
177 if buffer.len() < pos + header_size {
178 return None;
179 }
180 buffer[pos..pos + header_size].copy_from_slice(metric_header.as_bytes());
181 pos += header_size;
182
183 let name_bytes = metric_name.as_bytes();
185 let name_len = name_bytes.len() as u32;
186 if buffer.len() < pos + 4 + name_bytes.len() {
187 return None;
188 }
189 buffer[pos..pos + 4].copy_from_slice(&name_len.to_le_bytes());
190 pos += 4;
191 buffer[pos..pos + name_bytes.len()].copy_from_slice(name_bytes);
192 pos += name_bytes.len();
193
194 Some(pos - offset)
195 }
196}
197
/// Stateless namespace for the functions that parse monitoring records out of
/// a byte buffer.
pub struct ZeroCopyMonitoringReader;
200
201impl ZeroCopyMonitoringReader {
202 pub fn read_records<F>(buffer: &[u8], mut callback: F) -> Result<usize, &'static str>
204 where
205 F: FnMut(&MonitoringRecordHeader, &[u8]),
206 {
207 let mut offset = 0;
208 let mut count = 0;
209
210 while offset + MonitoringRecordHeader::SIZE <= buffer.len() {
211 let header = MonitoringRecordHeader::ref_from_bytes(
213 &buffer[offset..offset + MonitoringRecordHeader::SIZE],
214 )
215 .map_err(|_| "Failed to parse header")?;
216
217 offset += MonitoringRecordHeader::SIZE;
218
219 let data_len = header.data_length();
221 if offset + data_len > buffer.len() {
222 break;
223 }
224
225 let data = &buffer[offset..offset + data_len];
227
228 callback(header, data);
230
231 offset += data_len;
232 count += 1;
233 }
234
235 Ok(count)
236 }
237}
238
#[cfg(test)]
mod tests {
    use super::*;

    /// Scratch storage with an explicit 8-byte alignment.
    ///
    /// `ref_from_bytes` rejects misaligned input, and a plain `Vec<u8>` gives
    /// no alignment guarantee, so the tests use this wrapper to make every
    /// zero-copy parse at an aligned offset deterministic.
    #[repr(C, align(8))]
    struct AlignedBuffer([u8; 1024]);

    impl AlignedBuffer {
        fn zeroed() -> Self {
            Self([0; 1024])
        }
    }

    /// A header survives a round trip through its own byte representation.
    #[test]
    fn test_monitoring_record_header() {
        let header = MonitoringRecordHeader::new(1024, true);
        assert_eq!(header.data_length(), 1024);
        assert!(header.is_compressed());

        let bytes = header.as_bytes();
        assert_eq!(bytes.len(), MonitoringRecordHeader::SIZE);

        let parsed = MonitoringRecordHeader::ref_from_bytes(bytes).unwrap();
        assert_eq!(parsed.length, header.length);
        assert_eq!(parsed.compression_flag, header.compression_flag);
        assert_eq!(parsed._reserved, [0u8; 3]);
    }

    /// `write_header` followed by `read_header` returns the same header, and
    /// a buffer that is too small is rejected.
    #[test]
    fn test_write_read_header() {
        let mut storage = AlignedBuffer::zeroed();
        let header = MonitoringRecordHeader::new(256, false);

        let written =
            ZeroCopyMonitoringWriter::write_header(&mut storage.0, 0, &header).unwrap();
        assert_eq!(written, MonitoringRecordHeader::SIZE);

        let read_header = ZeroCopyMonitoringWriter::read_header(&storage.0, 0).unwrap();
        assert_eq!(read_header.data_length(), 256);
        assert!(!read_header.is_compressed());

        let mut tiny = [0u8; 4];
        assert!(ZeroCopyMonitoringWriter::write_header(&mut tiny, 0, &header).is_none());
        assert!(ZeroCopyMonitoringWriter::read_header(&tiny, 0).is_none());
    }

    /// `write_metric` emits timestamp, aggregate header, little-endian name
    /// length and name bytes, in that order, and reports the exact size.
    #[test]
    fn test_metric_writing() {
        let mut storage = AlignedBuffer::zeroed();
        let metric_header = AggregatedMetricHeader::new(1, 100, 500.0, 1.0, 10.0);
        let name = "test_metric";

        let written = ZeroCopyMonitoringWriter::write_metric(
            &mut storage.0,
            0,
            1_000_000_000,
            &metric_header,
            name,
        )
        .unwrap();

        let ts_size = std::mem::size_of::<MetricTimestamp>();
        let header_size = std::mem::size_of::<AggregatedMetricHeader>();
        let name_start = ts_size + header_size + 4;
        assert_eq!(written, name_start + name.len());

        let ts = MetricTimestamp::ref_from_bytes(&storage.0[..ts_size]).unwrap();
        assert_eq!(ts.timestamp_ns, 1_000_000_000);

        let parsed =
            AggregatedMetricHeader::ref_from_bytes(&storage.0[ts_size..ts_size + header_size])
                .unwrap();
        assert_eq!(parsed.metric_type, 1);
        assert_eq!(parsed.count, 100);
        assert_eq!(parsed.sum, 500.0);
        assert_eq!(parsed.min, 1.0);
        assert_eq!(parsed.max, 10.0);

        assert_eq!(
            &storage.0[name_start - 4..name_start],
            &(name.len() as u32).to_le_bytes()
        );
        assert_eq!(
            &storage.0[name_start..name_start + name.len()],
            name.as_bytes()
        );
    }

    /// `read_records` delivers a complete record and ignores a truncated one.
    #[test]
    fn test_reader() {
        let mut storage = AlignedBuffer::zeroed();

        let header = MonitoringRecordHeader::new(10, false);
        let data = b"test_data_";

        ZeroCopyMonitoringWriter::write_header(&mut storage.0, 0, &header).unwrap();
        storage.0[8..18].copy_from_slice(data);

        let mut count = 0;
        let result = ZeroCopyMonitoringReader::read_records(&storage.0[..18], |header, data| {
            assert_eq!(header.data_length(), 10);
            assert!(!header.is_compressed());
            assert_eq!(data, b"test_data_");
            count += 1;
        });

        assert_eq!(result, Ok(1));
        assert_eq!(count, 1);

        // One payload byte short: the record is incomplete and must be skipped.
        let mut truncated_calls = 0;
        let truncated =
            ZeroCopyMonitoringReader::read_records(&storage.0[..17], |_, _| truncated_calls += 1);
        assert_eq!(truncated, Ok(0));
        assert_eq!(truncated_calls, 0);
    }
}