rusty_bin/monitor/storage/roller.rs

1//! File rolling functionality for managing file rotation
2//!
//! This module handles automatic file rotation based on time (date change), size, and record-count
//! criteria, and deletes files older than the configured retention period.
4
5use crate::monitor::storage::naming::SimpleDate;
6use crate::monitor::storage::{FileNaming, Result, StorageError};
7use crate::monitor::utils::time::TimingUtils;
8use parking_lot::RwLock;
9use rusty_common::collections::FxHashMap;
10use std::path::{Path, PathBuf};
11use std::sync::Arc;
12use std::time::SystemTime;
13use tokio::fs;
14use walkdir::WalkDir;
15
/// File roller for managing file rotation based on configurable policies.
///
/// Tracks the files currently being written, keyed by writer id, and decides when each one
/// should be rotated (size threshold, record-count threshold, or a change of date). It also
/// deletes files under `base_path` whose name-encoded date is older than the retention period.
#[derive(Debug)]
pub struct FileRoller {
    /// Root directory scanned when deleting expired files.
    base_path: PathBuf,
    /// Size threshold in bytes; a file at or above it is due for rotation.
    max_file_size_bytes: u64,
    /// Record-count threshold; a file at or above it is due for rotation.
    max_records_per_file: u64,
    /// Number of days of files to keep; used to compute the cleanup cutoff date.
    retention_days: u32,
    /// Active files keyed by writer id, shared behind an `Arc<RwLock<..>>`.
    active_files: Arc<RwLock<FxHashMap<String, ActiveFileInfo>>>,
}
25
/// Information about a currently active file being written to.
#[derive(Debug, Clone)]
pub struct ActiveFileInfo {
    /// Path of the file the writer is currently writing to.
    path: PathBuf,
    /// Timestamp captured when the entry was created.
    created_at: SystemTime,
    /// Timestamp of the most recent statistics update.
    last_written: SystemTime,
    /// Accumulated size in bytes reported through [`ActiveFileInfo::update_stats`].
    size_bytes: u64,
    /// Accumulated record count reported through [`ActiveFileInfo::update_stats`].
    record_count: u64,
}

impl ActiveFileInfo {
    /// Get the path of the tracked file.
    #[must_use]
    pub fn path(&self) -> &Path {
        &self.path
    }

    /// Get the current file size in bytes.
    #[must_use]
    pub const fn size_bytes(&self) -> u64 {
        self.size_bytes
    }

    /// Get the current record count.
    #[must_use]
    pub const fn record_count(&self) -> u64 {
        self.record_count
    }

    /// Get when the file was created.
    #[must_use]
    pub const fn created_at(&self) -> SystemTime {
        self.created_at
    }

    /// Get when the file was last written to.
    #[must_use]
    pub const fn last_written(&self) -> SystemTime {
        self.last_written
    }

