rusty_bin/monitor/storage/
writer.rs

//! File writer for market data
//!
//! This module handles writing trade and orderbook data to files using FlatBuffers serialization
//! with real-time zstd compression.
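//!
//! # On-disk layout
//!
//! Records are length-prefixed so they can be decoded sequentially. With
//! `CompressionMode::None`, each record is a 4-byte little-endian length
//! followed by the FlatBuffers payload. With compression enabled, each
//! (length + payload) pair is compressed individually with zstd, and the
//! resulting frame is itself prefixed with its 4-byte little-endian
//! compressed size:
//!
//! ```text
//! raw:        [len: u32 LE][flatbuffer bytes] ...
//! compressed: [clen: u32 LE][zstd([len: u32 LE][flatbuffer bytes])] ...
//! ```
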
use crate::monitor::schema::{OrderBookRecord, OrderBookSerializer, TradeRecord, TradeSerializer};
use crate::monitor::storage::{Result, StorageConfig, StorageError};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncWriteExt, BufWriter};
use tokio::sync::Mutex;

/// Compression mode for file writer
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum CompressionMode {
    /// No compression - raw FlatBuffers
    None,
    /// Real-time zstd compression (level clamped to <= 3 for low latency)
    Realtime,
    /// Compression at the full configured zstd level (note: records are
    /// currently still compressed individually, not in batches)
    Buffered,
}

/// Writer types for different compression modes
#[derive(Debug)]
enum WriterType {
    /// Raw file writer without compression
    Raw(BufWriter<File>),
    /// Compressed writer using zstd
    Compressed {
        /// Scratch buffer reused to assemble (length prefix + payload) before compression
        buffer: Vec<u8>,
        file: BufWriter<File>,
        compression_level: i32,
    },
}

/// File writer for market data with real-time compression
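///
/// # Example
///
/// A minimal usage sketch (assumes a tokio runtime; the `rusty_bin` crate
/// paths and the data file path are illustrative):
///
/// ```no_run
/// # use rusty_bin::monitor::schema::TradeRecord;
/// # use rusty_bin::monitor::storage::{FileWriter, StorageConfig};
/// # async fn demo(trade: TradeRecord) -> rusty_bin::monitor::storage::Result<()> {
/// let writer = FileWriter::new("data/trades.fb".into(), StorageConfig::default()).await?;
/// writer.write_trade(&trade).await?;
/// let stats = writer.get_stats().await;
/// println!("records: {}, ratio: {:.3}", stats.record_count, stats.compression_ratio);
/// writer.close().await?;
/// # Ok(())
/// # }
/// ```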
#[derive(Debug)]
pub struct FileWriter {
    file_path: PathBuf,
    config: StorageConfig,
    compression_mode: CompressionMode,
    writer: Arc<Mutex<Option<WriterType>>>,
    record_count: Arc<Mutex<u64>>,
    bytes_written: Arc<Mutex<u64>>,
    bytes_original: Arc<Mutex<u64>>,
    first_timestamp: Arc<Mutex<Option<u64>>>,
    last_timestamp: Arc<Mutex<Option<u64>>>,
}

impl FileWriter {
    /// Create a new file writer with real-time compression
    pub async fn new(file_path: PathBuf, config: StorageConfig) -> Result<Self> {
        Self::new_with_compression(file_path, config, CompressionMode::Realtime).await
    }

    /// Create a new file writer with the specified compression mode
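    ///
    /// A sketch of mode selection (file and crate paths illustrative):
    /// `CompressionMode::None` keeps the path as given, while the compressed
    /// modes replace the extension with `.fb.zst`.
    ///
    /// ```no_run
    /// # use rusty_bin::monitor::storage::{CompressionMode, FileWriter, StorageConfig};
    /// # async fn demo() -> rusty_bin::monitor::storage::Result<()> {
    /// let writer = FileWriter::new_with_compression(
    ///     "data/trades.fb".into(),
    ///     StorageConfig::default(),
    ///     CompressionMode::None, // raw FlatBuffers, no .zst extension
    /// )
    /// .await?;
    /// assert!(writer.file_path().ends_with("trades.fb"));
    /// # Ok(())
    /// # }
    /// ```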
    pub async fn new_with_compression(
        file_path: PathBuf,
        config: StorageConfig,
        compression_mode: CompressionMode,
    ) -> Result<Self> {
        // Ensure parent directory exists
        if let Some(parent) = file_path.parent() {
            tokio::fs::create_dir_all(parent).await?;
        }

        // Determine actual file path based on compression
        let actual_path = match compression_mode {
            CompressionMode::None => file_path.clone(),
            CompressionMode::Realtime | CompressionMode::Buffered => {
                let mut path = file_path.clone();
                path.set_extension("fb.zst");
                path
            }
        };

        // Open file for writing
        let file = OpenOptions::new()
            .create(true)
            .append(true)
            .open(&actual_path)
            .await?;

        // Create writer based on compression mode
        let writer = match compression_mode {
            CompressionMode::None => WriterType::Raw(BufWriter::new(file)),
            CompressionMode::Realtime | CompressionMode::Buffered => {
                // Clamp to a fast zstd level (<= 3) in real-time mode to keep
                // per-record compression latency low
                let compression_level = if compression_mode == CompressionMode::Realtime {
                    std::cmp::min(config.compression_level as i32, 3)
                } else {
                    config.compression_level as i32
                };

                WriterType::Compressed {
                    buffer: Vec::with_capacity(64 * 1024), // 64 KiB scratch buffer
                    file: BufWriter::new(file),
                    compression_level,
                }
            }
        };

        Ok(Self {
            file_path: actual_path,
            config,
            compression_mode,
            writer: Arc::new(Mutex::new(Some(writer))),
            record_count: Arc::new(Mutex::new(0)),
            bytes_written: Arc::new(Mutex::new(0)),
            bytes_original: Arc::new(Mutex::new(0)),
            first_timestamp: Arc::new(Mutex::new(None)),
            last_timestamp: Arc::new(Mutex::new(None)),
        })
    }

    /// Write a trade record with real-time compression
    pub async fn write_trade(&self, trade: &TradeRecord) -> Result<()> {
        let serialized = TradeSerializer::serialize_trade(trade)
            .map_err(|e| StorageError::Serialization(e.to_string()))?;

        self.write_data(&serialized, trade.timestamp_exchange).await
    }

    /// Write an orderbook record with real-time compression
    pub async fn write_orderbook(&self, orderbook: &OrderBookRecord) -> Result<()> {
        let serialized = OrderBookSerializer::serialize_orderbook(orderbook)
            .map_err(|e| StorageError::Serialization(e.to_string()))?;

        self.write_data(&serialized, orderbook.timestamp_exchange)
            .await
    }

    /// Write raw data with real-time compression
    async fn write_data(&self, data: &[u8], timestamp: u64) -> Result<()> {
        // Check if we need to roll the file before writing
        if self.should_roll_file().await? {
            return Err(StorageError::FileRolling(
                "File needs to be rolled before writing".to_string(),
            ));
        }

        let original_size = data.len() as u64;
        let bytes_written_to_file;

        {
            let mut writer_guard = self.writer.lock().await;
            if let Some(ref mut writer) = *writer_guard {
                match writer {
                    WriterType::Raw(buf_writer) => {
                        // Write length prefix (4 bytes) followed by data
                        let length = data.len() as u32;
                        let length_bytes = length.to_le_bytes();

                        buf_writer.write_all(&length_bytes).await?;
                        buf_writer.write_all(data).await?;
                        buf_writer.flush().await?;

                        bytes_written_to_file = 4 + original_size;
                    }
                    WriterType::Compressed {
                        buffer,
                        file,
                        compression_level,
                    } => {
                        // Compress each record individually so frames can be
                        // decoded independently when reading the file back
                        let length = data.len() as u32;
                        let length_bytes = length.to_le_bytes();

                        // Assemble (length prefix + data) in the scratch buffer
                        buffer.clear();
                        buffer.extend_from_slice(&length_bytes);
                        buffer.extend_from_slice(data);

                        // Run zstd on a blocking thread; the clone gives the
                        // task an owned ('static) copy of the input
                        let compressed = tokio::task::spawn_blocking({
                            let data_to_compress = buffer.clone();
                            let level = *compression_level;
                            move || -> Result<Vec<u8>> {
                                zstd::bulk::compress(&data_to_compress, level)
                                    .map_err(|e| StorageError::Compression(e.to_string()))
                            }
                        })
                        .await
                        .map_err(|e| StorageError::Compression(e.to_string()))??;

                        // Write compressed size prefix + compressed data
                        let compressed_size = compressed.len() as u32;
                        let compressed_size_bytes = compressed_size.to_le_bytes();

                        file.write_all(&compressed_size_bytes).await?;
                        file.write_all(&compressed).await?;
                        file.flush().await?;

                        bytes_written_to_file = 4 + compressed.len() as u64;
                    }
                }
            } else {
                return Err(StorageError::Io(std::io::Error::new(
                    std::io::ErrorKind::BrokenPipe,
                    "Writer is closed",
                )));
            }
        }

        // Update statistics
        {
            let mut count = self.record_count.lock().await;
            *count += 1;
        }
        {
            let mut bytes = self.bytes_written.lock().await;
            *bytes += bytes_written_to_file;
        }
        {
            let mut original = self.bytes_original.lock().await;
            *original += original_size + 4; // Include length prefix in original size
        }
        {
            let mut first = self.first_timestamp.lock().await;
            if first.is_none() {
                *first = Some(timestamp);
            }
        }
        {
            let mut last = self.last_timestamp.lock().await;
            *last = Some(timestamp);
        }

        Ok(())
    }

    /// Check if file should be rolled
    async fn should_roll_file(&self) -> Result<bool> {
        // Check file size (use compressed size if compression is enabled)
        let bytes_written = {
            let guard = self.bytes_written.lock().await;
            *guard
        };
        if bytes_written >= self.config.max_file_size_mb * 1024 * 1024 {
            return Ok(true);
        }

        // Check record count
        let record_count = {
            let guard = self.record_count.lock().await;
            *guard
        };
        if record_count >= self.config.max_records_per_file {
            return Ok(true);
        }

        // Check if it's a new day (for daily rolling)
        let metadata = tokio::fs::metadata(&self.file_path).await?;
        // created() is unsupported on some platforms/filesystems; fall back to mtime
        let file_created = metadata.created().or_else(|_| metadata.modified())?;

        let file_date = crate::monitor::storage::naming::SimpleDate::from_system_time(file_created);
        let current_date = crate::monitor::storage::naming::SimpleDate::from_system_time(
            std::time::SystemTime::now(),
        );

        Ok(file_date != current_date)
    }

    /// Close the file writer and finalize compression
    pub async fn close(&self) -> Result<()> {
        let mut writer_guard = self.writer.lock().await;
        if let Some(writer) = writer_guard.take() {
            match writer {
                WriterType::Raw(mut buf_writer) => {
                    buf_writer.flush().await?;
                    buf_writer.shutdown().await?;
                }
                WriterType::Compressed { mut file, .. } => {
                    file.flush().await?;
                    file.shutdown().await?;
                }
            }
        }
        Ok(())
    }

    /// Get file statistics including compression ratio
    pub async fn get_stats(&self) -> FileStats {
        let record_count = {
            let guard = self.record_count.lock().await;
            *guard
        };
        let bytes_written = {
            let guard = self.bytes_written.lock().await;
            *guard
        };
        let bytes_original = {
            let guard = self.bytes_original.lock().await;
            *guard
        };
        let first_timestamp = {
            let guard = self.first_timestamp.lock().await;
            *guard
        };
        let last_timestamp = {
            let guard = self.last_timestamp.lock().await;
            *guard
        };

        // Compression ratio = bytes on disk / original bytes (lower is better);
        // defaults to 1.0 before anything has been written
        let compression_ratio = if bytes_original > 0 && bytes_written > 0 {
            bytes_written as f64 / bytes_original as f64
        } else {
            1.0
        };

        FileStats {
            file_path: self.file_path.clone(),
            record_count,
            bytes_written,
            bytes_original,
            compression_ratio,
            compression_mode: self.compression_mode,
            first_timestamp,
            last_timestamp,
        }
    }

    /// Get the actual file path on disk (may carry a `.fb.zst` extension when
    /// compression is enabled)
    #[must_use]
    pub fn file_path(&self) -> &Path {
        &self.file_path
    }

    /// Get compression mode
    #[must_use]
    pub const fn compression_mode(&self) -> CompressionMode {
        self.compression_mode
    }
}

impl Drop for FileWriter {
    fn drop(&mut self) {
        // Drop cannot be async, so we cannot flush through the async mutex here.
        // The OS file handle is closed when the BufWriter is dropped, but any
        // buffered data may be lost; call `close()` explicitly to guarantee a flush.
    }
}

/// File statistics with compression information
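///
/// A sketch of interpreting the ratio (the module path in the hidden import
/// is illustrative):
///
/// ```no_run
/// # fn interpret(stats: &rusty_bin::monitor::storage::writer::FileStats) {
/// let savings_pct = (1.0 - stats.compression_ratio) * 100.0;
/// println!("{} records, {:.1}% space saved", stats.record_count, savings_pct);
/// # }
/// ```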
#[derive(Debug, Clone)]
pub struct FileStats {
    /// Path to the written file
    pub file_path: PathBuf,
    /// Total number of records written
    pub record_count: u64,
    /// Total bytes written to disk (after compression if enabled)
    pub bytes_written: u64,
    /// Total bytes before compression
    pub bytes_original: u64,
    /// Compression ratio (`bytes_written / bytes_original`); 1.0 = no
    /// compression, lower = better compression (e.g. 0.25 means the file is a
    /// quarter of its original size)
    pub compression_ratio: f64,
    /// Compression mode used for this file
    pub compression_mode: CompressionMode,
    /// Timestamp of the first record (nanoseconds since epoch)
    pub first_timestamp: Option<u64>,
    /// Timestamp of the last record (nanoseconds since epoch)
    pub last_timestamp: Option<u64>,
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::monitor::schema::{OrderBookRecord, PriceLevel, TradeRecord, TradeSide};
    use rust_decimal_macros::dec;
    use tempfile::tempdir;

    // Helper to create a dummy TradeRecord
    fn create_test_trade() -> TradeRecord {
        TradeRecord {
            timestamp_exchange: 1,
            timestamp_system: 1,
            symbol: "BTCUSDT".into(),
            exchange: "binance".into(),
            price: dec!(1.0),
            quantity: dec!(1.0),
            side: TradeSide::Buy,
            trade_id: "1".into(),
            buyer_order_id: None,
            seller_order_id: None,
            sequence: 1,
        }
    }

    // Helper to create a dummy OrderBookRecord
    fn create_test_orderbook() -> OrderBookRecord {
        OrderBookRecord {
            timestamp_exchange: 1,
            timestamp_system: 1,
            symbol: "BTCUSDT".into(),
            exchange: "binance".into(),
            bids: vec![PriceLevel {
                price: dec!(1.0),
                quantity: dec!(1.0),
                order_count: Some(1),
            }],
            asks: vec![],
            sequence: 1,
            checksum: None,
        }
    }

    #[tokio::test]
    async fn test_file_writer_creation() {
        let temp_dir = tempdir().unwrap();
        let file_path = temp_dir.path().join("test.fb");
        let config = StorageConfig::default();

        let writer = FileWriter::new(file_path.clone(), config).await.unwrap();
        // Note: the actual path gains a .fb.zst extension because the default
        // constructor enables real-time compression
        assert!(writer.file_path().to_string_lossy().contains("test.fb"));

        let stats = writer.get_stats().await;
        assert_eq!(stats.record_count, 0);
        assert_eq!(stats.bytes_written, 0);
        assert_eq!(stats.compression_ratio, 1.0);
    }
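
    // Not part of the original suite: a round-trip sketch that checks the
    // uncompressed on-disk framing (4-byte LE length prefix + payload).
    // Assumes the default `StorageConfig` limits permit at least one record.
    #[tokio::test]
    async fn test_raw_framing_round_trip() {
        let temp_dir = tempdir().unwrap();
        let file_path = temp_dir.path().join("framing.fb");

        let writer = FileWriter::new_with_compression(
            file_path.clone(),
            StorageConfig::default(),
            CompressionMode::None,
        )
        .await
        .unwrap();
        writer.write_trade(&create_test_trade()).await.unwrap();
        writer.close().await.unwrap();

        let bytes = tokio::fs::read(&file_path).await.unwrap();
        let len = u32::from_le_bytes(bytes[..4].try_into().unwrap()) as usize;
        // The length prefix must match the size of the remaining payload
        assert_eq!(len, bytes.len() - 4);
    }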

    #[tokio::test]
    async fn test_write_trade_triggers_rolling() {
        let temp_dir = tempdir().unwrap();
        let config = StorageConfig {
            max_records_per_file: 1, // Roll after 1 record
            ..Default::default()
        };

        let file_path = temp_dir.path().join("test_trades.fb");
        let writer = FileWriter::new(file_path, config).await.unwrap();
        let trade = create_test_trade();

        // The first write should normally succeed; we deliberately ignore its
        // result so the test only asserts on the rolling behavior below.
        let _ = writer.write_trade(&trade).await;

        // The second write MUST return a rolling error: the record count has
        // reached the limit of 1 (or the first write already hit a roll
        // condition that still holds).
        let result = writer.write_trade(&trade).await;
        assert!(matches!(result, Err(StorageError::FileRolling(_))));
    }

    #[tokio::test]
    async fn test_write_orderbook_triggers_rolling() {
        let temp_dir = tempdir().unwrap();
        let config = StorageConfig {
            max_file_size_mb: 1,
            max_records_per_file: 1, // Roll after 1 record
            ..Default::default()
        };
        let file_path = temp_dir.path().join("test_orderbook.fb");
        let writer = FileWriter::new(file_path, config).await.unwrap();
        let orderbook = create_test_orderbook();

        // First write should succeed
        writer.write_orderbook(&orderbook).await.unwrap();

        // Second write should return a rolling error
        let result = writer.write_orderbook(&orderbook).await;
        assert!(matches!(result, Err(StorageError::FileRolling(_))));
    }
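
    // Not part of the original suite: a sketch that checks the compressed
    // framing (4-byte LE compressed size + an independent zstd frame that
    // decompresses to a 4-byte LE length prefix + payload).
    #[tokio::test]
    async fn test_compressed_framing_round_trip() {
        let temp_dir = tempdir().unwrap();
        let file_path = temp_dir.path().join("framing.fb");

        let writer = FileWriter::new(file_path, StorageConfig::default())
            .await
            .unwrap();
        // Read back via the writer's actual path, which gains a .fb.zst extension
        let actual_path = writer.file_path().to_path_buf();

        writer.write_trade(&create_test_trade()).await.unwrap();
        writer.close().await.unwrap();

        let bytes = tokio::fs::read(&actual_path).await.unwrap();
        let clen = u32::from_le_bytes(bytes[..4].try_into().unwrap()) as usize;
        assert_eq!(clen, bytes.len() - 4);

        // Decompress with a generous capacity bound; the inner length prefix
        // must match the inner payload size
        let inner = zstd::bulk::decompress(&bytes[4..], 1024 * 1024).unwrap();
        let len = u32::from_le_bytes(inner[..4].try_into().unwrap()) as usize;
        assert_eq!(len, inner.len() - 4);
    }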
}