rusty_bin/monitor/storage/
manager.rs

1//! Storage manager that coordinates file operations
2
3use crate::monitor::storage::naming::SimpleDate;
4use rusty_common::collections::FxHashMap;
5use std::sync::Arc;
6use std::time::SystemTime;
7use tokio::fs;
8use tokio::sync::RwLock;
9
10use crate::monitor::{
11    config::storage::StorageConfig,
12    schema::{OrderBookRecord, TradeRecord},
13    storage::{
14        FileCompressor, FileNaming, FileWriter,
15        error::Result,
16        roller::FileRoller,
17        types::{FileInfo, StorageStats},
18    },
19};
20
21/// Storage manager that coordinates file operations
22#[derive(Debug)]
23pub struct StorageManager {
24    /// Storage configuration settings
25    pub config: StorageConfig,
26    writers: Arc<RwLock<FxHashMap<String, Arc<FileWriter>>>>,
27    compressor: Arc<FileCompressor>,
28    stats: Arc<RwLock<StorageStats>>,
29    file_roller: Option<Arc<FileRoller>>,
30}
31
32impl StorageManager {
33    /// Create a new storage manager
34    pub async fn new(config: StorageConfig) -> Result<Self> {
35        // Ensure directory exists
36        fs::create_dir_all(&config.market_data_path).await?;
37
38        let compressor = Arc::new(FileCompressor::new(config.clone()));
39
40        // Create FileRoller for file rotation management
41        let file_roller = Some(Arc::new(FileRoller::new(
42            config.market_data_path.clone(),
43            config.max_file_size_mb,
44            config.max_records_per_file,
45            config.retention_days,
46        )));
47
48        Ok(Self {
49            config,
50            writers: Arc::new(RwLock::new(FxHashMap::default())),
51            compressor,
52            stats: Arc::new(RwLock::new(StorageStats::default())),
53            file_roller,
54        })
55    }
56
57    /// Get FileRoller instance
58    pub fn get_file_roller(&self) -> Option<Arc<FileRoller>> {
59        self.file_roller.clone()
60    }
61
62    /// Get or create a file writer for the given parameters
63    pub async fn get_writer(
64        &self,
65        exchange: &str,
66        symbol: &str,
67        data_type: &str,
68    ) -> Result<Arc<FileWriter>> {
69        let key = format!("{exchange}:{symbol}:{data_type}");
70
71        // Check if writer already exists
72        {
73            let writers = self.writers.read().await;
74            if let Some(writer) = writers.get(&key) {
75                return Ok(writer.clone());
76            }
77        }
78
79        // Create new writer
80        let date = SimpleDate::from_system_time(SystemTime::now());
81        let naming = FileNaming::new(exchange, symbol, data_type, date);
82        let file_path = self.config.market_data_path.join(naming.filename());
83
84        let writer = Arc::new(FileWriter::new(file_path.clone(), self.config.clone()).await?);
85
86        // Register with FileRoller for rotation management
87        if let Some(file_roller) = &self.file_roller {
88            file_roller.register_active_file(key.clone(), file_path);
89        }
90
91        // Store writer
92        {
93            let mut writers = self.writers.write().await;
94            writers.insert(key, writer.clone());
95        }
96
97        // Update stats
98        {
99            let mut stats = self.stats.write().await;
100            stats.files_written += 1;
101        }
102
103        Ok(writer)
104    }
105
106    /// Write trade data
107    pub async fn write_trade(&self, trade: &TradeRecord) -> Result<()> {
108        let writer = self
109            .get_writer(&trade.exchange, &trade.symbol, "trades")
110            .await?;
111
112        // Get stats before writing
113        let stats_before = writer.get_stats().await;
114
115        // Write the trade
116        writer.write_trade(trade).await?;
117
118        // Get stats after writing to calculate bytes written
119        let stats_after = writer.get_stats().await;
120        let bytes_written = stats_after
121            .bytes_written
122            .saturating_sub(stats_before.bytes_written);
123
124        // Update FileRoller statistics
125        if let Some(file_roller) = &self.file_roller {
126            let writer_id = format!("{}:{}:trades", trade.exchange, trade.symbol);
127            file_roller.update_file_statistics(&writer_id, bytes_written, 1);
128        }
129
130        // Update storage stats
131        {
132            let mut stats = self.stats.write().await;
133            stats.records_written += 1;
134            stats.bytes_written += bytes_written;
135        }
136
137        Ok(())
138    }
139
140    /// Write orderbook data
141    pub async fn write_orderbook(&self, orderbook: &OrderBookRecord) -> Result<()> {
142        let writer = self
143            .get_writer(&orderbook.exchange, &orderbook.symbol, "orderbook")
144            .await?;
145
146        // Get stats before writing
147        let stats_before = writer.get_stats().await;
148
149        // Write the orderbook
150        writer.write_orderbook(orderbook).await?;
151
152        // Get stats after writing to calculate bytes written
153        let stats_after = writer.get_stats().await;
154        let bytes_written = stats_after
155            .bytes_written
156            .saturating_sub(stats_before.bytes_written);
157
158        // Update FileRoller statistics
159        if let Some(file_roller) = &self.file_roller {
160            let writer_id = format!("{}:{}:orderbook", orderbook.exchange, orderbook.symbol);
161            file_roller.update_file_statistics(&writer_id, bytes_written, 1);
162        }
163
164        // Update storage stats
165        {
166            let mut stats = self.stats.write().await;
167            stats.records_written += 1;
168            stats.bytes_written += bytes_written;
169        }
170
171        Ok(())
172    }
173
174    /// Get storage statistics
175    pub async fn get_stats(&self) -> StorageStats {
176        self.stats.read().await.clone()
177    }
178
179    /// Get file rotation statistics
180    pub fn get_file_rotation_stats(
181        &self,
182    ) -> Option<FxHashMap<String, crate::monitor::storage::roller::ActiveFileInfo>> {
183        self.file_roller.as_ref().map(|roller| {
184            let stats = roller.get_active_files_statistics();
185            let mut result = FxHashMap::default();
186            for (key, info) in stats {
187                result.insert(key, info);
188            }
189            result
190        })
191    }
192
193    /// List files for a given date range
194    pub async fn list_files(
195        &self,
196        exchange: Option<&str>,
197        symbol: Option<&str>,
198        data_type: Option<&str>,
199        start_date: Option<SimpleDate>,
200        end_date: Option<SimpleDate>,
201    ) -> Result<Vec<FileInfo>> {
202        let mut files = Vec::new();
203
204        // Read raw data directory
205        let mut entries = fs::read_dir(&self.config.market_data_path).await?;
206        while let Some(entry) = entries.next_entry().await? {
207            if let Some(filename) = entry.file_name().to_str()
208                && let Some(naming) = FileNaming::parse_filename(filename)
209            {
210                // Apply filters
211                if let Some(ex) = exchange
212                    && naming.exchange != ex
213                {
214                    continue;
215                }
216                if let Some(sym) = symbol
217                    && naming.symbol != sym
218                {
219                    continue;
220                }
221                if let Some(dt) = data_type
222                    && naming.data_type != dt
223                {
224                    continue;
225                }
226                if let Some(start) = start_date
227                    && naming.date < start
228                {
229                    continue;
230                }
231                if let Some(end) = end_date
232                    && naming.date > end
233                {
234                    continue;
235                }
236
237                // Create file info
238                let metadata = entry.metadata().await?;
239                let file_info = FileInfo {
240                    filename: filename.to_string(),
241                    exchange: naming.exchange.to_string(),
242                    symbol: naming.symbol.to_string(),
243                    data_type: naming.data_type,
244                    date: format!(
245                        "{:04}-{:02}-{:02}",
246                        naming.date.year, naming.date.month, naming.date.day
247                    ),
248                    file_size: metadata.len(),
249                    compressed: filename.ends_with(".zst"),
250                };
251
252                files.push(file_info);
253            }
254        }
255
256        Ok(files)
257    }
258}