    /// Add `size_delta` bytes and `record_delta` records, and stamp `last_written` with now.
    ///
    /// Counters saturate at `u64::MAX` instead of overflowing. Plain `+=` panics in debug
    /// builds and wraps in release builds. A wrapped counter would drop back below the
    /// rotation thresholds and silently stop rotation.
    pub fn update_stats(&mut self, size_delta: u64, record_delta: u64) {
        self.size_bytes = self.size_bytes.saturating_add(size_delta);
        self.record_count = self.record_count.saturating_add(record_delta);
        self.last_written = SystemTime::now();
    }
}
68
69impl FileRoller {
70    /// Create a new file roller with specified policies
71    #[must_use]
72    pub fn new(
73        base_path: PathBuf,
74        max_file_size_mb: u64,
75        max_records_per_file: u64,
76        retention_days: u32,
77    ) -> Self {
78        Self {
79            base_path,
80            max_file_size_bytes: max_file_size_mb * 1024 * 1024,
81            max_records_per_file,
82            retention_days,
83            active_files: Arc::new(RwLock::new(FxHashMap::default())),
84        }
85    }
86
87    /// Register a new active file for rotation management
88    pub fn register_active_file(&self, writer_id: String, file_path: PathBuf) {
89        let file_info = ActiveFileInfo {
90            path: file_path,
91            created_at: SystemTime::now(),
92            last_written: SystemTime::now(),
93            size_bytes: 0,
94            record_count: 0,
95        };
96
97        let mut active_files = self.active_files.write();
98        active_files.insert(writer_id, file_info);
99    }
100
101    /// Update statistics for an active file
102    pub fn update_file_statistics(&self, writer_id: &str, size_delta: u64, record_delta: u64) {
103        let mut active_files = self.active_files.write();
104        if let Some(file_info) = active_files.get_mut(writer_id) {
105            file_info.update_stats(size_delta, record_delta);
106        }
107    }
108
109    /// Check if a file needs rotation based on configured policies
110    pub fn check_rotation_policy(&self, writer_id: &str) -> bool {
111        let active_files = self.active_files.read();
112        if let Some(file_info) = active_files.get(writer_id) {
113            // Check size limit
114            if file_info.size_bytes >= self.max_file_size_bytes {
115                return true;
116            }
117
118            // Check record count limit
119            if file_info.record_count >= self.max_records_per_file {
120                return true;
121            }
122
123            // Check time-based rotation (daily)
124            let current_date = SimpleDate::from_system_time(SystemTime::now());
125            let file_date = SimpleDate::from_system_time(file_info.created_at);
126            if current_date != file_date {
127                return true;
128            }
129        }
130
131        false
132    }
133
134    /// Execute rotation for all files that meet rotation criteria
135    pub async fn execute_rotation_check(&self) -> Result<Vec<String>> {
136        let mut rotated_files = Vec::new();
137
138        // Get list of files that need rotation
139        let files_to_rotate: Vec<(String, PathBuf)> = {
140            let active_files = self.active_files.read();
141            active_files
142                .iter()
143                .filter(|(writer_id, _)| self.check_rotation_policy(writer_id))
144                .map(|(writer_id, info)| (writer_id.clone(), info.path.clone()))
145                .collect()
146        };
147
148        log::info!("Rotation needed for {} files", files_to_rotate.len());
149
150        // Rotate each file
151        for (writer_id, file_path) in files_to_rotate {
152            match self.execute_single_rotation(&writer_id, &file_path).await {
153                Ok(new_path) => {
154                    rotated_files.push(new_path);
155                    log::info!("Rotated file: {} -> {}", file_path.display(), writer_id);
156                }
157                Err(e) => {
158                    log::error!("Failed to rotate file {}: {}", file_path.display(), e);
159                }
160            }
161        }
162
163        Ok(rotated_files)
164    }
165
166    /// Execute rotation for a single file
167    async fn execute_single_rotation(&self, writer_id: &str, file_path: &Path) -> Result<String> {
168        // Parse the current filename to get components
169        let filename = file_path
170            .file_name()
171            .and_then(|n| n.to_str())
172            .ok_or_else(|| StorageError::Path("Invalid filename".to_string()))?;
173
174        let naming = FileNaming::parse_filename(filename)
175            .ok_or_else(|| StorageError::Path("Cannot parse filename".to_string()))?;
176
177        // Create new filename with current date
178        let current_date = SimpleDate::from_system_time(SystemTime::now());
179        let new_naming = FileNaming::new(
180            naming.exchange.clone(),
181            naming.symbol.clone(),
182            naming.data_type.clone(),
183            current_date,
184        );
185
186        let new_filename = new_naming.filename();
187        let new_path = file_path.parent().unwrap().join(&new_filename);
188
189        // Handle filename conflicts by adding sequence numbers
190        let final_path = self.resolve_filename_conflict(&new_path).await?;
191
192        // Move the file
193        fs::rename(file_path, &final_path).await?;
194
195        // Remove from active files registry
196        {
197            let mut active_files = self.active_files.write();
198            active_files.remove(writer_id);
199        }
200
201        // Log the rotation
202        self.log_file_rotation(&final_path, &naming).await?;
203
204        Ok(final_path.to_string_lossy().to_string())
205    }
206
207    /// Resolve filename conflicts by adding sequence numbers
208    async fn resolve_filename_conflict(&self, base_path: &Path) -> Result<PathBuf> {
209        if !base_path.exists() {
210            return Ok(base_path.to_path_buf());
211        }
212
213        let base_name = base_path
214            .file_stem()
215            .and_then(|n| n.to_str())
216            .ok_or_else(|| StorageError::Path("Invalid base filename".to_string()))?;
217
218        let extension = base_path.extension().and_then(|e| e.to_str()).unwrap_or("");
219
220        let parent = base_path
221            .parent()
222            .ok_or_else(|| StorageError::Path("No parent directory".to_string()))?;
223
224        for sequence in 1..1000 {
225            let new_name = if extension.is_empty() {
226                format!("{base_name}_{sequence:03}")
227            } else {
228                format!("{base_name}_{sequence:03}.{extension}")
229            };
230
231            let new_path = parent.join(new_name);
232            if !new_path.exists() {
233                return Ok(new_path);
234            }
235        }
236
237        Err(StorageError::FileRolling(
238            "Cannot find unique filename after 999 attempts".to_string(),
239        ))
240    }
241
242    /// Log information about a rotated file
243    async fn log_file_rotation(&self, file_path: &Path, naming: &FileNaming) -> Result<()> {
244        let metadata = fs::metadata(file_path).await?;
245        let file_size_bytes = metadata.len();
246
247        log::info!(
248            "File rotation completed: {} (size: {} bytes)",
249            naming.filename(),
250            file_size_bytes
251        );
252
253        Ok(())
254    }
255
256    /// Clean up old files based on retention policy
257    pub async fn cleanup_old_files(&self) -> Result<u64> {
258        let cutoff_date = TimingUtils::days_ago(self.retention_days);
259
260        let deleted_count = self.cleanup_directory(&self.base_path, cutoff_date).await?;
261
262        if deleted_count > 0 {
263            log::info!("Cleaned up {deleted_count} old files");
264        }
265
266        Ok(deleted_count)
267    }
268
269    /// Clean up files in a specific directory older than cutoff date
270    async fn cleanup_directory(&self, directory: &Path, cutoff_date: SimpleDate) -> Result<u64> {
271        if !directory.exists() {
272            return Ok(0);
273        }
274
275        let mut deleted_count = 0;
276
277        for entry in WalkDir::new(directory)
278            .into_iter()
279            .filter_map(|e| e.ok())
280            .filter(|e| e.file_type().is_file())
281        {
282            if let Some(filename_str) = entry.file_name().to_str()
283                && let Some(naming) = FileNaming::parse_filename(filename_str)
284                && naming.date < cutoff_date
285            {
286                match fs::remove_file(entry.path()).await {
287                    Ok(()) => {
288                        deleted_count += 1;
289                        log::debug!("Deleted old file: {}", entry.path().display());
290                    }
291                    Err(e) => {
292                        log::warn!(
293                            "Failed to delete old file {}: {}",
294                            entry.path().display(),
295                            e
296                        );
297                    }
298                }
299            }
300        }
301
302        Ok(deleted_count)
303    }
304
305    /// Get statistics for all currently active files
306    pub fn get_active_files_statistics(&self) -> FxHashMap<String, ActiveFileInfo> {
307        let active_files = self.active_files.read();
308        active_files.clone()
309    }
310}
311
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;
    use tokio::fs;

    /// A fresh roller tracks no files.
    #[test]
    fn test_file_roller_creation() {
        let roller = FileRoller::new(PathBuf::from("/tmp"), 1, 100, 30);

        assert!(roller.get_active_files_statistics().is_empty());
    }

    /// Registered files accumulate the statistics reported for them.
    #[tokio::test]
    async fn test_register_and_update_file() {
        let roller = FileRoller::new(PathBuf::from("/tmp"), 1, 100, 30);

        let temp_dir = tempdir().unwrap();
        let file_path = temp_dir.path().join("test_file.dat");
        fs::write(&file_path, "test data").await.unwrap();

        let writer_id = "test:BTCUSDT:trades".to_string();
        roller.register_active_file(writer_id.clone(), file_path);
        roller.update_file_statistics(&writer_id, 100, 1);

        let stats = roller.get_active_files_statistics();
        assert_eq!(stats.len(), 1);
        let file_info = stats.get(&writer_id).expect("writer should be registered");
        assert_eq!(file_info.size_bytes(), 100);
        assert_eq!(file_info.record_count(), 1);
    }

    /// Unknown writer ids never need rotation, and updating them is a no-op.
    #[test]
    fn test_unknown_writer_is_ignored() {
        let roller = FileRoller::new(PathBuf::from("/tmp"), 1, 100, 30);

        roller.update_file_statistics("missing", 10, 10);

        assert!(!roller.check_rotation_policy("missing"));
        assert!(roller.get_active_files_statistics().is_empty());
    }

    /// Size and record-count thresholds both trigger rotation.
    #[tokio::test]
    async fn test_rotation_policy_checks() {
        let temp_dir = tempdir().unwrap();
        let writer_id = "test_writer".to_string();

        // Size-based rotation: 1 MiB limit, 2 MiB written.
        let size_roller = FileRoller::new(temp_dir.path().to_path_buf(), 1, 100, 30);
        let size_path = temp_dir.path().join("test_file.dat");
        fs::write(&size_path, "initial data").await.unwrap();
        size_roller.register_active_file(writer_id.clone(), size_path);
        assert!(!size_roller.check_rotation_policy(&writer_id));
        size_roller.update_file_statistics(&writer_id, 2 * 1024 * 1024, 10);
        assert!(size_roller.check_rotation_policy(&writer_id));

        // Record-based rotation: 50-record limit, 100 records written, size well under limit.
        let record_roller = FileRoller::new(temp_dir.path().to_path_buf(), 10, 50, 30);
        let record_path = temp_dir.path().join("test_file2.dat");
        fs::write(&record_path, "initial data").await.unwrap();
        record_roller.register_active_file(writer_id.clone(), record_path);
        record_roller.update_file_statistics(&writer_id, 1024, 100);
        assert!(record_roller.check_rotation_policy(&writer_id));
    }

    /// Conflicts resolve to the first free `_NNN` sibling, and free paths are returned as-is.
    #[tokio::test]
    async fn test_filename_conflict_resolution() {
        let temp_dir = tempdir().unwrap();
        let roller = FileRoller::new(temp_dir.path().to_path_buf(), 1, 100, 30);
        let base_path = temp_dir.path().join("test.parquet");

        // No conflict yet: the requested path is returned unchanged.
        let unchanged = roller.resolve_filename_conflict(&base_path).await.unwrap();
        assert_eq!(unchanged, base_path);

        // First conflict resolves to `_001`.
        fs::write(&base_path, "existing file").await.unwrap();
        let first = roller.resolve_filename_conflict(&base_path).await.unwrap();
        assert_eq!(first, temp_dir.path().join("test_001.parquet"));
        assert!(!first.exists());

        // Once `_001` is taken too, the next free slot is `_002`.
        fs::write(&first, "second file").await.unwrap();
        let second = roller.resolve_filename_conflict(&base_path).await.unwrap();
        assert_eq!(second, temp_dir.path().join("test_002.parquet"));
    }
}
391}