rusty_bin/monitor/config/
mod.rs1pub mod development;
7pub mod exchange;
8pub mod general;
9pub mod logger;
10pub mod monitoring;
11pub mod network;
12pub mod storage;
13
14use self::{
15 development::DevelopmentConfig, exchange::ExchangesConfig, general::GeneralConfig,
16 logger::LoggerConfig, monitoring::MonitoringConfig, network::NetworkConfig,
17 storage::StorageConfig,
18};
19use anyhow::{Context, Result};
20use serde::{Deserialize, Serialize};
21use smartstring::alias::String;
22use std::path::{Path, PathBuf};
23
24#[derive(Debug, Clone, Serialize, Deserialize)]
26pub struct MonitorConfig {
27 pub general: GeneralConfig,
29 pub storage: StorageConfig,
31 pub exchanges: ExchangesConfig,
33 pub monitoring: MonitoringConfig,
35 pub logging: LoggerConfig,
37 pub network: NetworkConfig,
39 pub development: DevelopmentConfig,
41}
42
43impl MonitorConfig {
44 pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Self> {
46 let content = std::fs::read_to_string(path.as_ref())
47 .with_context(|| format!("Failed to read config file: {:?}", path.as_ref()))?;
48
49 Self::from_toml_str(&content)
50 }
51
52 pub fn from_toml_str(content: &str) -> Result<Self> {
54 toml::from_str(content).with_context(|| "Failed to parse TOML configuration")
55 }
56
57 pub fn load_with_env() -> Result<Self> {
59 let config_path = std::env::var("MONITOR_CONFIG_PATH")
61 .unwrap_or_else(|_| "config/monitor.toml".to_string());
62
63 let mut config = Self::from_file(&config_path)
64 .with_context(|| format!("Failed to load config from {config_path}"))?;
65
66 config.apply_env_overrides()?;
68
69 Ok(config)
70 }
71
72 fn apply_env_overrides(&mut self) -> Result<()> {
74 if let Ok(val) = std::env::var("MONITOR_COLLECT_TRADES") {
76 self.general.collect_trades = val
77 .parse()
78 .with_context(|| "Invalid MONITOR_COLLECT_TRADES value")?;
79 }
80
81 if let Ok(val) = std::env::var("MONITOR_COLLECT_ORDERBOOKS") {
82 self.general.collect_orderbooks = val
83 .parse()
84 .with_context(|| "Invalid MONITOR_COLLECT_ORDERBOOKS value")?;
85 }
86
87 if let Ok(val) = std::env::var("MONITOR_BUFFER_SIZE") {
88 self.general.buffer_size = val
89 .parse()
90 .with_context(|| "Invalid MONITOR_BUFFER_SIZE value")?;
91 }
92
93 if let Ok(val) = std::env::var("MONITOR_BASE_PATH") {
95 self.storage.base_path = PathBuf::from(val);
96 }
97
98 if let Ok(val) = std::env::var("MONITOR_COMPRESSION_LEVEL") {
99 self.storage.compression_level = val
100 .parse()
101 .with_context(|| "Invalid MONITOR_COMPRESSION_LEVEL value")?;
102 }
103
104 if let Ok(val) = std::env::var("MONITOR_LOG_LEVEL") {
106 self.logging.level = val.into();
107 }
108
109 Ok(())
110 }
111
112 pub fn validate(&self) -> Result<()> {
114 if self.general.buffer_size == 0 {
116 anyhow::bail!("Buffer size must be greater than 0");
117 }
118
119 if self.general.batch_size == 0 {
120 anyhow::bail!("Batch size must be greater than 0");
121 }
122
123 if self.general.batch_size > self.general.buffer_size {
124 anyhow::bail!("Batch size cannot be larger than buffer size");
125 }
126
127 if self.storage.compression_level > 22 {
129 anyhow::bail!("Compression level must be between 0 and 22");
130 }
131
132 if self.storage.max_file_size_mb == 0 {
133 anyhow::bail!("Max file size must be greater than 0");
134 }
135
136 let has_enabled_exchange = [
138 &self.exchanges.binance,
139 &self.exchanges.upbit,
140 &self.exchanges.bybit,
141 &self.exchanges.coinbase,
142 &self.exchanges.bithumb,
143 ]
144 .iter()
145 .any(|exchange| exchange.as_ref().is_some_and(|e| e.enabled));
146
147 if !has_enabled_exchange {
148 anyhow::bail!("At least one exchange must be enabled");
149 }
150
151 self.create_directories()?;
153
154 Ok(())
155 }
156
157 pub fn get_exchange_config(&self, exchange_name: &str) -> Option<&exchange::ExchangeConfig> {
159 match exchange_name {
160 "binance" => self.exchanges.binance.as_ref(),
161 "upbit" => self.exchanges.upbit.as_ref(),
162 "bybit" => self.exchanges.bybit.as_ref(),
163 "coinbase" => self.exchanges.coinbase.as_ref(),
164 "bithumb" => self.exchanges.bithumb.as_ref(),
165 _ => None,
166 }
167 }
168
169 pub fn get_enabled_exchanges(&self) -> Vec<String> {
171 self.exchanges
172 .enabled_exchanges
173 .iter()
174 .filter(|&name| {
175 self.get_exchange_config(name)
176 .is_some_and(|config| config.enabled)
177 })
178 .cloned()
179 .collect()
180 }
181
182 pub fn create_directories(&self) -> Result<()> {
184 std::fs::create_dir_all(&self.storage.base_path)
185 .with_context(|| format!("Failed to create base path {:?}", self.storage.base_path))?;
186 std::fs::create_dir_all(&self.storage.market_data_path).with_context(|| {
187 format!(
188 "Failed to create market data path {:?}",
189 self.storage.market_data_path
190 )
191 })?;
192 if self.development.enable_profiling
193 && let Some(parent) = self.development.profile_output_path.parent()
194 {
195 std::fs::create_dir_all(parent).with_context(|| {
196 format!("Failed to create profiling output directory {parent:?}")
197 })?;
198 }
199 if let Some(parent) = self.logging.file_path.parent() {
200 std::fs::create_dir_all(parent)
201 .with_context(|| format!("Failed to create log directory {parent:?}"))?;
202 }
203 Ok(())
204 }
205}
206
207impl Default for MonitorConfig {
208 fn default() -> Self {
209 Self {
210 general: GeneralConfig {
211 name: "rusty-monitor".into(),
212 version: "0.1.0".into(),
213 collect_trades: true,
214 collect_orderbooks: true,
215 auto_discover_symbols: false,
216 max_concurrent_connections: 100,
217 buffer_size: 1_000_000,
218 batch_size: 10000,
219 flush_interval_ms: 1000,
220 },
221 storage: StorageConfig {
222 base_path: "data".into(),
223 market_data_path: "data/market_data".into(),
224 file_prefix: "market_data".into(),
225 file_extension: "bin".into(),
226 date_format: "%Y%m%d".into(),
227 rotate_at_midnight: true,
228 enable_compression: true,
229 compression_algorithm: "zstd".into(),
230 compression_level: 3,
231 compress_after_hours: 24,
232 delete_raw_after_compression: true,
233 max_file_size_mb: 256,
234 max_batch_size: 10000,
235 max_records_per_file: 1_000_000,
236 retention_days: 7,
237 rotation_check_interval_s: 60,
238 },
239 exchanges: ExchangesConfig {
240 enabled_exchanges: vec!["binance".into(), "upbit".into()],
241 binance: Some(exchange::ExchangeConfig {
242 enabled: true,
243 spot_enabled: Some(true),
244 futures_enabled: Some(true),
245 max_symbols: 100,
246 rate_limit_per_second: 1200,
247 reconnect_delay_ms: 5000,
248 max_reconnect_attempts: 10,
249 include_symbols: vec![],
250 exclude_symbols: vec![],
251 }),
252 upbit: Some(exchange::ExchangeConfig {
253 enabled: true,
254 spot_enabled: Some(true),
255 futures_enabled: Some(false),
256 max_symbols: 0,
257 rate_limit_per_second: 1200,
258 reconnect_delay_ms: 5000,
259 max_reconnect_attempts: 10,
260 include_symbols: vec![],
261 exclude_symbols: vec![],
262 }),
263 bybit: None,
264 coinbase: None,
265 bithumb: None,
266 },
267 monitoring: MonitoringConfig {
268 enable_metrics: true,
269 log_level: "info".into(),
270 log_format: "json".into(),
271 track_latency: true,
272 track_throughput: true,
273 track_memory_usage: true,
274 track_disk_usage: true,
275 max_latency_ms: 500,
276 min_throughput_per_second: 100,
277 max_memory_usage_mb: 1024,
278 max_disk_usage_percent: 90,
279 metrics_interval_seconds: 15,
280 health_check_interval_seconds: 30,
281 },
282 logging: LoggerConfig {
283 level: "info".into(),
284 format: "text".into(),
285 output: "stdout".into(),
286 file_path: "logs/monitor.log".into(),
287 max_file_size_mb: 10,
288 max_files: 5,
289 rotate_daily: true,
290 log_trades: false,
291 log_orderbooks: false,
292 log_connections: true,
293 log_errors: true,
294 log_performance: false,
295 },
296 network: NetworkConfig {
297 connect_timeout_ms: 5000,
298 read_timeout_ms: 30000,
299 write_timeout_ms: 5000,
300 keepalive_interval_ms: 10000,
301 max_retries: 3,
302 retry_delay_ms: 1000,
303 exponential_backoff: true,
304 max_retry_delay_ms: 60000,
305 },
306 development: DevelopmentConfig {
307 debug_mode: false,
308 save_raw_messages: false,
309 validate_schemas: false,
310 enable_profiling: false,
311 profile_output_path: "profiles/".into(),
312 test_mode: false,
313 test_duration_seconds: 60,
314 test_symbols: vec![],
315 test_exchanges: vec![],
316 },
317 }
318 }
319}
320
321#[cfg(test)]
322mod tests {
323 use super::*;
324
325 #[test]
326 fn test_default_config() {
327 let config = MonitorConfig::default();
328 assert_eq!(config.general.name, "rusty-monitor");
329 assert!(config.exchanges.binance.is_some());
330 }
331
332 #[test]
333 fn test_config_validation() {
334 let mut config = MonitorConfig::default();
335 config.general.buffer_size = 0;
336 assert!(config.validate().is_err());
337
338 config = MonitorConfig::default();
339 config.storage.compression_level = 23;
340 assert!(config.validate().is_err());
341 }
342
343 #[test]
344 fn test_get_enabled_exchanges() {
345 let config = MonitorConfig::default();
346 let enabled = config.get_enabled_exchanges();
347 assert_eq!(enabled, vec!["binance", "upbit"]);
348 }
349}