// rusty_bin/monitor/storage/compressor.rs
use crate::monitor::{
6 config::storage::StorageConfig,
7 storage::{Result, StorageError, naming::FileNaming},
8};
9use std::path::{Path, PathBuf};
10use std::time::{Duration, SystemTime};
11use tokio::fs;
12use walkdir::WalkDir;
13
/// Compresses and decompresses storage files with zstd, driven by the
/// storage configuration (compression level, market-data root path).
#[derive(Debug)]
pub struct FileCompressor {
    // Provides `compression_level` and `market_data_path` to the methods below.
    config: StorageConfig,
}
19
20impl FileCompressor {
21 #[must_use]
23 pub const fn new(config: StorageConfig) -> Self {
24 Self { config }
25 }
26
27 pub async fn compress_file(
29 &self,
30 input_path: &Path,
31 output_path: &Path,
32 ) -> Result<CompressionResult> {
33 let start_time = std::time::Instant::now();
34
35 let input_data = fs::read(input_path).await?;
37 let original_size = input_data.len() as u64;
38
39 let compressed_data = tokio::task::spawn_blocking({
41 let data = input_data.clone();
42 let level = self.config.compression_level as i32;
43 move || -> Result<Vec<u8>> {
44 zstd::bulk::compress(&data, level)
45 .map_err(|e| StorageError::Compression(e.to_string()))
46 }
47 })
48 .await
49 .map_err(|e| StorageError::Compression(e.to_string()))??;
50
51 let compressed_size = compressed_data.len() as u64;
52
53 if let Some(parent) = output_path.parent() {
55 fs::create_dir_all(parent).await?;
56 }
57
58 fs::write(output_path, compressed_data).await?;
60
61 let compression_time = start_time.elapsed();
62 let compression_ratio = compressed_size as f64 / original_size as f64;
63
64 Ok(CompressionResult {
65 original_size,
66 compressed_size,
67 compression_ratio,
68 compression_time,
69 })
70 }
71
72 pub async fn decompress_file(
74 &self,
75 input_path: &Path,
76 output_path: &Path,
77 ) -> Result<DecompressionResult> {
78 let start_time = std::time::Instant::now();
79
80 let compressed_data = fs::read(input_path).await?;
82 let compressed_size = compressed_data.len() as u64;
83
84 let decompressed_data = tokio::task::spawn_blocking({
86 let data = compressed_data.clone();
87 move || -> Result<Vec<u8>> {
88 zstd::bulk::decompress(&data, 10 * 1024 * 1024) .map_err(|e| StorageError::Compression(e.to_string()))
90 }
91 })
92 .await
93 .map_err(|e| StorageError::Compression(e.to_string()))??;
94
95 let decompressed_size = decompressed_data.len() as u64;
96
97 if let Some(parent) = output_path.parent() {
99 fs::create_dir_all(parent).await?;
100 }
101
102 fs::write(output_path, decompressed_data).await?;
104
105 let decompression_time = start_time.elapsed();
106
107 Ok(DecompressionResult {
108 compressed_size,
109 decompressed_size,
110 decompression_time,
111 })
112 }
113
114 pub async fn compress_old_files(&self) -> Result<CompressionSummary> {
116 let cutoff_time = SystemTime::now() - Duration::from_secs(24 * 60 * 60); let mut summary = CompressionSummary::default();
119
120 let files_to_compress = self
122 .find_files_to_compress(&self.config.market_data_path, cutoff_time)
123 .await?;
124
125 for file_path in files_to_compress {
126 match self.compress_single_file(&file_path).await {
127 Ok(result) => {
128 summary.files_processed += 1;
129 summary.original_size += result.original_size;
130 summary.compressed_size += result.compressed_size;
131 summary.total_time += result.compression_time;
132
133 log::info!(
134 "Compressed file: {} ({} -> {}, {:.1}% reduction)",
135 file_path.display(),
136 format_bytes(result.original_size),
137 format_bytes(result.compressed_size),
138 (1.0 - result.compression_ratio) * 100.0
139 );
140 }
141 Err(e) => {
142 summary.errors += 1;
143 log::error!("Failed to compress file {}: {}", file_path.display(), e);
144 }
145 }
146 }
147
148 if summary.files_processed > 0 {
149 summary.average_compression_ratio =
150 summary.compressed_size as f64 / summary.original_size as f64;
151 log::info!(
152 "Compression summary: {} files, {:.1}% average compression",
153 summary.files_processed,
154 (1.0 - summary.average_compression_ratio) * 100.0
155 );
156 }
157
158 Ok(summary)
159 }
160
161 pub async fn compress_directory(&self, dir_path: &Path, delete_original: bool) -> Result<u64> {
163 if !dir_path.exists() {
164 return Ok(0);
165 }
166
167 let mut compressed_count = 0;
168
169 for entry in WalkDir::new(dir_path).into_iter().filter_map(|e| e.ok()) {
170 if entry.file_type().is_file() {
171 let file_path = entry.path();
172
173 if file_path.extension().and_then(|s| s.to_str()) == Some("zst") {
175 continue;
176 }
177
178 if !self.is_data_file(file_path) {
180 continue;
181 }
182
183 match self.compress_single_file(file_path).await {
184 Ok(_) => {
185 compressed_count += 1;
186
187 if delete_original && let Err(e) = fs::remove_file(file_path).await {
188 log::warn!(
189 "Failed to delete original file {}: {}",
190 file_path.display(),
191 e
192 );
193 }
194 }
195 Err(e) => {
196 log::error!("Failed to compress file {}: {}", file_path.display(), e);
197 }
198 }
199 }
200 }
201
202 Ok(compressed_count)
203 }
204
205 async fn compress_single_file(&self, file_path: &Path) -> Result<CompressionResult> {
207 let compressed_path = self.get_compressed_path(file_path)?;
208
209 if compressed_path.exists() {
211 return Err(StorageError::Compression(
212 "File already compressed".to_string(),
213 ));
214 }
215
216 let result = self.compress_file(file_path, &compressed_path).await?;
217
218 if let Some(filename) = file_path.file_name().and_then(|n| n.to_str())
220 && let Some(_naming) = FileNaming::parse_filename(filename)
221 {
222 }
224
225 Ok(result)
226 }
227
228 async fn find_files_to_compress(
230 &self,
231 dir_path: &Path,
232 cutoff_time: SystemTime,
233 ) -> Result<Vec<PathBuf>> {
234 if !dir_path.exists() {
235 return Ok(Vec::new());
236 }
237
238 let mut files_to_compress = Vec::new();
239
240 for entry in WalkDir::new(dir_path).into_iter().filter_map(|e| e.ok()) {
241 if entry.file_type().is_file() {
242 let file_path = entry.path();
243
244 if file_path.extension().and_then(|s| s.to_str()) == Some("zst") {
246 continue;
247 }
248
249 if !self.is_data_file(file_path) {
251 continue;
252 }
253
254 if let Ok(metadata) = entry.metadata() {
256 let modified_time = metadata
257 .modified()
258 .or_else(|_| metadata.created())
259 .unwrap_or(SystemTime::UNIX_EPOCH);
260
261 if modified_time < cutoff_time {
262 files_to_compress.push(file_path.to_path_buf());
263 }
264 }
265 }
266 }
267
268 Ok(files_to_compress)
269 }
270
271 fn is_data_file(&self, file_path: &Path) -> bool {
273 if let Some(extension) = file_path.extension().and_then(|s| s.to_str()) {
274 matches!(extension, "fb" | "json")
275 } else {
276 false
277 }
278 }
279
280 fn get_compressed_path(&self, input_path: &Path) -> Result<PathBuf> {
282 let mut compressed_path = input_path.to_path_buf();
283 let extension = compressed_path
284 .extension()
285 .and_then(|e| e.to_str())
286 .unwrap_or("");
287
288 if extension != "zst" {
290 let new_extension = format!("{extension}.zst");
291 compressed_path.set_extension(new_extension);
292 }
293
294 Ok(compressed_path)
295 }
296
297 #[allow(dead_code)]
299 async fn update_compression_stats(
300 &self,
301 naming: &FileNaming,
302 result: &CompressionResult,
303 ) -> Result<()> {
304 log::info!(
306 "Compressed {}: {} -> {} bytes (ratio: {:.2}%, time: {:?})",
307 naming.filename(),
308 result.original_size,
309 result.compressed_size,
310 (1.0 - result.compression_ratio) * 100.0,
311 result.compression_time
312 );
313 Ok(())
314 }
315}
316
/// Outcome of compressing a single file.
#[derive(Debug, Clone)]
pub struct CompressionResult {
    /// Size of the uncompressed input, in bytes.
    pub original_size: u64,
    /// Size of the compressed output, in bytes.
    pub compressed_size: u64,
    /// `compressed_size / original_size`; smaller means better compression.
    pub compression_ratio: f64,
    /// Wall-clock time for the whole operation, including file I/O.
    pub compression_time: std::time::Duration,
}
333
/// Outcome of decompressing a single file.
#[derive(Debug, Clone)]
pub struct DecompressionResult {
    /// Size of the compressed input, in bytes.
    pub compressed_size: u64,
    /// Size of the decompressed output, in bytes.
    pub decompressed_size: u64,
    /// Wall-clock time for the whole operation, including file I/O.
    pub decompression_time: std::time::Duration,
}
347
/// Aggregate results of a batch compression run (see `compress_old_files`).
#[derive(Debug, Clone, Default)]
pub struct CompressionSummary {
    /// Number of files successfully compressed.
    pub files_processed: u64,
    /// Total uncompressed bytes across processed files.
    pub original_size: u64,
    /// Total compressed bytes across processed files.
    pub compressed_size: u64,
    /// `compressed_size / original_size` over the whole batch.
    pub average_compression_ratio: f64,
    /// Sum of per-file compression times.
    pub total_time: std::time::Duration,
    /// Number of files that failed to compress.
    pub errors: u64,
}
368
/// Renders a byte count as a human-readable string ("512 B", "1.5 KB", …),
/// with one decimal place for any unit above bytes.
fn format_bytes(bytes: u64) -> String {
    const UNITS: &[&str] = &["B", "KB", "MB", "GB", "TB"];

    let mut value = bytes as f64;
    let mut idx = 0;
    // Scale down by 1024 until the value fits the unit, capping at TB.
    loop {
        if value < 1024.0 || idx + 1 == UNITS.len() {
            break;
        }
        value /= 1024.0;
        idx += 1;
    }

    match idx {
        // Plain bytes are printed exactly, with no decimal point.
        0 => format!("{bytes} {}", UNITS[0]),
        _ => format!("{value:.1} {}", UNITS[idx]),
    }
}
405
#[cfg(test)]
mod tests {
    use crate::monitor::config::storage::StorageConfig;

