rusty_bin/monitor/storage/
compressor.rs

1//! File compression functionality for reducing storage space
2//!
3//! This module handles compression of old data files using zstd compression.
4
5use crate::monitor::{
6    config::storage::StorageConfig,
7    storage::{Result, StorageError, naming::FileNaming},
8};
9use std::path::{Path, PathBuf};
10use std::time::{Duration, SystemTime};
11use tokio::fs;
12use walkdir::WalkDir;
13
/// File compressor for reducing storage space
///
/// Wraps a `StorageConfig` and provides async helpers for compressing and
/// decompressing data files with zstd.
#[derive(Debug)]
pub struct FileCompressor {
    // Storage settings; this type reads `compression_level` (in
    // `compress_file`) and `market_data_path` (in `compress_old_files`).
    config: StorageConfig,
}
19
20impl FileCompressor {
21    /// Create a new file compressor
22    #[must_use]
23    pub const fn new(config: StorageConfig) -> Self {
24        Self { config }
25    }
26
27    /// Compress a single file
28    pub async fn compress_file(
29        &self,
30        input_path: &Path,
31        output_path: &Path,
32    ) -> Result<CompressionResult> {
33        let start_time = std::time::Instant::now();
34
35        // Read input file
36        let input_data = fs::read(input_path).await?;
37        let original_size = input_data.len() as u64;
38
39        // Compress data
40        let compressed_data = tokio::task::spawn_blocking({
41            let data = input_data.clone();
42            let level = self.config.compression_level as i32;
43            move || -> Result<Vec<u8>> {
44                zstd::bulk::compress(&data, level)
45                    .map_err(|e| StorageError::Compression(e.to_string()))
46            }
47        })
48        .await
49        .map_err(|e| StorageError::Compression(e.to_string()))??;
50
51        let compressed_size = compressed_data.len() as u64;
52
53        // Ensure output directory exists
54        if let Some(parent) = output_path.parent() {
55            fs::create_dir_all(parent).await?;
56        }
57
58        // Write compressed file
59        fs::write(output_path, compressed_data).await?;
60
61        let compression_time = start_time.elapsed();
62        let compression_ratio = compressed_size as f64 / original_size as f64;
63
64        Ok(CompressionResult {
65            original_size,
66            compressed_size,
67            compression_ratio,
68            compression_time,
69        })
70    }
71
72    /// Decompress a file
73    pub async fn decompress_file(
74        &self,
75        input_path: &Path,
76        output_path: &Path,
77    ) -> Result<DecompressionResult> {
78        let start_time = std::time::Instant::now();
79
80        // Read compressed file
81        let compressed_data = fs::read(input_path).await?;
82        let compressed_size = compressed_data.len() as u64;
83
84        // Decompress data
85        let decompressed_data = tokio::task::spawn_blocking({
86            let data = compressed_data.clone();
87            move || -> Result<Vec<u8>> {
88                zstd::bulk::decompress(&data, 10 * 1024 * 1024) // 10MB max
89                    .map_err(|e| StorageError::Compression(e.to_string()))
90            }
91        })
92        .await
93        .map_err(|e| StorageError::Compression(e.to_string()))??;
94
95        let decompressed_size = decompressed_data.len() as u64;
96
97        // Ensure output directory exists
98        if let Some(parent) = output_path.parent() {
99            fs::create_dir_all(parent).await?;
100        }
101
102        // Write decompressed file
103        fs::write(output_path, decompressed_data).await?;
104
105        let decompression_time = start_time.elapsed();
106
107        Ok(DecompressionResult {
108            compressed_size,
109            decompressed_size,
110            decompression_time,
111        })
112    }
113
114    /// Compress old files based on age threshold
115    pub async fn compress_old_files(&self) -> Result<CompressionSummary> {
116        let cutoff_time = SystemTime::now() - Duration::from_secs(24 * 60 * 60); // Default to 24 hours if not configured
117
118        let mut summary = CompressionSummary::default();
119
120        // Process raw data directory
121        let files_to_compress = self
122            .find_files_to_compress(&self.config.market_data_path, cutoff_time)
123            .await?;
124
125        for file_path in files_to_compress {
126            match self.compress_single_file(&file_path).await {
127                Ok(result) => {
128                    summary.files_processed += 1;
129                    summary.original_size += result.original_size;
130                    summary.compressed_size += result.compressed_size;
131                    summary.total_time += result.compression_time;
132
133                    log::info!(
134                        "Compressed file: {} ({} -> {}, {:.1}% reduction)",
135                        file_path.display(),
136                        format_bytes(result.original_size),
137                        format_bytes(result.compressed_size),
138                        (1.0 - result.compression_ratio) * 100.0
139                    );
140                }
141                Err(e) => {
142                    summary.errors += 1;
143                    log::error!("Failed to compress file {}: {}", file_path.display(), e);
144                }
145            }
146        }
147
148        if summary.files_processed > 0 {
149            summary.average_compression_ratio =
150                summary.compressed_size as f64 / summary.original_size as f64;
151            log::info!(
152                "Compression summary: {} files, {:.1}% average compression",
153                summary.files_processed,
154                (1.0 - summary.average_compression_ratio) * 100.0
155            );
156        }
157
158        Ok(summary)
159    }
160
161    /// Compress a directory of files
162    pub async fn compress_directory(&self, dir_path: &Path, delete_original: bool) -> Result<u64> {
163        if !dir_path.exists() {
164            return Ok(0);
165        }
166
167        let mut compressed_count = 0;
168
169        for entry in WalkDir::new(dir_path).into_iter().filter_map(|e| e.ok()) {
170            if entry.file_type().is_file() {
171                let file_path = entry.path();
172
173                // Skip already compressed files
174                if file_path.extension().and_then(|s| s.to_str()) == Some("zst") {
175                    continue;
176                }
177
178                // Skip non-data files
179                if !self.is_data_file(file_path) {
180                    continue;
181                }
182
183                match self.compress_single_file(file_path).await {
184                    Ok(_) => {
185                        compressed_count += 1;
186
187                        if delete_original && let Err(e) = fs::remove_file(file_path).await {
188                            log::warn!(
189                                "Failed to delete original file {}: {}",
190                                file_path.display(),
191                                e
192                            );
193                        }
194                    }
195                    Err(e) => {
196                        log::error!("Failed to compress file {}: {}", file_path.display(), e);
197                    }
198                }
199            }
200        }
201
202        Ok(compressed_count)
203    }
204
205    /// Compress a single file and handle metadata
206    async fn compress_single_file(&self, file_path: &Path) -> Result<CompressionResult> {
207        let compressed_path = self.get_compressed_path(file_path)?;
208
209        // Skip if already compressed
210        if compressed_path.exists() {
211            return Err(StorageError::Compression(
212                "File already compressed".to_string(),
213            ));
214        }
215
216        let result = self.compress_file(file_path, &compressed_path).await?;
217
218        // Update metadata if it exists
219        if let Some(filename) = file_path.file_name().and_then(|n| n.to_str())
220            && let Some(_naming) = FileNaming::parse_filename(filename)
221        {
222            // Metadata update is no longer needed since we don't maintain separate metadata files
223        }
224
225        Ok(result)
226    }
227
228    /// Find files that need compression based on age
229    async fn find_files_to_compress(
230        &self,
231        dir_path: &Path,
232        cutoff_time: SystemTime,
233    ) -> Result<Vec<PathBuf>> {
234        if !dir_path.exists() {
235            return Ok(Vec::new());
236        }
237
238        let mut files_to_compress = Vec::new();
239
240        for entry in WalkDir::new(dir_path).into_iter().filter_map(|e| e.ok()) {
241            if entry.file_type().is_file() {
242                let file_path = entry.path();
243
244                // Skip already compressed files
245                if file_path.extension().and_then(|s| s.to_str()) == Some("zst") {
246                    continue;
247                }
248
249                // Skip non-data files
250                if !self.is_data_file(file_path) {
251                    continue;
252                }
253
254                // Check file age
255                if let Ok(metadata) = entry.metadata() {
256                    let modified_time = metadata
257                        .modified()
258                        .or_else(|_| metadata.created())
259                        .unwrap_or(SystemTime::UNIX_EPOCH);
260
261                    if modified_time < cutoff_time {
262                        files_to_compress.push(file_path.to_path_buf());
263                    }
264                }
265            }
266        }
267
268        Ok(files_to_compress)
269    }
270
271    /// Check if a file is a data file that should be compressed
272    fn is_data_file(&self, file_path: &Path) -> bool {
273        if let Some(extension) = file_path.extension().and_then(|s| s.to_str()) {
274            matches!(extension, "fb" | "json")
275        } else {
276            false
277        }
278    }
279
280    /// Get the compressed file path for a given input path
281    fn get_compressed_path(&self, input_path: &Path) -> Result<PathBuf> {
282        let mut compressed_path = input_path.to_path_buf();
283        let extension = compressed_path
284            .extension()
285            .and_then(|e| e.to_str())
286            .unwrap_or("");
287
288        // Add .zst extension if not already compressed
289        if extension != "zst" {
290            let new_extension = format!("{extension}.zst");
291            compressed_path.set_extension(new_extension);
292        }
293
294        Ok(compressed_path)
295    }
296
297    /// Update compression statistics (no longer updates metadata files)
298    #[allow(dead_code)]
299    async fn update_compression_stats(
300        &self,
301        naming: &FileNaming,
302        result: &CompressionResult,
303    ) -> Result<()> {
304        // Log compression statistics for monitoring
305        log::info!(
306            "Compressed {}: {} -> {} bytes (ratio: {:.2}%, time: {:?})",
307            naming.filename(),
308            result.original_size,
309            result.compressed_size,
310            (1.0 - result.compression_ratio) * 100.0,
311            result.compression_time
312        );
313        Ok(())
314    }
315}
316
/// Result of a compression operation containing metrics about the compression process.
///
/// This structure provides detailed information about the compression operation,
/// including size metrics, compression ratio, and timing information.
#[derive(Debug, Clone)]
pub struct CompressionResult {
    /// The original size of the file before compression in bytes
    pub original_size: u64,
    /// The size of the compressed file in bytes
    pub compressed_size: u64,
    /// The compression ratio (compressed_size / original_size).
    /// A value of 0.5 means the file was compressed to 50% of its original
    /// size; values above 1.0 mean the file grew under compression.
    pub compression_ratio: f64,
    /// The wall-clock time taken to complete the compression operation
    /// (includes reading the input and writing the output)
    pub compression_time: std::time::Duration,
}
333
/// Result of a decompression operation containing metrics about the decompression process.
///
/// This structure provides detailed information about the decompression operation,
/// including size metrics and timing information.
#[derive(Debug, Clone)]
pub struct DecompressionResult {
    /// The size of the compressed input file in bytes
    pub compressed_size: u64,
    /// The size of the decompressed output in bytes
    pub decompressed_size: u64,
    /// The wall-clock time taken to complete the decompression operation
    /// (includes reading the input and writing the output)
    pub decompression_time: std::time::Duration,
}
347
/// Summary of multiple compression operations providing aggregate statistics.
///
/// This structure accumulates metrics from multiple compression operations,
/// providing an overview of the overall compression process including success
/// rate, space savings, and performance metrics. Produced by
/// `FileCompressor::compress_old_files`.
#[derive(Debug, Clone, Default)]
pub struct CompressionSummary {
    /// The total number of files successfully processed
    pub files_processed: u64,
    /// The total original size of all files before compression in bytes
    pub original_size: u64,
    /// The total compressed size of all files in bytes
    pub compressed_size: u64,
    /// The average compression ratio across all files (compressed_size / original_size);
    /// remains 0.0 when no files were processed
    pub average_compression_ratio: f64,
    /// The total time spent on all compression operations
    pub total_time: std::time::Duration,
    /// The number of compression operations that failed (failures are logged,
    /// not propagated)
    pub errors: u64,
}
368
/// Render a byte count as a human-readable string (B, KB, MB, GB, TB).
///
/// Uses binary prefixes (1 KB = 1024 bytes) for consistency with storage
/// systems. Values below 1 KB are printed as whole bytes ("512 B"); larger
/// values are printed with one decimal place ("1.5 KB"). Counts at or above
/// 1024 TB stay in TB ("1024.0 TB").
///
/// # Arguments
///
/// * `bytes` - The number of bytes to format
///
/// # Returns
///
/// A formatted string with the size and appropriate unit
///
/// ```text
/// format_bytes(1024)        == "1.0 KB"
/// format_bytes(1536)        == "1.5 KB"
/// format_bytes(1024 * 1024) == "1.0 MB"
/// ```
fn format_bytes(bytes: u64) -> String {
    const UNITS: &[&str] = &["B", "KB", "MB", "GB", "TB"];

    // Scale down by 1024 until the value fits the unit or units run out.
    let mut value = bytes as f64;
    let mut unit = 0;
    while value >= 1024.0 && unit + 1 < UNITS.len() {
        value /= 1024.0;
        unit += 1;
    }

    match unit {
        // Plain byte counts are exact integers; print them without decimals.
        0 => format!("{bytes} {}", UNITS[0]),
        _ => format!("{value:.1} {}", UNITS[unit]),
    }
}
405
#[cfg(test)]
mod tests {
    use crate::monitor::config::storage::StorageConfig;

