rusty_bin/monitor/storage/writer.rs

use crate::monitor::schema::{OrderBookRecord, OrderBookSerializer, TradeRecord, TradeSerializer};
use crate::monitor::storage::{Result, StorageConfig, StorageError};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncWriteExt, BufWriter};
use tokio::sync::Mutex;

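/// Controls whether frames are written raw or zstd-compressed.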
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum CompressionMode {
    /// Write raw, uncompressed frames.
    None,
    /// Compress each frame as it is written, with the zstd level capped at 3
    /// for lower per-write latency.
    Realtime,
    /// Compress each frame with the full configured zstd level.
    Buffered,
}

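/// The underlying output: either a plain buffered file or a file that receives
/// zstd-compressed frames staged through a reusable buffer.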
#[derive(Debug)]
enum WriterType {
    Raw(BufWriter<File>),
    Compressed {
        buffer: Vec<u8>,
        file: BufWriter<File>,
        compression_level: i32,
    },
}

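/// Writes length-prefixed serialized trade and order book records to a single
/// data file, optionally zstd-compressing each frame.
///
/// A minimal usage sketch (path and error handling are illustrative):
///
/// ```ignore
/// let writer = FileWriter::new(PathBuf::from("data/trades.fb"), StorageConfig::default()).await?;
/// writer.write_trade(&trade).await?;
/// writer.close().await?;
/// ```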
#[derive(Debug)]
pub struct FileWriter {
    file_path: PathBuf,
    config: StorageConfig,
    compression_mode: CompressionMode,
    writer: Arc<Mutex<Option<WriterType>>>,
    record_count: Arc<Mutex<u64>>,
    bytes_written: Arc<Mutex<u64>>,
    bytes_original: Arc<Mutex<u64>>,
    first_timestamp: Arc<Mutex<Option<u64>>>,
    last_timestamp: Arc<Mutex<Option<u64>>>,
}

impl FileWriter {
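    /// Creates a writer with the default `Realtime` compression mode.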
    pub async fn new(file_path: PathBuf, config: StorageConfig) -> Result<Self> {
        Self::new_with_compression(file_path, config, CompressionMode::Realtime).await
    }

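    /// Creates a writer with an explicit compression mode. Compressed modes
    /// rewrite the target path to use a `.fb.zst` extension.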
    pub async fn new_with_compression(
        file_path: PathBuf,
        config: StorageConfig,
        compression_mode: CompressionMode,
    ) -> Result<Self> {
        if let Some(parent) = file_path.parent() {
            tokio::fs::create_dir_all(parent).await?;
        }

        // Compressed files are written with a `.fb.zst` extension.
        let actual_path = match compression_mode {
            CompressionMode::None => file_path.clone(),
            CompressionMode::Realtime | CompressionMode::Buffered => {
                let mut path = file_path.clone();
                path.set_extension("fb.zst");
                path
            }
        };

        let file = OpenOptions::new()
            .create(true)
            .append(true)
            .open(&actual_path)
            .await?;

        let writer = match compression_mode {
            CompressionMode::None => WriterType::Raw(BufWriter::new(file)),
            CompressionMode::Realtime | CompressionMode::Buffered => {
                // Realtime mode caps the zstd level at 3 so individual writes stay fast.
                let compression_level = if compression_mode == CompressionMode::Realtime {
                    std::cmp::min(config.compression_level as i32, 3)
                } else {
                    config.compression_level as i32
                };

                WriterType::Compressed {
                    buffer: Vec::with_capacity(64 * 1024),
                    file: BufWriter::new(file),
                    compression_level,
                }
            }
        };

        Ok(Self {
            file_path: actual_path,
            config,
            compression_mode,
            writer: Arc::new(Mutex::new(Some(writer))),
            record_count: Arc::new(Mutex::new(0)),
            bytes_written: Arc::new(Mutex::new(0)),
            bytes_original: Arc::new(Mutex::new(0)),
            first_timestamp: Arc::new(Mutex::new(None)),
            last_timestamp: Arc::new(Mutex::new(None)),
        })
    }

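    /// Serializes a trade record and appends it as a single frame.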
    pub async fn write_trade(&self, trade: &TradeRecord) -> Result<()> {
        let serialized = TradeSerializer::serialize_trade(trade)
            .map_err(|e| StorageError::Serialization(e.to_string()))?;

        self.write_data(&serialized, trade.timestamp_exchange).await
    }

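    /// Serializes an order book record and appends it as a single frame.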
    pub async fn write_orderbook(&self, orderbook: &OrderBookRecord) -> Result<()> {
        let serialized = OrderBookSerializer::serialize_orderbook(orderbook)
            .map_err(|e| StorageError::Serialization(e.to_string()))?;

        self.write_data(&serialized, orderbook.timestamp_exchange)
            .await
    }

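    /// Appends one frame: a 4-byte little-endian length prefix followed by the
    /// payload (the compressed frame in compressed modes), then updates counters.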
    async fn write_data(&self, data: &[u8], timestamp: u64) -> Result<()> {
        // Refuse to write once the file is due for rotation; the caller is
        // expected to roll to a new file first.
        if self.should_roll_file().await? {
            return Err(StorageError::FileRolling(
                "File needs to be rolled before writing".to_string(),
            ));
        }

        let original_size = data.len() as u64;
        let bytes_written_to_file;

        {
            let mut writer_guard = self.writer.lock().await;
            if let Some(ref mut writer) = *writer_guard {
                match writer {
                    WriterType::Raw(buf_writer) => {
                        // Frame format: 4-byte little-endian length prefix + payload.
                        let length = data.len() as u32;
                        let length_bytes = length.to_le_bytes();

                        buf_writer.write_all(&length_bytes).await?;
                        buf_writer.write_all(data).await?;
                        buf_writer.flush().await?;

                        bytes_written_to_file = 4 + original_size;
                    }
                    WriterType::Compressed {
                        buffer,
                        file,
                        compression_level,
                    } => {
                        // Stage the length-prefixed frame, then compress it as a whole.
                        let length = data.len() as u32;
                        let length_bytes = length.to_le_bytes();

                        buffer.clear();
                        buffer.extend_from_slice(&length_bytes);
                        buffer.extend_from_slice(data);

                        // Run zstd on a blocking thread so it does not stall the async runtime.
                        let compressed = tokio::task::spawn_blocking({
                            let data_to_compress = buffer.clone();
                            let level = *compression_level;
                            move || -> Result<Vec<u8>> {
                                zstd::bulk::compress(&data_to_compress, level)
                                    .map_err(|e| StorageError::Compression(e.to_string()))
                            }
                        })
                        .await
                        .map_err(|e| StorageError::Compression(e.to_string()))??;

                        // The compressed frame gets its own length prefix so readers can
                        // skip frames without decompressing them.
                        let compressed_size = compressed.len() as u32;
                        let compressed_size_bytes = compressed_size.to_le_bytes();

                        file.write_all(&compressed_size_bytes).await?;
                        file.write_all(&compressed).await?;
                        file.flush().await?;

                        bytes_written_to_file = 4 + compressed.len() as u64;
                    }
                }
            } else {
                return Err(StorageError::Io(std::io::Error::new(
                    std::io::ErrorKind::BrokenPipe,
                    "Writer is closed",
                )));
            }
        }

        // Update counters and timestamp bounds outside the writer lock.
        {
            let mut count = self.record_count.lock().await;
            *count += 1;
        }
        {
            let mut bytes = self.bytes_written.lock().await;
            *bytes += bytes_written_to_file;
        }
        {
            let mut original = self.bytes_original.lock().await;
            *original += original_size + 4;
        }
        {
            let mut first = self.first_timestamp.lock().await;
            if first.is_none() {
                *first = Some(timestamp);
            }
        }
        {
            let mut last = self.last_timestamp.lock().await;
            *last = Some(timestamp);
        }

        Ok(())
    }

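    /// Returns `true` when the file should be rotated: the configured size or
    /// record limits have been reached, or the file's creation date is no
    /// longer the current date.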
    async fn should_roll_file(&self) -> Result<bool> {
        // Size limit.
        let bytes_written = {
            let guard = self.bytes_written.lock().await;
            *guard
        };
        if bytes_written >= self.config.max_file_size_mb * 1024 * 1024 {
            return Ok(true);
        }

        // Record-count limit.
        let record_count = {
            let guard = self.record_count.lock().await;
            *guard
        };
        if record_count >= self.config.max_records_per_file {
            return Ok(true);
        }

        // Daily rotation: roll when the file was created on an earlier date.
        // Fall back to the modification time where creation time is unavailable.
        let metadata = tokio::fs::metadata(&self.file_path).await?;
        let file_created = metadata.created().or_else(|_| metadata.modified())?;

        let file_date = crate::monitor::storage::naming::SimpleDate::from_system_time(file_created);
        let current_date = crate::monitor::storage::naming::SimpleDate::from_system_time(
            std::time::SystemTime::now(),
        );

        Ok(file_date != current_date)
    }

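    /// Flushes and shuts down the underlying writer. Further writes fail with
    /// a broken-pipe I/O error.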
    pub async fn close(&self) -> Result<()> {
        let mut writer_guard = self.writer.lock().await;
        if let Some(writer) = writer_guard.take() {
            match writer {
                WriterType::Raw(mut buf_writer) => {
                    buf_writer.flush().await?;
                    buf_writer.shutdown().await?;
                }
                WriterType::Compressed { mut file, .. } => {
                    file.flush().await?;
                    file.shutdown().await?;
                }
            }
        }
        Ok(())
    }

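    /// Snapshots the current counters and timestamp bounds into a `FileStats`.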
    pub async fn get_stats(&self) -> FileStats {
        let record_count = {
            let guard = self.record_count.lock().await;
            *guard
        };
        let bytes_written = {
            let guard = self.bytes_written.lock().await;
            *guard
        };
        let bytes_original = {
            let guard = self.bytes_original.lock().await;
            *guard
        };
        let first_timestamp = {
            let guard = self.first_timestamp.lock().await;
            *guard
        };
        let last_timestamp = {
            let guard = self.last_timestamp.lock().await;
            *guard
        };

        // Bytes on disk divided by original bytes; values below 1.0 mean the
        // compression is saving space.
        let compression_ratio = if bytes_original > 0 && bytes_written > 0 {
            bytes_written as f64 / bytes_original as f64
        } else {
            1.0
        };

        FileStats {
            file_path: self.file_path.clone(),
            record_count,
            bytes_written,
            bytes_original,
            compression_ratio,
            compression_mode: self.compression_mode,
            first_timestamp,
            last_timestamp,
        }
    }

    /// Returns the actual path being written, including any `.fb.zst` extension.
    #[must_use]
    pub fn file_path(&self) -> &Path {
        &self.file_path
    }

    /// Returns the compression mode this writer was created with.
    #[must_use]
    pub const fn compression_mode(&self) -> CompressionMode {
        self.compression_mode
    }
}

impl Drop for FileWriter {
    fn drop(&mut self) {
        // Flushing requires async I/O, which cannot run in a synchronous Drop;
        // callers should invoke `close()` explicitly before dropping the writer.
    }
}

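/// Point-in-time statistics for a single data file.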
#[derive(Debug, Clone)]
pub struct FileStats {
    /// Path of the file on disk.
    pub file_path: PathBuf,
    /// Number of records written so far.
    pub record_count: u64,
    /// Bytes actually written to disk (after compression).
    pub bytes_written: u64,
    /// Bytes before compression, length prefixes included.
    pub bytes_original: u64,
    /// `bytes_written / bytes_original`; `1.0` when nothing has been written.
    pub compression_ratio: f64,
    /// Compression mode the writer was created with.
    pub compression_mode: CompressionMode,
    /// Exchange timestamp of the first record written, if any.
    pub first_timestamp: Option<u64>,
    /// Exchange timestamp of the last record written, if any.
    pub last_timestamp: Option<u64>,
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::monitor::schema::{OrderBookRecord, PriceLevel, TradeRecord, TradeSide};
    use rust_decimal_macros::dec;
    use tempfile::tempdir;

    fn create_test_trade() -> TradeRecord {
        TradeRecord {
            timestamp_exchange: 1,
            timestamp_system: 1,
            symbol: "BTCUSDT".into(),
            exchange: "binance".into(),
            price: dec!(1.0),
            quantity: dec!(1.0),
            side: TradeSide::Buy,
            trade_id: "1".into(),
            buyer_order_id: None,
            seller_order_id: None,
            sequence: 1,
        }
    }

    fn create_test_orderbook() -> OrderBookRecord {
        OrderBookRecord {
            timestamp_exchange: 1,
            timestamp_system: 1,
            symbol: "BTCUSDT".into(),
            exchange: "binance".into(),
            bids: vec![PriceLevel {
                price: dec!(1.0),
                quantity: dec!(1.0),
                order_count: Some(1),
            }],
            asks: vec![],
            sequence: 1,
            checksum: None,
        }
    }

    #[tokio::test]
    async fn test_file_writer_creation() {
        let temp_dir = tempdir().unwrap();
        let file_path = temp_dir.path().join("test.fb");
        let config = StorageConfig::default();

        let writer = FileWriter::new(file_path.clone(), config).await.unwrap();
        assert!(writer.file_path().to_string_lossy().contains("test.fb"));

        let stats = writer.get_stats().await;
        assert_eq!(stats.record_count, 0);
        assert_eq!(stats.bytes_written, 0);
        assert_eq!(stats.compression_ratio, 1.0);
    }

    #[tokio::test]
    async fn test_write_trade_triggers_rolling() {
        let temp_dir = tempdir().unwrap();
        let config = StorageConfig {
            max_records_per_file: 1,
            ..Default::default()
        };

        let file_path = temp_dir.path().join("test_trades.fb");
        let writer = FileWriter::new(file_path, config).await.unwrap();
        let trade = create_test_trade();

        // The first write fills the one-record quota.
        let _ = writer.write_trade(&trade).await;

        // The second write should be rejected with a rolling request.
        let result = writer.write_trade(&trade).await;
        assert!(matches!(result, Err(StorageError::FileRolling(_))));
    }

    #[tokio::test]
    async fn test_write_orderbook_triggers_rolling() {
        let temp_dir = tempdir().unwrap();
        let config = StorageConfig {
            max_file_size_mb: 1,
            max_records_per_file: 1,
            ..Default::default()
        };
        let file_path = temp_dir.path().join("test_orderbook.fb");
        let writer = FileWriter::new(file_path, config).await.unwrap();
        let orderbook = create_test_orderbook();

        // The first write fills the one-record quota.
        writer.write_orderbook(&orderbook).await.unwrap();

        // The second write should be rejected with a rolling request.
        let result = writer.write_orderbook(&orderbook).await;
        assert!(matches!(result, Err(StorageError::FileRolling(_))));
    }
}
464}