rusty_bin/monitor/storage/zerocopy_writer.rs

//! Zero-copy optimized file writer for high-frequency market data
//!
//! This module provides a zero-copy file writer that minimizes allocations
//! and optimizes for sub-microsecond latency in market data storage.
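//!
//! Illustrative usage sketch (the output path, config values, and the `trade`
//! record are placeholders supplied by the caller, not part of this module):
//!
//! ```ignore
//! let writer = ZerocopyFileWriter::new(
//!     std::path::PathBuf::from("data/btcusdt_trades.fb"), // hypothetical output path
//!     StorageConfig::default(),
//!     CompressionMode::None,
//! )
//! .await?;
//!
//! writer.write_trade_zerocopy(&trade).await?; // `trade` is a caller-built TradeRecord
//! let stats = writer.get_stats();
//! writer.close().await?;
//! ```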

use crate::monitor::schema::{OrderBookRecord, OrderBookSerializer, TradeRecord, TradeSerializer};
use crate::monitor::storage::{CompressionMode, Result, StorageConfig, StorageError};
use parking_lot::Mutex;
use quanta::Instant as QuantaInstant;
use simd_aligned::VecSimd;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncWriteExt, BufWriter};
use wide::f64x4;
// Note: zerocopy traits not needed for this implementation

/// Zero-copy optimized buffer pool for reuse
#[derive(Debug)]
struct ZerocopyBufferPool {
    serialization_buffers: Vec<Vec<u8>>,
    compression_buffers: Vec<Vec<u8>>,
    simd_buffers: Vec<VecSimd<f64x4>>,
    available_serialization: Vec<usize>,
    available_compression: Vec<usize>,
    available_simd: Vec<usize>,
}

impl ZerocopyBufferPool {
    fn new(pool_size: usize, buffer_size: usize) -> Self {
        let mut serialization_buffers = Vec::with_capacity(pool_size);
        let mut compression_buffers = Vec::with_capacity(pool_size);
        let mut simd_buffers = Vec::with_capacity(pool_size);
        let mut available_serialization = Vec::with_capacity(pool_size);
        let mut available_compression = Vec::with_capacity(pool_size);
        let mut available_simd = Vec::with_capacity(pool_size);

        for i in 0..pool_size {
            serialization_buffers.push(Vec::with_capacity(buffer_size));
            compression_buffers.push(Vec::with_capacity(buffer_size));
            simd_buffers.push(VecSimd::<f64x4>::with(0.0, 64)); // 64 SIMD elements
            available_serialization.push(i);
            available_compression.push(i);
            available_simd.push(i);
        }

        Self {
            serialization_buffers,
            compression_buffers,
            simd_buffers,
            available_serialization,
            available_compression,
            available_simd,
        }
    }

    fn get_serialization_buffer(&mut self) -> Option<usize> {
        self.available_serialization.pop()
    }

    fn return_serialization_buffer(&mut self, index: usize) {
        self.serialization_buffers[index].clear();
        self.available_serialization.push(index);
    }

    fn get_compression_buffer(&mut self) -> Option<usize> {
        self.available_compression.pop()
    }

    fn return_compression_buffer(&mut self, index: usize) {
        self.compression_buffers[index].clear();
        self.available_compression.push(index);
    }
}

/// Zero-copy optimized file writer statistics
#[derive(Debug, Clone, Default)]
pub struct ZerocopyWriterStats {
    /// Total number of records written
    pub records_written: u64,
    /// Total serialized payload bytes submitted to the writer (tracked before compression)
    pub bytes_written: u64,
    /// Total bytes before compression, including the 4-byte length prefix
    pub bytes_original: u64,
    /// Ratio of `bytes_written` to `bytes_original` (defaults to 1.0 before any writes)
    pub compression_ratio: f64,
    /// Exponentially weighted moving average of write latency in nanoseconds
    pub avg_latency_nanos: u64,
    /// Number of times buffers were reused (performance metric)
    pub buffer_reuse_count: u64,
    /// Number of zero-copy operations performed
    pub zero_copy_operations: u64,
    /// Number of SIMD operations performed
    pub simd_operations: u64,
}

/// Zero-copy optimized file writer
#[derive(Debug)]
pub struct ZerocopyFileWriter {
    file_path: PathBuf,
    config: StorageConfig,
    compression_mode: CompressionMode,
    writer: Arc<Mutex<Option<BufWriter<File>>>>,
    buffer_pool: Arc<Mutex<ZerocopyBufferPool>>,
    stats: Arc<Mutex<ZerocopyWriterStats>>,
    write_buffer: Arc<Mutex<Vec<u8>>>,
    compression_buffer: Arc<Mutex<Vec<u8>>>,
}

impl ZerocopyFileWriter {
    /// Create a new zero-copy optimized file writer
    pub async fn new(
        file_path: PathBuf,
        config: StorageConfig,
        compression_mode: CompressionMode,
    ) -> Result<Self> {
        // Ensure parent directory exists
        if let Some(parent) = file_path.parent() {
            tokio::fs::create_dir_all(parent).await?;
        }

        // Determine actual file path based on compression
        let actual_path = match compression_mode {
            CompressionMode::None => file_path.clone(),
            CompressionMode::Realtime | CompressionMode::Buffered => {
                let mut path = file_path.clone();
                path.set_extension("fb.zst");
                path
            }
        };

        // Open file for writing
        let file = OpenOptions::new()
            .create(true)
            .append(true)
            .open(&actual_path)
            .await?;

        let writer = BufWriter::new(file);

        // Create buffer pool for zero-copy operations
        let buffer_pool = ZerocopyBufferPool::new(32, 128 * 1024); // 32 buffers of 128KB each

        Ok(Self {
            file_path: actual_path,
            config,
            compression_mode,
            writer: Arc::new(Mutex::new(Some(writer))),
            buffer_pool: Arc::new(Mutex::new(buffer_pool)),
            stats: Arc::new(Mutex::new(ZerocopyWriterStats::default())),
            write_buffer: Arc::new(Mutex::new(Vec::with_capacity(256 * 1024))), // 256KB write buffer
            compression_buffer: Arc::new(Mutex::new(Vec::with_capacity(256 * 1024))), // 256KB compression buffer
        })
    }

    /// Write a trade record using zero-copy patterns
    pub async fn write_trade_zerocopy(&self, trade: &TradeRecord) -> Result<()> {
        let start_time = QuantaInstant::now();

        // Get reusable buffer from pool
        let buffer_index = {
            let mut pool = self.buffer_pool.lock();
            pool.get_serialization_buffer()
        };

        if let Some(index) = buffer_index {
            // Use pooled buffer for serialization - truly zero-copy path
            let bytes_written = {
                let mut pool = self.buffer_pool.lock();
                let buffer = &mut pool.serialization_buffers[index];
                self.serialize_trade_to_buffer(trade, buffer)?
            };

            // Copy data from pooled buffer to avoid holding lock across await
            let data_to_write = {
                let pool = self.buffer_pool.lock();
                pool.serialization_buffers[index][..bytes_written].to_vec()
            };

            // Write data (lock is dropped before await)
            self.write_data_zerocopy(&data_to_write, trade.timestamp_exchange)
                .await?;

            // Return buffer to pool after use
            {
                let mut pool = self.buffer_pool.lock();
                pool.return_serialization_buffer(index);
            }

            // Update statistics
            let latency_nanos = start_time.elapsed().as_nanos() as u64;
            {
                let mut stats = self.stats.lock();
                stats.records_written += 1;
                stats.bytes_written += bytes_written as u64;
                stats.avg_latency_nanos = (stats.avg_latency_nanos * 9 + latency_nanos) / 10;
                stats.zero_copy_operations += 1;
                stats.buffer_reuse_count += 1;
            }
        } else {
            // Fallback to regular serialization if no buffer available
            let serialized = TradeSerializer::serialize_trade(trade)
                .map_err(|e| StorageError::Serialization(e.to_string()))?;

            self.write_data_zerocopy(&serialized, trade.timestamp_exchange)
                .await?;

            // Update statistics
            let latency_nanos = start_time.elapsed().as_nanos() as u64;
            {
                let mut stats = self.stats.lock();
                stats.records_written += 1;
                stats.bytes_written += serialized.len() as u64;
                stats.avg_latency_nanos = (stats.avg_latency_nanos * 9 + latency_nanos) / 10;
                stats.zero_copy_operations += 1;
            }
        }

        Ok(())
    }

    /// Write an orderbook record using zero-copy patterns
    pub async fn write_orderbook_zerocopy(&self, orderbook: &OrderBookRecord) -> Result<()> {
        let start_time = QuantaInstant::now();

        // Get reusable buffer from pool
        let buffer_index = {
            let mut pool = self.buffer_pool.lock();
            pool.get_serialization_buffer()
        };

        if let Some(index) = buffer_index {
            // Use pooled buffer for serialization - truly zero-copy path
            let bytes_written = {
                let mut pool = self.buffer_pool.lock();
                let buffer = &mut pool.serialization_buffers[index];
                self.serialize_orderbook_to_buffer(orderbook, buffer)?
            };

            // Copy data from pooled buffer to avoid holding lock across await
            let data_to_write = {
                let pool = self.buffer_pool.lock();
                pool.serialization_buffers[index][..bytes_written].to_vec()
            };

            // Write data (lock is dropped before await)
            self.write_data_zerocopy(&data_to_write, orderbook.timestamp_exchange)
                .await?;

            // Return buffer to pool after use
            {
                let mut pool = self.buffer_pool.lock();
                pool.return_serialization_buffer(index);
            }

            // Update statistics
            let latency_nanos = start_time.elapsed().as_nanos() as u64;
            {
                let mut stats = self.stats.lock();
                stats.records_written += 1;
                stats.bytes_written += bytes_written as u64;
                stats.avg_latency_nanos = (stats.avg_latency_nanos * 9 + latency_nanos) / 10;
                stats.zero_copy_operations += 1;
                stats.buffer_reuse_count += 1;
            }
        } else {
            // Fallback to regular serialization if no buffer available
            let serialized = OrderBookSerializer::serialize_orderbook(orderbook)
                .map_err(|e| StorageError::Serialization(e.to_string()))?;

            self.write_data_zerocopy(&serialized, orderbook.timestamp_exchange)
                .await?;

            // Update statistics
            let latency_nanos = start_time.elapsed().as_nanos() as u64;
            {
                let mut stats = self.stats.lock();
                stats.records_written += 1;
                stats.bytes_written += serialized.len() as u64;
                stats.avg_latency_nanos = (stats.avg_latency_nanos * 9 + latency_nanos) / 10;
                stats.zero_copy_operations += 1;
            }
        }

        Ok(())
    }

    /// Write multiple trades in SIMD-lane-sized batches
    pub async fn write_trade_batch_simd(&self, trades: &[&TradeRecord]) -> Result<u64> {
        if trades.is_empty() {
            return Ok(0);
        }

        let start_time = QuantaInstant::now();
        let mut total_bytes = 0u64;

        // Group trades into lane-sized chunks (4 matches the f64x4 lane count);
        // each trade in a chunk is still serialized and written individually
        let simd_chunk_size = 4;
        let chunk_count = trades.len().div_ceil(simd_chunk_size);

        for chunk in trades.chunks(simd_chunk_size) {
            // Serialize each trade in the chunk; trades that fail to serialize are skipped
            for trade in chunk {
                if let Ok(serialized) = TradeSerializer::serialize_trade(trade) {
                    self.write_data_zerocopy(&serialized, trade.timestamp_exchange)
                        .await?;
                    total_bytes += serialized.len() as u64;
                }
            }
        }

        // Update SIMD statistics
        let latency_nanos = start_time.elapsed().as_nanos() as u64;
        {
            let mut stats = self.stats.lock();
            stats.records_written += trades.len() as u64;
            stats.bytes_written += total_bytes;
            stats.avg_latency_nanos = (stats.avg_latency_nanos * 9 + latency_nanos) / 10;
            stats.simd_operations += chunk_count as u64;
            stats.zero_copy_operations += trades.len() as u64;
        }

        Ok(total_bytes)
    }

    /// Serialize trade directly into a reusable buffer (truly zero-copy)
    /// Returns the number of bytes written
    fn serialize_trade_to_buffer(
        &self,
        trade: &TradeRecord,
        buffer: &mut Vec<u8>,
    ) -> Result<usize> {
        // Write directly into the provided buffer
        TradeSerializer::serialize_trade_into(trade, buffer)
            .map_err(|e| StorageError::Serialization(e.to_string()))
    }

    /// Serialize an orderbook into a reusable buffer (serializes, then copies into the pooled buffer)
    /// Returns the number of bytes written
    fn serialize_orderbook_to_buffer(
        &self,
        orderbook: &OrderBookRecord,
        buffer: &mut Vec<u8>,
    ) -> Result<usize> {
        buffer.clear();

        // Use the existing OrderBookSerializer but copy into our buffer
        let serialized = OrderBookSerializer::serialize_orderbook(orderbook)
            .map_err(|e| StorageError::Serialization(e.to_string()))?;

        buffer.extend_from_slice(&serialized);
        Ok(buffer.len())
    }

    /// Write data using zero-copy patterns with reusable buffers
    async fn write_data_zerocopy(&self, data: &[u8], _timestamp: u64) -> Result<()> {
        let original_size = data.len() as u64;

        match self.compression_mode {
            CompressionMode::None => {
                // Prepare data without compression
                let write_data = {
                    let mut write_buffer = self.write_buffer.lock();
                    write_buffer.clear();

                    // Write length prefix (4 bytes) followed by data
                    let length = data.len() as u32;
                    write_buffer.extend_from_slice(&length.to_le_bytes());
                    write_buffer.extend_from_slice(data);

                    // Clone the data to avoid holding lock across await
                    write_buffer.clone()
                };

                // Write to file without holding lock across await
                let writer = {
                    let mut writer_guard = self.writer.lock();
                    writer_guard.take()
                };

                if let Some(mut writer) = writer {
                    // Perform async operations without any locks held
                    let write_result = async {
                        writer.write_all(&write_data).await?;
                        writer.flush().await
                    }
                    .await;

                    // Always put writer back, even on error
                    {
                        let mut writer_guard = self.writer.lock();
                        *writer_guard = Some(writer);
                    }

                    // Now handle any error that occurred
                    write_result?;
                } else {
                    return Err(StorageError::Io(std::io::Error::new(
                        std::io::ErrorKind::BrokenPipe,
                        "Writer is closed",
                    )));
                }
            }
            CompressionMode::Realtime | CompressionMode::Buffered => {
                // Prepare data for compression without holding locks across await
                let data_to_compress = {
                    let mut compression_buffer = self.compression_buffer.lock();
                    compression_buffer.clear();

                    // Prepare data for compression
                    let length = data.len() as u32;
                    compression_buffer.extend_from_slice(&length.to_le_bytes());
                    compression_buffer.extend_from_slice(data);

                    // Clone data to avoid holding lock across await
                    compression_buffer.clone()
                };

                // Cap the compression level at 3 in real-time mode to bound latency
                let compression_level = if self.compression_mode == CompressionMode::Realtime {
                    std::cmp::min(self.config.compression_level as i32, 3)
                } else {
                    self.config.compression_level as i32
                };

                let compressed = tokio::task::spawn_blocking({
                    move || -> Result<Vec<u8>> {
                        zstd::bulk::compress(&data_to_compress, compression_level)
                            .map_err(|e| StorageError::Compression(e.to_string()))
                    }
                })
                .await
                .map_err(|e| StorageError::Compression(e.to_string()))??;

                // Prepare write buffer without holding lock across await
                let write_data = {
                    let mut write_buffer = self.write_buffer.lock();
                    write_buffer.clear();

                    let compressed_size = compressed.len() as u32;
                    write_buffer.extend_from_slice(&compressed_size.to_le_bytes());
                    write_buffer.extend_from_slice(&compressed);

                    // Clone data to avoid holding lock across await
                    write_buffer.clone()
                };

                // Write compressed data without holding lock across await
                let writer = {
                    let mut writer_guard = self.writer.lock();
                    writer_guard.take()
                };

                if let Some(mut writer) = writer {
                    // Perform async operations without any locks held
                    let write_result = async {
                        writer.write_all(&write_data).await?;
                        writer.flush().await
                    }
                    .await;

                    // Always put writer back, even on error
                    {
                        let mut writer_guard = self.writer.lock();
                        *writer_guard = Some(writer);
                    }

                    // Now handle any error that occurred
                    write_result?;
                } else {
                    return Err(StorageError::Io(std::io::Error::new(
                        std::io::ErrorKind::BrokenPipe,
                        "Writer is closed",
                    )));
                }
            }
        }

        // Update compression ratio in stats
        {
            let mut stats = self.stats.lock();
            stats.bytes_original += original_size + 4; // Include length prefix
        }

        Ok(())
    }

    /// Close the writer and cleanup resources
    pub async fn close(&self) -> Result<()> {
        // 1. Close and flush the writer
        let writer = {
            let mut writer_guard = self.writer.lock();
            writer_guard.take()
        };

        let mut close_error = None;
        if let Some(mut writer) = writer {
            let close_result = async {
                writer.flush().await?;
                writer.shutdown().await
            }
            .await;

            // If close failed, store the error but continue with cleanup
            if let Err(e) = close_result {
                log::error!("Error closing writer: {e}");
                close_error = Some(e);
            }
        }

        // 2. Clear buffer pool to release ~8MB of memory
        {
            let mut pool = self.buffer_pool.lock();
            pool.serialization_buffers.clear();
            pool.compression_buffers.clear();
            pool.simd_buffers.clear();
            pool.available_serialization.clear();
            pool.available_compression.clear();
            pool.available_simd.clear();
        }

        // 3. Clear and shrink working buffers
        {
            let mut write_buf = self.write_buffer.lock();
            write_buf.clear();
            write_buf.shrink_to_fit();
        }

        {
            let mut comp_buf = self.compression_buffer.lock();
            comp_buf.clear();
            comp_buf.shrink_to_fit();
        }

        // Return the close error if one occurred, otherwise Ok
        match close_error {
            Some(e) => Err(e.into()),
            None => Ok(()),
        }
    }

    /// Get zero-copy writer statistics
    pub fn get_stats(&self) -> ZerocopyWriterStats {
        let stats = self.stats.lock();
        let mut result = stats.clone();

        // Calculate compression ratio
        if result.bytes_original > 0 && result.bytes_written > 0 {
            result.compression_ratio = result.bytes_written as f64 / result.bytes_original as f64;
        } else {
            result.compression_ratio = 1.0;
        }

        result
    }

    /// Get file path
    #[must_use]
    pub fn file_path(&self) -> &Path {
        &self.file_path
    }

    /// Get compression mode
    #[must_use]
    pub const fn compression_mode(&self) -> CompressionMode {
        self.compression_mode
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::monitor::schema::{PriceLevel, TradeSide};
    use rust_decimal_macros::dec;
    use tempfile::tempdir;

    fn create_test_trade() -> TradeRecord {
        TradeRecord {
            timestamp_exchange: 1234567890123456789,
            timestamp_system: 1234567890123456790,
            symbol: "BTCUSDT".into(),
            exchange: "binance".into(),
            price: dec!(50000.123),
            quantity: dec!(0.001),
            side: TradeSide::Buy,
            trade_id: "12345".into(),
            buyer_order_id: Some("buyer123".into()),
            seller_order_id: Some("seller456".into()),
            sequence: 100,
        }
    }

    fn create_test_orderbook() -> OrderBookRecord {
        OrderBookRecord {
            timestamp_exchange: 1234567890123456789,
            timestamp_system: 1234567890123456790,
            symbol: "BTCUSDT".into(),
            exchange: "binance".into(),
            bids: vec![PriceLevel {
                price: dec!(49999.0),
                quantity: dec!(1.0),
                order_count: Some(1),
            }],
            asks: vec![PriceLevel {
                price: dec!(50001.0),
                quantity: dec!(1.0),
                order_count: Some(1),
            }],
            sequence: 1,
            checksum: Some("checksum".into()),
        }
    }

    #[tokio::test]
    async fn test_zerocopy_writer_creation() {
        let temp_dir = tempdir().unwrap();
        let file_path = temp_dir.path().join("test_zerocopy.fb");
        let config = StorageConfig::default();

        let writer = ZerocopyFileWriter::new(file_path.clone(), config, CompressionMode::None)
            .await
            .unwrap();

        assert_eq!(writer.file_path(), file_path);
        assert_eq!(writer.compression_mode(), CompressionMode::None);

        let stats = writer.get_stats();
        assert_eq!(stats.records_written, 0);
        assert_eq!(stats.zero_copy_operations, 0);
    }
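
    // Added sketch: exercises the orderbook path using the existing
    // `create_test_orderbook` helper; it mirrors the trade-writing test below
    // and relies only on APIs defined in this module, assuming the orderbook
    // serializer accepts the same test record used elsewhere in these tests.
    #[tokio::test]
    async fn test_zerocopy_orderbook_writing() {
        let temp_dir = tempdir().unwrap();
        let file_path = temp_dir.path().join("test_orderbooks_zerocopy.fb");
        let config = StorageConfig::default();

        let writer = ZerocopyFileWriter::new(file_path, config, CompressionMode::None)
            .await
            .unwrap();

        let orderbook = create_test_orderbook();
        writer.write_orderbook_zerocopy(&orderbook).await.unwrap();

        let stats = writer.get_stats();
        assert_eq!(stats.records_written, 1);
        assert_eq!(stats.zero_copy_operations, 1);
        assert!(stats.bytes_written > 0);
    }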

    #[tokio::test]
    async fn test_zerocopy_trade_writing() {
        let temp_dir = tempdir().unwrap();
        let file_path = temp_dir.path().join("test_trades_zerocopy.fb");
        let config = StorageConfig::default();

        let writer = ZerocopyFileWriter::new(file_path, config, CompressionMode::None)
            .await
            .unwrap();

        let trade = create_test_trade();
        writer.write_trade_zerocopy(&trade).await.unwrap();

        let stats = writer.get_stats();
        assert_eq!(stats.records_written, 1);
        assert_eq!(stats.zero_copy_operations, 1);
        assert!(stats.bytes_written > 0);
        assert!(stats.avg_latency_nanos > 0);
    }
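
    // Added sketch: writes one trade with real-time zstd compression enabled.
    // Assumes `StorageConfig::default()` carries a compression level that zstd
    // accepts once the writer caps it at 3 for `CompressionMode::Realtime`.
    #[tokio::test]
    async fn test_zerocopy_compressed_trade_writing() {
        let temp_dir = tempdir().unwrap();
        let file_path = temp_dir.path().join("test_compressed.fb");
        let config = StorageConfig::default();

        let writer = ZerocopyFileWriter::new(file_path, config, CompressionMode::Realtime)
            .await
            .unwrap();

        // The writer appends the ".fb.zst" extension for compressed output files
        assert!(writer.file_path().to_string_lossy().ends_with(".fb.zst"));
        assert_eq!(writer.compression_mode(), CompressionMode::Realtime);

        let trade = create_test_trade();
        writer.write_trade_zerocopy(&trade).await.unwrap();

        let stats = writer.get_stats();
        assert_eq!(stats.records_written, 1);
        assert!(stats.compression_ratio > 0.0);
    }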

    #[tokio::test]
    async fn test_zerocopy_batch_writing() {
        let temp_dir = tempdir().unwrap();
        let file_path = temp_dir.path().join("test_batch_zerocopy.fb");
        let config = StorageConfig::default();

        let writer = ZerocopyFileWriter::new(file_path, config, CompressionMode::None)
            .await
            .unwrap();

        // Create batch of trades
        let trades: Vec<TradeRecord> = (0..10).map(|_| create_test_trade()).collect();
        let trade_refs: Vec<&TradeRecord> = trades.iter().collect();

        let bytes_written = writer.write_trade_batch_simd(&trade_refs).await.unwrap();

        assert!(bytes_written > 0);

        let stats = writer.get_stats();
        assert_eq!(stats.records_written, 10);
        assert!(stats.simd_operations > 0);
        assert_eq!(stats.zero_copy_operations, 10);
    }
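
    // Added sketch: verifies that `close()` flushes and tears down the writer
    // and that a write attempted afterwards is rejected, since the file handle
    // has been taken and the writer reports a "Writer is closed" I/O error.
    #[tokio::test]
    async fn test_zerocopy_writer_close() {
        let temp_dir = tempdir().unwrap();
        let file_path = temp_dir.path().join("test_close.fb");
        let config = StorageConfig::default();

        let writer = ZerocopyFileWriter::new(file_path, config, CompressionMode::None)
            .await
            .unwrap();

        let trade = create_test_trade();
        writer.write_trade_zerocopy(&trade).await.unwrap();
        writer.close().await.unwrap();

        // After close the buffer pool is drained and the file handle is gone,
        // so further writes should fail rather than silently succeed.
        assert!(writer.write_trade_zerocopy(&trade).await.is_err());
    }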

    #[tokio::test]
    async fn test_buffer_pool_reuse() {
        let temp_dir = tempdir().unwrap();
        let file_path = temp_dir.path().join("test_buffer_reuse.fb");
        let config = StorageConfig::default();

        let writer = ZerocopyFileWriter::new(file_path, config, CompressionMode::None)
            .await
            .unwrap();

        let trade = create_test_trade();

        // Write multiple trades to test buffer reuse
        for _ in 0..5 {
            writer.write_trade_zerocopy(&trade).await.unwrap();
        }

        let stats = writer.get_stats();
        assert_eq!(stats.records_written, 5);
        assert!(stats.buffer_reuse_count > 0); // Should have reused buffers
    }
}