    use super::*;
    use tempfile::tempdir;

    /// Build a compressor rooted at `dir` with a fixed compression level.
    fn compressor_for(dir: &Path) -> FileCompressor {
        FileCompressor::new(StorageConfig {
            market_data_path: dir.to_path_buf(),
            compression_level: 3,
            ..Default::default()
        })
    }

    #[tokio::test]
    async fn test_file_compressor_creation() {
        // Constructing from a default config must not panic.
        let _compressor = FileCompressor::new(StorageConfig::default());
    }

    #[tokio::test]
    async fn test_compress_file() {
        let temp_dir = tempdir().unwrap();
        let compressor = compressor_for(temp_dir.path());

        // Highly repetitive payload so compression is guaranteed to shrink it.
        let payload = b"This is test data that should compress well. ".repeat(100);
        let source = temp_dir.path().join("test.fb");
        let target = temp_dir.path().join("test.fb.zst");
        fs::write(&source, &payload).await.unwrap();

        let outcome = compressor.compress_file(&source, &target).await.unwrap();

        assert!(target.exists());
        assert!(outcome.compressed_size < outcome.original_size);
        assert!(outcome.compression_ratio < 1.0);
        assert_eq!(outcome.original_size, payload.len() as u64);
    }

    #[tokio::test]
    async fn test_decompress_file() {
        let temp_dir = tempdir().unwrap();
        let compressor = compressor_for(temp_dir.path());

        let source = temp_dir.path().join("original.fb");
        let packed = temp_dir.path().join("compressed.fb.zst");
        let restored = temp_dir.path().join("decompressed.fb");

        let payload = b"This is test data for compression/decompression test.";
        fs::write(&source, payload).await.unwrap();

        // Compress, then decompress the result.
        compressor.compress_file(&source, &packed).await.unwrap();
        let outcome = compressor
            .decompress_file(&packed, &restored)
            .await
            .unwrap();

        assert!(restored.exists());
        assert_eq!(outcome.decompressed_size, payload.len() as u64);

        // The round trip must reproduce the original bytes exactly.
        assert_eq!(fs::read(&restored).await.unwrap(), payload);
    }

    #[test]
    fn test_format_bytes() {
        for (input, expected) in [
            (512, "512 B"),
            (1024, "1.0 KB"),
            (1536, "1.5 KB"),
            (1024 * 1024, "1.0 MB"),
            (1024 * 1024 * 1024, "1.0 GB"),
        ] {
            assert_eq!(format_bytes(input), expected);
        }
    }
}