    use super::*;
    use tempfile::tempdir;

    /// Construction from a default config must not panic.
    #[tokio::test]
    async fn test_file_compressor_creation() {
        let config = StorageConfig::default();
        let _compressor = FileCompressor::new(config);
    }

    /// Compressing repetitive data must shrink it and report exact sizes.
    #[tokio::test]
    async fn test_compress_file() {
        let workspace = tempdir().unwrap();
        let compressor = FileCompressor::new(StorageConfig {
            market_data_path: workspace.path().to_path_buf(),
            compression_level: 3,
            ..Default::default()
        });

        let source = workspace.path().join("test.fb");
        let target = workspace.path().join("test.fb.zst");

        let payload = b"This is test data that should compress well. ".repeat(100);
        fs::write(&source, &payload).await.unwrap();

        let outcome = compressor.compress_file(&source, &target).await.unwrap();

        assert!(target.exists());
        assert!(outcome.compressed_size < outcome.original_size);
        assert!(outcome.compression_ratio < 1.0);
        assert_eq!(outcome.original_size, payload.len() as u64);
    }

    /// A compress/decompress round trip must restore the bytes exactly.
    #[tokio::test]
    async fn test_decompress_file() {
        let workspace = tempdir().unwrap();
        let compressor = FileCompressor::new(StorageConfig {
            market_data_path: workspace.path().to_path_buf(),
            compression_level: 3,
            ..Default::default()
        });

        let original = workspace.path().join("original.fb");
        let packed = workspace.path().join("compressed.fb.zst");
        let restored = workspace.path().join("decompressed.fb");

        let payload = b"This is test data for compression/decompression test.";
        fs::write(&original, payload).await.unwrap();

        compressor
            .compress_file(&original, &packed)
            .await
            .unwrap();

        let outcome = compressor
            .decompress_file(&packed, &restored)
            .await
            .unwrap();

        assert!(restored.exists());
        assert_eq!(outcome.decompressed_size, payload.len() as u64);

        let round_trip = fs::read(&restored).await.unwrap();
        assert_eq!(round_trip, payload);
    }

    /// Spot-checks the human-readable byte formatting at unit boundaries.
    #[test]
    fn test_format_bytes() {
        assert_eq!(format_bytes(512), "512 B");
        assert_eq!(format_bytes(1024), "1.0 KB");
        assert_eq!(format_bytes(1536), "1.5 KB");
        assert_eq!(format_bytes(1024 * 1024), "1.0 MB");
        assert_eq!(format_bytes(1024 * 1024 * 1024), "1.0 GB");
    }
}