rusty_bin/monitor/storage/
manager.rs1use crate::monitor::storage::naming::SimpleDate;
4use rusty_common::collections::FxHashMap;
5use std::sync::Arc;
6use std::time::SystemTime;
7use tokio::fs;
8use tokio::sync::RwLock;
9
10use crate::monitor::{
11 config::storage::StorageConfig,
12 schema::{OrderBookRecord, TradeRecord},
13 storage::{
14 FileCompressor, FileNaming, FileWriter,
15 error::Result,
16 roller::FileRoller,
17 types::{FileInfo, StorageStats},
18 },
19};
20
21#[derive(Debug)]
23pub struct StorageManager {
24 pub config: StorageConfig,
26 writers: Arc<RwLock<FxHashMap<String, Arc<FileWriter>>>>,
27 compressor: Arc<FileCompressor>,
28 stats: Arc<RwLock<StorageStats>>,
29 file_roller: Option<Arc<FileRoller>>,
30}
31
32impl StorageManager {
33 pub async fn new(config: StorageConfig) -> Result<Self> {
35 fs::create_dir_all(&config.market_data_path).await?;
37
38 let compressor = Arc::new(FileCompressor::new(config.clone()));
39
40 let file_roller = Some(Arc::new(FileRoller::new(
42 config.market_data_path.clone(),
43 config.max_file_size_mb,
44 config.max_records_per_file,
45 config.retention_days,
46 )));
47
48 Ok(Self {
49 config,
50 writers: Arc::new(RwLock::new(FxHashMap::default())),
51 compressor,
52 stats: Arc::new(RwLock::new(StorageStats::default())),
53 file_roller,
54 })
55 }
56
57 pub fn get_file_roller(&self) -> Option<Arc<FileRoller>> {
59 self.file_roller.clone()
60 }
61
62 pub async fn get_writer(
64 &self,
65 exchange: &str,
66 symbol: &str,
67 data_type: &str,
68 ) -> Result<Arc<FileWriter>> {
69 let key = format!("{exchange}:{symbol}:{data_type}");
70
71 {
73 let writers = self.writers.read().await;
74 if let Some(writer) = writers.get(&key) {
75 return Ok(writer.clone());
76 }
77 }
78
79 let date = SimpleDate::from_system_time(SystemTime::now());
81 let naming = FileNaming::new(exchange, symbol, data_type, date);
82 let file_path = self.config.market_data_path.join(naming.filename());
83
84 let writer = Arc::new(FileWriter::new(file_path.clone(), self.config.clone()).await?);
85
86 if let Some(file_roller) = &self.file_roller {
88 file_roller.register_active_file(key.clone(), file_path);
89 }
90
91 {
93 let mut writers = self.writers.write().await;
94 writers.insert(key, writer.clone());
95 }
96
97 {
99 let mut stats = self.stats.write().await;
100 stats.files_written += 1;
101 }
102
103 Ok(writer)
104 }
105
106 pub async fn write_trade(&self, trade: &TradeRecord) -> Result<()> {
108 let writer = self
109 .get_writer(&trade.exchange, &trade.symbol, "trades")
110 .await?;
111
112 let stats_before = writer.get_stats().await;
114
115 writer.write_trade(trade).await?;
117
118 let stats_after = writer.get_stats().await;
120 let bytes_written = stats_after
121 .bytes_written
122 .saturating_sub(stats_before.bytes_written);
123
124 if let Some(file_roller) = &self.file_roller {
126 let writer_id = format!("{}:{}:trades", trade.exchange, trade.symbol);
127 file_roller.update_file_statistics(&writer_id, bytes_written, 1);
128 }
129
130 {
132 let mut stats = self.stats.write().await;
133 stats.records_written += 1;
134 stats.bytes_written += bytes_written;
135 }
136
137 Ok(())
138 }
139
140 pub async fn write_orderbook(&self, orderbook: &OrderBookRecord) -> Result<()> {
142 let writer = self
143 .get_writer(&orderbook.exchange, &orderbook.symbol, "orderbook")
144 .await?;
145
146 let stats_before = writer.get_stats().await;
148
149 writer.write_orderbook(orderbook).await?;
151
152 let stats_after = writer.get_stats().await;
154 let bytes_written = stats_after
155 .bytes_written
156 .saturating_sub(stats_before.bytes_written);
157
158 if let Some(file_roller) = &self.file_roller {
160 let writer_id = format!("{}:{}:orderbook", orderbook.exchange, orderbook.symbol);
161 file_roller.update_file_statistics(&writer_id, bytes_written, 1);
162 }
163
164 {
166 let mut stats = self.stats.write().await;
167 stats.records_written += 1;
168 stats.bytes_written += bytes_written;
169 }
170
171 Ok(())
172 }
173
174 pub async fn get_stats(&self) -> StorageStats {
176 self.stats.read().await.clone()
177 }
178
179 pub fn get_file_rotation_stats(
181 &self,
182 ) -> Option<FxHashMap<String, crate::monitor::storage::roller::ActiveFileInfo>> {
183 self.file_roller.as_ref().map(|roller| {
184 let stats = roller.get_active_files_statistics();
185 let mut result = FxHashMap::default();
186 for (key, info) in stats {
187 result.insert(key, info);
188 }
189 result
190 })
191 }
192
193 pub async fn list_files(
195 &self,
196 exchange: Option<&str>,
197 symbol: Option<&str>,
198 data_type: Option<&str>,
199 start_date: Option<SimpleDate>,
200 end_date: Option<SimpleDate>,
201 ) -> Result<Vec<FileInfo>> {
202 let mut files = Vec::new();
203
204 let mut entries = fs::read_dir(&self.config.market_data_path).await?;
206 while let Some(entry) = entries.next_entry().await? {
207 if let Some(filename) = entry.file_name().to_str()
208 && let Some(naming) = FileNaming::parse_filename(filename)
209 {
210 if let Some(ex) = exchange
212 && naming.exchange != ex
213 {
214 continue;
215 }
216 if let Some(sym) = symbol
217 && naming.symbol != sym
218 {
219 continue;
220 }
221 if let Some(dt) = data_type
222 && naming.data_type != dt
223 {
224 continue;
225 }
226 if let Some(start) = start_date
227 && naming.date < start
228 {
229 continue;
230 }
231 if let Some(end) = end_date
232 && naming.date > end
233 {
234 continue;
235 }
236
237 let metadata = entry.metadata().await?;
239 let file_info = FileInfo {
240 filename: filename.to_string(),
241 exchange: naming.exchange.to_string(),
242 symbol: naming.symbol.to_string(),
243 data_type: naming.data_type,
244 date: format!(
245 "{:04}-{:02}-{:02}",
246 naming.date.year, naming.date.month, naming.date.day
247 ),
248 file_size: metadata.len(),
249 compressed: filename.ends_with(".zst"),
250 };
251
252 files.push(file_info);
253 }
254 }
255
256 Ok(files)
257 }
258}