//! rusty_bin/monitor/storage/zerocopy_writer.rs
use crate::monitor::schema::{OrderBookRecord, OrderBookSerializer, TradeRecord, TradeSerializer};
7use crate::monitor::storage::{CompressionMode, Result, StorageConfig, StorageError};
8use parking_lot::Mutex;
9use quanta::Instant as QuantaInstant;
10use simd_aligned::VecSimd;
11use std::path::{Path, PathBuf};
12use std::sync::Arc;
13use tokio::fs::{File, OpenOptions};
14use tokio::io::{AsyncWriteExt, BufWriter};
15use wide::f64x4;
/// Fixed-size pool of reusable buffers, indexed by slot; the
/// `available_*` vectors act as free lists of slot indices.
#[derive(Debug)]
struct ZerocopyBufferPool {
    // Byte buffers handed out for record serialization.
    serialization_buffers: Vec<Vec<u8>>,
    // Byte buffers reserved for compression staging.
    compression_buffers: Vec<Vec<u8>>,
    // SIMD-aligned f64x4 scratch buffers (no getter in this file — presumably
    // for future SIMD batch work; confirm before removing).
    simd_buffers: Vec<VecSimd<f64x4>>,
    // Free list of serialization-buffer indices.
    available_serialization: Vec<usize>,
    // Free list of compression-buffer indices.
    available_compression: Vec<usize>,
    // Free list of SIMD-buffer indices.
    available_simd: Vec<usize>,
}
28
29impl ZerocopyBufferPool {
30 fn new(pool_size: usize, buffer_size: usize) -> Self {
31 let mut serialization_buffers = Vec::with_capacity(pool_size);
32 let mut compression_buffers = Vec::with_capacity(pool_size);
33 let mut simd_buffers = Vec::with_capacity(pool_size);
34 let mut available_serialization = Vec::with_capacity(pool_size);
35 let mut available_compression = Vec::with_capacity(pool_size);
36 let mut available_simd = Vec::with_capacity(pool_size);
37
38 for i in 0..pool_size {
39 serialization_buffers.push(Vec::with_capacity(buffer_size));
40 compression_buffers.push(Vec::with_capacity(buffer_size));
41 simd_buffers.push(VecSimd::<f64x4>::with(0.0, 64)); available_serialization.push(i);
43 available_compression.push(i);
44 available_simd.push(i);
45 }
46
47 Self {
48 serialization_buffers,
49 compression_buffers,
50 simd_buffers,
51 available_serialization,
52 available_compression,
53 available_simd,
54 }
55 }
56
57 fn get_serialization_buffer(&mut self) -> Option<usize> {
58 self.available_serialization.pop()
59 }
60
61 fn return_serialization_buffer(&mut self, index: usize) {
62 self.serialization_buffers[index].clear();
63 self.available_serialization.push(index);
64 }
65
66 fn get_compression_buffer(&mut self) -> Option<usize> {
67 self.available_compression.pop()
68 }
69
70 fn return_compression_buffer(&mut self, index: usize) {
71 self.compression_buffers[index].clear();
72 self.available_compression.push(index);
73 }
74}
75
/// Running statistics for a `ZerocopyFileWriter`.
#[derive(Debug, Clone, Default)]
pub struct ZerocopyWriterStats {
    /// Total records written.
    pub records_written: u64,
    /// Total serialized payload bytes written (excludes frame length prefixes).
    pub bytes_written: u64,
    /// Total original payload bytes, including the 4-byte length prefix per record.
    pub bytes_original: u64,
    /// `bytes_written / bytes_original`; derived in `get_stats` (1.0 when no data yet).
    pub compression_ratio: f64,
    /// Exponential moving average (alpha = 0.1) of per-write latency, nanoseconds.
    pub avg_latency_nanos: u64,
    /// Number of writes served from a reused pooled buffer.
    pub buffer_reuse_count: u64,
    /// Number of writes that went through the zero-copy write paths.
    pub zero_copy_operations: u64,
    /// Number of logical SIMD-sized chunks processed by batch writes.
    pub simd_operations: u64,
}
96
/// Append-only record writer with pooled serialization buffers and
/// optional zstd compression of length-prefixed frames.
#[derive(Debug)]
pub struct ZerocopyFileWriter {
    // Actual on-disk path (gets a `.fb.zst` extension in compressed modes).
    file_path: PathBuf,
    // Storage configuration; `compression_level` is read from here.
    config: StorageConfig,
    // Whether and how frames are compressed before hitting disk.
    compression_mode: CompressionMode,
    // Buffered file writer; `None` while a write has taken it out, or after close.
    writer: Arc<Mutex<Option<BufWriter<File>>>>,
    // Pool of reusable serialization/compression/SIMD buffers.
    buffer_pool: Arc<Mutex<ZerocopyBufferPool>>,
    // Shared running statistics (snapshotted by `get_stats`).
    stats: Arc<Mutex<ZerocopyWriterStats>>,
    // Scratch buffer used when assembling length-prefixed frames.
    write_buffer: Arc<Mutex<Vec<u8>>>,
    // Scratch buffer used when staging data for the compressor.
    compression_buffer: Arc<Mutex<Vec<u8>>>,
}
109
110impl ZerocopyFileWriter {
111 pub async fn new(
113 file_path: PathBuf,
114 config: StorageConfig,
115 compression_mode: CompressionMode,
116 ) -> Result<Self> {
117 if let Some(parent) = file_path.parent() {
119 tokio::fs::create_dir_all(parent).await?;
120 }
121
122 let actual_path = match compression_mode {
124 CompressionMode::None => file_path.clone(),
125 CompressionMode::Realtime | CompressionMode::Buffered => {
126 let mut path = file_path.clone();
127 path.set_extension("fb.zst");
128 path
129 }
130 };
131
132 let file = OpenOptions::new()
134 .create(true)
135 .append(true)
136 .open(&actual_path)
137 .await?;
138
139 let writer = BufWriter::new(file);
140
141 let buffer_pool = ZerocopyBufferPool::new(32, 128 * 1024); Ok(Self {
145 file_path: actual_path,
146 config,
147 compression_mode,
148 writer: Arc::new(Mutex::new(Some(writer))),
149 buffer_pool: Arc::new(Mutex::new(buffer_pool)),
150 stats: Arc::new(Mutex::new(ZerocopyWriterStats::default())),
151 write_buffer: Arc::new(Mutex::new(Vec::with_capacity(256 * 1024))), compression_buffer: Arc::new(Mutex::new(Vec::with_capacity(256 * 1024))), })
154 }
155
156 pub async fn write_trade_zerocopy(&self, trade: &TradeRecord) -> Result<()> {
158 let start_time = QuantaInstant::now();
159
160 let buffer_index = {
162 let mut pool = self.buffer_pool.lock();
163 pool.get_serialization_buffer()
164 };
165
166 if let Some(index) = buffer_index {
167 let bytes_written = {
169 let mut pool = self.buffer_pool.lock();
170 let buffer = &mut pool.serialization_buffers[index];
171 self.serialize_trade_to_buffer(trade, buffer)?
172 };
173
174 let data_to_write = {
176 let pool = self.buffer_pool.lock();
177 pool.serialization_buffers[index][..bytes_written].to_vec()
178 };
179
180 self.write_data_zerocopy(&data_to_write, trade.timestamp_exchange)
182 .await?;
183
184 {
186 let mut pool = self.buffer_pool.lock();
187 pool.return_serialization_buffer(index);
188 }
189
190 let latency_nanos = start_time.elapsed().as_nanos() as u64;
192 {
193 let mut stats = self.stats.lock();
194 stats.records_written += 1;
195 stats.bytes_written += bytes_written as u64;
196 stats.avg_latency_nanos = (stats.avg_latency_nanos * 9 + latency_nanos) / 10;
197 stats.zero_copy_operations += 1;
198 stats.buffer_reuse_count += 1;
199 }
200 } else {
201 let serialized = TradeSerializer::serialize_trade(trade)
203 .map_err(|e| StorageError::Serialization(e.to_string()))?;
204
205 self.write_data_zerocopy(&serialized, trade.timestamp_exchange)
206 .await?;
207
208 let latency_nanos = start_time.elapsed().as_nanos() as u64;
210 {
211 let mut stats = self.stats.lock();
212 stats.records_written += 1;
213 stats.bytes_written += serialized.len() as u64;
214 stats.avg_latency_nanos = (stats.avg_latency_nanos * 9 + latency_nanos) / 10;
215 stats.zero_copy_operations += 1;
216 }
217 }
218
219 Ok(())
220 }
221
222 pub async fn write_orderbook_zerocopy(&self, orderbook: &OrderBookRecord) -> Result<()> {
224 let start_time = QuantaInstant::now();
225
226 let buffer_index = {
228 let mut pool = self.buffer_pool.lock();
229 pool.get_serialization_buffer()
230 };
231
232 if let Some(index) = buffer_index {
233 let bytes_written = {
235 let mut pool = self.buffer_pool.lock();
236 let buffer = &mut pool.serialization_buffers[index];
237 self.serialize_orderbook_to_buffer(orderbook, buffer)?
238 };
239
240 let data_to_write = {
242 let pool = self.buffer_pool.lock();
243 pool.serialization_buffers[index][..bytes_written].to_vec()
244 };
245
246 self.write_data_zerocopy(&data_to_write, orderbook.timestamp_exchange)
248 .await?;
249
250 {
252 let mut pool = self.buffer_pool.lock();
253 pool.return_serialization_buffer(index);
254 }
255
256 let latency_nanos = start_time.elapsed().as_nanos() as u64;
258 {
259 let mut stats = self.stats.lock();
260 stats.records_written += 1;
261 stats.bytes_written += bytes_written as u64;
262 stats.avg_latency_nanos = (stats.avg_latency_nanos * 9 + latency_nanos) / 10;
263 stats.zero_copy_operations += 1;
264 stats.buffer_reuse_count += 1;
265 }
266 } else {
267 let serialized = OrderBookSerializer::serialize_orderbook(orderbook)
269 .map_err(|e| StorageError::Serialization(e.to_string()))?;
270
271 self.write_data_zerocopy(&serialized, orderbook.timestamp_exchange)
272 .await?;
273
274 let latency_nanos = start_time.elapsed().as_nanos() as u64;
276 {
277 let mut stats = self.stats.lock();
278 stats.records_written += 1;
279 stats.bytes_written += serialized.len() as u64;
280 stats.avg_latency_nanos = (stats.avg_latency_nanos * 9 + latency_nanos) / 10;
281 stats.zero_copy_operations += 1;
282 }
283 }
284
285 Ok(())
286 }
287
288 pub async fn write_trade_batch_simd(&self, trades: &[&TradeRecord]) -> Result<u64> {
290 if trades.is_empty() {
291 return Ok(0);
292 }
293
294 let start_time = QuantaInstant::now();
295 let mut total_bytes = 0u64;
296
297 let simd_chunk_size = 4; let chunk_count = trades.len().div_ceil(simd_chunk_size);
300
301 for chunk in trades.chunks(simd_chunk_size) {
302 for trade in chunk {
304 if let Ok(serialized) = TradeSerializer::serialize_trade(trade) {
305 self.write_data_zerocopy(&serialized, trade.timestamp_exchange)
306 .await?;
307 total_bytes += serialized.len() as u64;
308 }
309 }
310 }
311
312 let latency_nanos = start_time.elapsed().as_nanos() as u64;
314 {
315 let mut stats = self.stats.lock();
316 stats.records_written += trades.len() as u64;
317 stats.bytes_written += total_bytes;
318 stats.avg_latency_nanos = (stats.avg_latency_nanos * 9 + latency_nanos) / 10;
319 stats.simd_operations += chunk_count as u64;
320 stats.zero_copy_operations += trades.len() as u64;
321 }
322
323 Ok(total_bytes)
324 }
325
326 fn serialize_trade_to_buffer(
329 &self,
330 trade: &TradeRecord,
331 buffer: &mut Vec<u8>,
332 ) -> Result<usize> {
333 TradeSerializer::serialize_trade_into(trade, buffer)
335 .map_err(|e| StorageError::Serialization(e.to_string()))
336 }
337
338 fn serialize_orderbook_to_buffer(
341 &self,
342 orderbook: &OrderBookRecord,
343 buffer: &mut Vec<u8>,
344 ) -> Result<usize> {
345 buffer.clear();
346
347 let serialized = OrderBookSerializer::serialize_orderbook(orderbook)
349 .map_err(|e| StorageError::Serialization(e.to_string()))?;
350
351 buffer.extend_from_slice(&serialized);
352 Ok(buffer.len())
353 }
354
355 async fn write_data_zerocopy(&self, data: &[u8], _timestamp: u64) -> Result<()> {
357 let original_size = data.len() as u64;
358
359 match self.compression_mode {
360 CompressionMode::None => {
361 let write_data = {
363 let mut write_buffer = self.write_buffer.lock();
364 write_buffer.clear();
365
366 let length = data.len() as u32;
368 write_buffer.extend_from_slice(&length.to_le_bytes());
369 write_buffer.extend_from_slice(data);
370
371 write_buffer.clone()
373 };
374
375 let writer = {
377 let mut writer_guard = self.writer.lock();
378 writer_guard.take()
379 };
380
381 if let Some(mut writer) = writer {
382 let write_result = async {
384 writer.write_all(&write_data).await?;
385 writer.flush().await
386 }
387 .await;
388
389 {
391 let mut writer_guard = self.writer.lock();
392 *writer_guard = Some(writer);
393 }
394
395 write_result?;
397 } else {
398 return Err(StorageError::Io(std::io::Error::new(
399 std::io::ErrorKind::BrokenPipe,
400 "Writer is closed",
401 )));
402 }
403 }
404 CompressionMode::Realtime | CompressionMode::Buffered => {
405 let data_to_compress = {
407 let mut compression_buffer = self.compression_buffer.lock();
408 compression_buffer.clear();
409
410 let length = data.len() as u32;
412 compression_buffer.extend_from_slice(&length.to_le_bytes());
413 compression_buffer.extend_from_slice(data);
414
415 compression_buffer.clone()
417 };
418
419 let compression_level = if self.compression_mode == CompressionMode::Realtime {
421 std::cmp::min(self.config.compression_level as i32, 3)
422 } else {
423 self.config.compression_level as i32
424 };
425
426 let compressed = tokio::task::spawn_blocking({
427 move || -> Result<Vec<u8>> {
428 zstd::bulk::compress(&data_to_compress, compression_level)
429 .map_err(|e| StorageError::Compression(e.to_string()))
430 }
431 })
432 .await
433 .map_err(|e| StorageError::Compression(e.to_string()))??;
434
435 let write_data = {
437 let mut write_buffer = self.write_buffer.lock();
438 write_buffer.clear();
439
440 let compressed_size = compressed.len() as u32;
441 write_buffer.extend_from_slice(&compressed_size.to_le_bytes());
442 write_buffer.extend_from_slice(&compressed);
443
444 write_buffer.clone()
446 };
447
448 let writer = {
450 let mut writer_guard = self.writer.lock();
451 writer_guard.take()
452 };
453
454 if let Some(mut writer) = writer {
455 let write_result = async {
457 writer.write_all(&write_data).await?;
458 writer.flush().await
459 }
460 .await;
461
462 {
464 let mut writer_guard = self.writer.lock();
465 *writer_guard = Some(writer);
466 }
467
468 write_result?;
470 } else {
471 return Err(StorageError::Io(std::io::Error::new(
472 std::io::ErrorKind::BrokenPipe,
473 "Writer is closed",
474 )));
475 }
476 }
477 }
478
479 {
481 let mut stats = self.stats.lock();
482 stats.bytes_original += original_size + 4; }
484
485 Ok(())
486 }
487
488 pub async fn close(&self) -> Result<()> {
490 let writer = {
492 let mut writer_guard = self.writer.lock();
493 writer_guard.take()
494 };
495
496 let mut close_error = None;
497 if let Some(mut writer) = writer {
498 let close_result = async {
499 writer.flush().await?;
500 writer.shutdown().await
501 }
502 .await;
503
504 if let Err(e) = close_result {
506 log::error!("Error closing writer: {e}");
507 close_error = Some(e);
508 }
509 }
510
511 {
513 let mut pool = self.buffer_pool.lock();
514 pool.serialization_buffers.clear();
515 pool.compression_buffers.clear();
516 pool.simd_buffers.clear();
517 pool.available_serialization.clear();
518 pool.available_compression.clear();
519 pool.available_simd.clear();
520 }
521
522 {
524 let mut write_buf = self.write_buffer.lock();
525 write_buf.clear();
526 write_buf.shrink_to_fit();
527 }
528
529 {
530 let mut comp_buf = self.compression_buffer.lock();
531 comp_buf.clear();
532 comp_buf.shrink_to_fit();
533 }
534
535 match close_error {
537 Some(e) => Err(e.into()),
538 None => Ok(()),
539 }
540 }
541
542 pub fn get_stats(&self) -> ZerocopyWriterStats {
544 let stats = self.stats.lock();
545 let mut result = stats.clone();
546
547 if result.bytes_original > 0 && result.bytes_written > 0 {
549 result.compression_ratio = result.bytes_written as f64 / result.bytes_original as f64;
550 } else {
551 result.compression_ratio = 1.0;
552 }
553
554 result
555 }
556
    /// Returns the actual on-disk path being written (including any
    /// compression extension applied at construction).
    #[must_use]
    pub fn file_path(&self) -> &Path {
        &self.file_path
    }
562
    /// Returns the compression mode this writer was constructed with.
    #[must_use]
    pub const fn compression_mode(&self) -> CompressionMode {
        self.compression_mode
    }
568}
569
#[cfg(test)]
mod tests {
    use super::*;
    use crate::monitor::schema::{PriceLevel, TradeSide};
    use rust_decimal_macros::dec;
    use tempfile::tempdir;

