rusty_bin/monitor/storage/
roller.rs1use crate::monitor::storage::naming::SimpleDate;
6use crate::monitor::storage::{FileNaming, Result, StorageError};
7use crate::monitor::utils::time::TimingUtils;
8use parking_lot::RwLock;
9use rusty_common::collections::FxHashMap;
10use std::path::{Path, PathBuf};
11use std::sync::Arc;
12use std::time::SystemTime;
13use tokio::fs;
14use walkdir::WalkDir;
15
/// Tracks files that writers are currently filling, decides when each one must
/// be rotated (by tracked size, record count, or calendar date) and deletes
/// files whose name-encoded date falls outside the retention window.
#[derive(Debug)]
pub struct FileRoller {
    /// Root directory walked (recursively) by `cleanup_old_files`.
    base_path: PathBuf,
    /// Rotation threshold in bytes; rotation triggers once the tracked size is `>=` this.
    max_file_size_bytes: u64,
    /// Rotation threshold in records; rotation triggers once the tracked count is `>=` this.
    max_records_per_file: u64,
    /// Retention window passed to `TimingUtils::days_ago`; files dated before the
    /// resulting cutoff are deleted during cleanup.
    retention_days: u32,
    /// Per-writer statistics keyed by writer id, shared behind a `parking_lot` `RwLock`.
    active_files: Arc<RwLock<FxHashMap<String, ActiveFileInfo>>>,
}
25
/// Bookkeeping for one file registered through `FileRoller::register_active_file`.
#[derive(Debug, Clone)]
pub struct ActiveFileInfo {
    /// Location of the tracked file on disk.
    path: PathBuf,
    /// Registration time; its calendar date is compared with today's to trigger daily rotation.
    created_at: SystemTime,
    /// Time of the most recent `update_stats` call (equal to `created_at` until then).
    last_written: SystemTime,
    /// Bytes reported through `update_stats` (accumulated counter, not measured from disk).
    size_bytes: u64,
    /// Records reported through `update_stats`.
    record_count: u64,
}
35
36impl ActiveFileInfo {
37 #[must_use]
39 pub const fn size_bytes(&self) -> u64 {
40 self.size_bytes
41 }
42
43 #[must_use]
45 pub const fn record_count(&self) -> u64 {
46 self.record_count
47 }
48
49 #[must_use]
51 pub const fn created_at(&self) -> SystemTime {
52 self.created_at
53 }
54
55 #[must_use]
57 pub const fn last_written(&self) -> SystemTime {
58 self.last_written
59 }
60
61 pub fn update_stats(&mut self, size_delta: u64, record_delta: u64) {
63 self.size_bytes += size_delta;
64 self.record_count += record_delta;
65 self.last_written = SystemTime::now();
66 }
67}
68
69impl FileRoller {
70 #[must_use]
72 pub fn new(
73 base_path: PathBuf,
74 max_file_size_mb: u64,
75 max_records_per_file: u64,
76 retention_days: u32,
77 ) -> Self {
78 Self {
79 base_path,
80 max_file_size_bytes: max_file_size_mb * 1024 * 1024,
81 max_records_per_file,
82 retention_days,
83 active_files: Arc::new(RwLock::new(FxHashMap::default())),
84 }
85 }
86
87 pub fn register_active_file(&self, writer_id: String, file_path: PathBuf) {
89 let file_info = ActiveFileInfo {
90 path: file_path,
91 created_at: SystemTime::now(),
92 last_written: SystemTime::now(),
93 size_bytes: 0,
94 record_count: 0,
95 };
96
97 let mut active_files = self.active_files.write();
98 active_files.insert(writer_id, file_info);
99 }
100
101 pub fn update_file_statistics(&self, writer_id: &str, size_delta: u64, record_delta: u64) {
103 let mut active_files = self.active_files.write();
104 if let Some(file_info) = active_files.get_mut(writer_id) {
105 file_info.update_stats(size_delta, record_delta);
106 }
107 }
108
109 pub fn check_rotation_policy(&self, writer_id: &str) -> bool {
111 let active_files = self.active_files.read();
112 if let Some(file_info) = active_files.get(writer_id) {
113 if file_info.size_bytes >= self.max_file_size_bytes {
115 return true;
116 }
117
118 if file_info.record_count >= self.max_records_per_file {
120 return true;
121 }
122
123 let current_date = SimpleDate::from_system_time(SystemTime::now());
125 let file_date = SimpleDate::from_system_time(file_info.created_at);
126 if current_date != file_date {
127 return true;
128 }
129 }
130
131 false
132 }
133
134 pub async fn execute_rotation_check(&self) -> Result<Vec<String>> {
136 let mut rotated_files = Vec::new();
137
138 let files_to_rotate: Vec<(String, PathBuf)> = {
140 let active_files = self.active_files.read();
141 active_files
142 .iter()
143 .filter(|(writer_id, _)| self.check_rotation_policy(writer_id))
144 .map(|(writer_id, info)| (writer_id.clone(), info.path.clone()))
145 .collect()
146 };
147
148 log::info!("Rotation needed for {} files", files_to_rotate.len());
149
150 for (writer_id, file_path) in files_to_rotate {
152 match self.execute_single_rotation(&writer_id, &file_path).await {
153 Ok(new_path) => {
154 rotated_files.push(new_path);
155 log::info!("Rotated file: {} -> {}", file_path.display(), writer_id);
156 }
157 Err(e) => {
158 log::error!("Failed to rotate file {}: {}", file_path.display(), e);
159 }
160 }
161 }
162
163 Ok(rotated_files)
164 }
165
166 async fn execute_single_rotation(&self, writer_id: &str, file_path: &Path) -> Result<String> {
168 let filename = file_path
170 .file_name()
171 .and_then(|n| n.to_str())
172 .ok_or_else(|| StorageError::Path("Invalid filename".to_string()))?;
173
174 let naming = FileNaming::parse_filename(filename)
175 .ok_or_else(|| StorageError::Path("Cannot parse filename".to_string()))?;
176
177 let current_date = SimpleDate::from_system_time(SystemTime::now());
179 let new_naming = FileNaming::new(
180 naming.exchange.clone(),
181 naming.symbol.clone(),
182 naming.data_type.clone(),
183 current_date,
184 );
185
186 let new_filename = new_naming.filename();
187 let new_path = file_path.parent().unwrap().join(&new_filename);
188
189 let final_path = self.resolve_filename_conflict(&new_path).await?;
191
192 fs::rename(file_path, &final_path).await?;
194
195 {
197 let mut active_files = self.active_files.write();
198 active_files.remove(writer_id);
199 }
200
201 self.log_file_rotation(&final_path, &naming).await?;
203
204 Ok(final_path.to_string_lossy().to_string())
205 }
206
207 async fn resolve_filename_conflict(&self, base_path: &Path) -> Result<PathBuf> {
209 if !base_path.exists() {
210 return Ok(base_path.to_path_buf());
211 }
212
213 let base_name = base_path
214 .file_stem()
215 .and_then(|n| n.to_str())
216 .ok_or_else(|| StorageError::Path("Invalid base filename".to_string()))?;
217
218 let extension = base_path.extension().and_then(|e| e.to_str()).unwrap_or("");
219
220 let parent = base_path
221 .parent()
222 .ok_or_else(|| StorageError::Path("No parent directory".to_string()))?;
223
224 for sequence in 1..1000 {
225 let new_name = if extension.is_empty() {
226 format!("{base_name}_{sequence:03}")
227 } else {
228 format!("{base_name}_{sequence:03}.{extension}")
229 };
230
231 let new_path = parent.join(new_name);
232 if !new_path.exists() {
233 return Ok(new_path);
234 }
235 }
236
237 Err(StorageError::FileRolling(
238 "Cannot find unique filename after 999 attempts".to_string(),
239 ))
240 }
241
242 async fn log_file_rotation(&self, file_path: &Path, naming: &FileNaming) -> Result<()> {
244 let metadata = fs::metadata(file_path).await?;
245 let file_size_bytes = metadata.len();
246
247 log::info!(
248 "File rotation completed: {} (size: {} bytes)",
249 naming.filename(),
250 file_size_bytes
251 );
252
253 Ok(())
254 }
255
256 pub async fn cleanup_old_files(&self) -> Result<u64> {
258 let cutoff_date = TimingUtils::days_ago(self.retention_days);
259
260 let deleted_count = self.cleanup_directory(&self.base_path, cutoff_date).await?;
261
262 if deleted_count > 0 {
263 log::info!("Cleaned up {deleted_count} old files");
264 }
265
266 Ok(deleted_count)
267 }
268
269 async fn cleanup_directory(&self, directory: &Path, cutoff_date: SimpleDate) -> Result<u64> {
271 if !directory.exists() {
272 return Ok(0);
273 }
274
275 let mut deleted_count = 0;
276
277 for entry in WalkDir::new(directory)
278 .into_iter()
279 .filter_map(|e| e.ok())
280 .filter(|e| e.file_type().is_file())
281 {
282 if let Some(filename_str) = entry.file_name().to_str()
283 && let Some(naming) = FileNaming::parse_filename(filename_str)
284 && naming.date < cutoff_date
285 {
286 match fs::remove_file(entry.path()).await {
287 Ok(()) => {
288 deleted_count += 1;
289 log::debug!("Deleted old file: {}", entry.path().display());
290 }
291 Err(e) => {
292 log::warn!(
293 "Failed to delete old file {}: {}",
294 entry.path().display(),
295 e
296 );
297 }
298 }
299 }
300 }
301
302 Ok(deleted_count)
303 }
304
305 pub fn get_active_files_statistics(&self) -> FxHashMap<String, ActiveFileInfo> {
307 let active_files = self.active_files.read();
308 active_files.clone()
309 }
310}
311
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;
    use tokio::fs;

    #[tokio::test]
    async fn test_file_roller_creation() {
        let roller = FileRoller::new(PathBuf::from("/tmp"), 1, 100, 30);

        let stats = roller.get_active_files_statistics();
        assert!(stats.is_empty());
    }

    #[tokio::test]
    async fn test_register_and_update_file() {
        let roller = FileRoller::new(PathBuf::from("/tmp"), 1, 100, 30);

        let temp_dir = tempdir().unwrap();
        let file_path = temp_dir.path().join("test_file.dat");
        fs::write(&file_path, "test data").await.unwrap();

        let writer_id = "test:BTCUSDT:trades".to_string();
        roller.register_active_file(writer_id.clone(), file_path);

        roller.update_file_statistics(&writer_id, 100, 1);

        let stats = roller.get_active_files_statistics();
        assert_eq!(stats.len(), 1);
        assert!(stats.contains_key(&writer_id));

        let file_info = &stats[&writer_id];
        assert_eq!(file_info.size_bytes(), 100);
        assert_eq!(file_info.record_count(), 1);
    }

    #[tokio::test]
    async fn test_unknown_writer_is_ignored() {
        let roller = FileRoller::new(PathBuf::from("/tmp"), 1, 100, 30);

        // Updating an unregistered writer must neither panic nor create an entry.
        roller.update_file_statistics("missing", 10, 1);
        assert!(roller.get_active_files_statistics().is_empty());
        assert!(!roller.check_rotation_policy("missing"));
    }

    #[tokio::test]
    async fn test_rotation_policy_checks() {
        let temp_dir = tempdir().unwrap();
        let roller = FileRoller::new(temp_dir.path().to_path_buf(), 1, 100, 30);
        let writer_id = "test_writer".to_string();
        let file_path = temp_dir.path().join("test_file.dat");
        fs::write(&file_path, "initial data").await.unwrap();

        roller.register_active_file(writer_id.clone(), file_path);

        assert!(!roller.check_rotation_policy(&writer_id));

        // Size limit exceeded.
        roller.update_file_statistics(&writer_id, 2 * 1024 * 1024, 10);
        assert!(roller.check_rotation_policy(&writer_id));

        // Record limit exceeded on a roller with a generous size limit.
        let roller2 = FileRoller::new(temp_dir.path().to_path_buf(), 10, 50, 30);
        let file_path2 = temp_dir.path().join("test_file2.dat");
        fs::write(&file_path2, "initial data").await.unwrap();
        roller2.register_active_file(writer_id.clone(), file_path2);
        roller2.update_file_statistics(&writer_id, 1024, 100);
        assert!(roller2.check_rotation_policy(&writer_id));
    }

    #[tokio::test]
    async fn test_rotation_triggers_at_exact_thresholds() {
        let roller = FileRoller::new(PathBuf::from("/tmp"), 1, 100, 30);

        roller.register_active_file("size".to_string(), PathBuf::from("/tmp/size.dat"));
        roller.update_file_statistics("size", 1024 * 1024, 0);
        assert!(roller.check_rotation_policy("size"));

        roller.register_active_file("records".to_string(), PathBuf::from("/tmp/records.dat"));
        roller.update_file_statistics("records", 0, 99);
        assert!(!roller.check_rotation_policy("records"));
        roller.update_file_statistics("records", 0, 1);
        assert!(roller.check_rotation_policy("records"));
    }

    #[tokio::test]
    async fn test_rotation_check_completes_with_pending_rotation() {
        // Exercises evaluating the rotation policy while the active-file map is
        // being iterated. The pass must finish and report success overall, even
        // if the individual rotation fails on an unparseable name.
        let temp_dir = tempdir().unwrap();
        let roller = FileRoller::new(temp_dir.path().to_path_buf(), 1, 100, 30);
        let file_path = temp_dir.path().join("test_file.dat");
        fs::write(&file_path, "data").await.unwrap();

        roller.register_active_file("writer".to_string(), file_path);
        roller.update_file_statistics("writer", 2 * 1024 * 1024, 1);

        assert!(roller.execute_rotation_check().await.is_ok());
    }

    #[tokio::test]
    async fn test_filename_conflict_resolution() {
        let temp_dir = tempdir().unwrap();
        let roller = FileRoller::new(temp_dir.path().to_path_buf(), 1, 100, 30);

        let base_path = temp_dir.path().join("test.parquet");

        fs::write(&base_path, "existing file").await.unwrap();

        let unique_path = roller.resolve_filename_conflict(&base_path).await.unwrap();
        assert!(unique_path.to_string_lossy().contains("_001"));
        assert!(!unique_path.exists());
    }

    #[tokio::test]
    async fn test_filename_conflict_resolution_without_conflict() {
        let temp_dir = tempdir().unwrap();
        let roller = FileRoller::new(temp_dir.path().to_path_buf(), 1, 100, 30);

        let base_path = temp_dir.path().join("fresh.parquet");

        let resolved = roller.resolve_filename_conflict(&base_path).await.unwrap();
        assert_eq!(resolved, base_path);
    }
}