    // Builds a fixed BTCUSDT buy trade used by every trade test.
    fn create_test_trade() -> TradeRecord {
        TradeRecord {
            timestamp_exchange: 1234567890123456789,
            timestamp_system: 1234567890123456790,
            symbol: "BTCUSDT".into(),
            exchange: "binance".into(),
            price: dec!(50000.123),
            quantity: dec!(0.001),
            side: TradeSide::Buy,
            trade_id: "12345".into(),
            buyer_order_id: Some("buyer123".into()),
            seller_order_id: Some("seller456".into()),
            sequence: 100,
        }
    }

    // Builds a one-level order book straddling 50000.
    fn create_test_orderbook() -> OrderBookRecord {
        OrderBookRecord {
            timestamp_exchange: 1234567890123456789,
            timestamp_system: 1234567890123456790,
            symbol: "BTCUSDT".into(),
            exchange: "binance".into(),
            bids: vec![PriceLevel {
                price: dec!(49999.0),
                quantity: dec!(1.0),
                order_count: Some(1),
            }],
            asks: vec![PriceLevel {
                price: dec!(50001.0),
                quantity: dec!(1.0),
                order_count: Some(1),
            }],
            sequence: 1,
            checksum: Some("checksum".into()),
        }
    }

    // Constructing a writer should report the resolved path and mode and
    // start with zeroed statistics.
    #[tokio::test]
    async fn test_zerocopy_writer_creation() {
        let temp_dir = tempdir().unwrap();
        let file_path = temp_dir.path().join("test_zerocopy.fb");
        let config = StorageConfig::default();

        let writer = ZerocopyFileWriter::new(file_path.clone(), config, CompressionMode::None)
            .await
            .unwrap();

        // CompressionMode::None keeps the original path unchanged.
        assert_eq!(writer.file_path(), file_path);
        assert_eq!(writer.compression_mode(), CompressionMode::None);

        let stats = writer.get_stats();
        assert_eq!(stats.records_written, 0);
        assert_eq!(stats.zero_copy_operations, 0);
    }

    // A single trade write should bump the record, byte, and latency stats.
    #[tokio::test]
    async fn test_zerocopy_trade_writing() {
        let temp_dir = tempdir().unwrap();
        let file_path = temp_dir.path().join("test_trades_zerocopy.fb");
        let config = StorageConfig::default();

        let writer = ZerocopyFileWriter::new(file_path, config, CompressionMode::None)
            .await
            .unwrap();

        let trade = create_test_trade();
        writer.write_trade_zerocopy(&trade).await.unwrap();

        let stats = writer.get_stats();
        assert_eq!(stats.records_written, 1);
        assert_eq!(stats.zero_copy_operations, 1);
        assert!(stats.bytes_written > 0);
        assert!(stats.avg_latency_nanos > 0);
    }

    // Batch writes should account for every record and record at least one
    // SIMD-chunk operation.
    #[tokio::test]
    async fn test_zerocopy_batch_writing() {
        let temp_dir = tempdir().unwrap();
        let file_path = temp_dir.path().join("test_batch_zerocopy.fb");
        let config = StorageConfig::default();

        let writer = ZerocopyFileWriter::new(file_path, config, CompressionMode::None)
            .await
            .unwrap();

        let trades: Vec<TradeRecord> = (0..10).map(|_| create_test_trade()).collect();
        let trade_refs: Vec<&TradeRecord> = trades.iter().collect();

        let bytes_written = writer.write_trade_batch_simd(&trade_refs).await.unwrap();

        assert!(bytes_written > 0);

        let stats = writer.get_stats();
        assert_eq!(stats.records_written, 10);
        assert!(stats.simd_operations > 0);
        assert_eq!(stats.zero_copy_operations, 10);
    }

    // Repeated single writes should come out of the buffer pool, so the
    // reuse counter must advance.
    #[tokio::test]
    async fn test_buffer_pool_reuse() {
        let temp_dir = tempdir().unwrap();
        let file_path = temp_dir.path().join("test_buffer_reuse.fb");
        let config = StorageConfig::default();

        let writer = ZerocopyFileWriter::new(file_path, config, CompressionMode::None)
            .await
            .unwrap();

        let trade = create_test_trade();

        for _ in 0..5 {
            writer.write_trade_zerocopy(&trade).await.unwrap();
        }

        let stats = writer.get_stats();
        assert_eq!(stats.records_written, 5);
        assert!(stats.buffer_reuse_count > 0);
    }
}