rusty_bin/monitor/config/
mod.rs

1//! Main configuration for the monitor application.
2//!
3//! This module aggregates all configuration structures from sub-modules
4//! and provides a single entry point for loading and validating the configuration.
5
6pub mod development;
7pub mod exchange;
8pub mod general;
9pub mod logger;
10pub mod monitoring;
11pub mod network;
12pub mod storage;
13
14use self::{
15    development::DevelopmentConfig, exchange::ExchangesConfig, general::GeneralConfig,
16    logger::LoggerConfig, monitoring::MonitoringConfig, network::NetworkConfig,
17    storage::StorageConfig,
18};
19use anyhow::{Context, Result};
20use serde::{Deserialize, Serialize};
21use smartstring::alias::String;
22use std::path::{Path, PathBuf};
23
24/// Main configuration structure for the monitor
25#[derive(Debug, Clone, Serialize, Deserialize)]
26pub struct MonitorConfig {
27    /// General application configuration
28    pub general: GeneralConfig,
29    /// Storage and file management configuration
30    pub storage: StorageConfig,
31    /// Exchange-specific configurations
32    pub exchanges: ExchangesConfig,
33    /// Monitoring and metrics configuration
34    pub monitoring: MonitoringConfig,
35    /// Logging configuration
36    pub logging: LoggerConfig,
37    /// Network configuration
38    pub network: NetworkConfig,
39    /// Development and debugging configuration
40    pub development: DevelopmentConfig,
41}
42
43impl MonitorConfig {
44    /// Load configuration from a TOML file
45    pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Self> {
46        let content = std::fs::read_to_string(path.as_ref())
47            .with_context(|| format!("Failed to read config file: {:?}", path.as_ref()))?;
48
49        Self::from_toml_str(&content)
50    }
51
52    /// Load configuration from a TOML string
53    pub fn from_toml_str(content: &str) -> Result<Self> {
54        toml::from_str(content).with_context(|| "Failed to parse TOML configuration")
55    }
56
57    /// Load configuration with environment variable overrides
58    pub fn load_with_env() -> Result<Self> {
59        // Try to load from default config file
60        let config_path = std::env::var("MONITOR_CONFIG_PATH")
61            .unwrap_or_else(|_| "config/monitor.toml".to_string());
62
63        let mut config = Self::from_file(&config_path)
64            .with_context(|| format!("Failed to load config from {config_path}"))?;
65
66        // Apply environment variable overrides
67        config.apply_env_overrides()?;
68
69        Ok(config)
70    }
71
72    /// Apply environment variable overrides to the configuration
73    fn apply_env_overrides(&mut self) -> Result<()> {
74        // General settings
75        if let Ok(val) = std::env::var("MONITOR_COLLECT_TRADES") {
76            self.general.collect_trades = val
77                .parse()
78                .with_context(|| "Invalid MONITOR_COLLECT_TRADES value")?;
79        }
80
81        if let Ok(val) = std::env::var("MONITOR_COLLECT_ORDERBOOKS") {
82            self.general.collect_orderbooks = val
83                .parse()
84                .with_context(|| "Invalid MONITOR_COLLECT_ORDERBOOKS value")?;
85        }
86
87        if let Ok(val) = std::env::var("MONITOR_BUFFER_SIZE") {
88            self.general.buffer_size = val
89                .parse()
90                .with_context(|| "Invalid MONITOR_BUFFER_SIZE value")?;
91        }
92
93        // Storage settings
94        if let Ok(val) = std::env::var("MONITOR_BASE_PATH") {
95            self.storage.base_path = PathBuf::from(val);
96        }
97
98        if let Ok(val) = std::env::var("MONITOR_COMPRESSION_LEVEL") {
99            self.storage.compression_level = val
100                .parse()
101                .with_context(|| "Invalid MONITOR_COMPRESSION_LEVEL value")?;
102        }
103
104        // Logging settings
105        if let Ok(val) = std::env::var("MONITOR_LOG_LEVEL") {
106            self.logging.level = val.into();
107        }
108
109        Ok(())
110    }
111
112    /// Validate the configuration
113    pub fn validate(&self) -> Result<()> {
114        // Validate general settings
115        if self.general.buffer_size == 0 {
116            anyhow::bail!("Buffer size must be greater than 0");
117        }
118
119        if self.general.batch_size == 0 {
120            anyhow::bail!("Batch size must be greater than 0");
121        }
122
123        if self.general.batch_size > self.general.buffer_size {
124            anyhow::bail!("Batch size cannot be larger than buffer size");
125        }
126
127        // Validate storage settings
128        if self.storage.compression_level > 22 {
129            anyhow::bail!("Compression level must be between 0 and 22");
130        }
131
132        if self.storage.max_file_size_mb == 0 {
133            anyhow::bail!("Max file size must be greater than 0");
134        }
135
136        // Validate that at least one exchange is enabled
137        let has_enabled_exchange = [
138            &self.exchanges.binance,
139            &self.exchanges.upbit,
140            &self.exchanges.bybit,
141            &self.exchanges.coinbase,
142            &self.exchanges.bithumb,
143        ]
144        .iter()
145        .any(|exchange| exchange.as_ref().is_some_and(|e| e.enabled));
146
147        if !has_enabled_exchange {
148            anyhow::bail!("At least one exchange must be enabled");
149        }
150
151        // Create necessary directories
152        self.create_directories()?;
153
154        Ok(())
155    }
156
157    /// Get exchange configuration by name
158    pub fn get_exchange_config(&self, exchange_name: &str) -> Option<&exchange::ExchangeConfig> {
159        match exchange_name {
160            "binance" => self.exchanges.binance.as_ref(),
161            "upbit" => self.exchanges.upbit.as_ref(),
162            "bybit" => self.exchanges.bybit.as_ref(),
163            "coinbase" => self.exchanges.coinbase.as_ref(),
164            "bithumb" => self.exchanges.bithumb.as_ref(),
165            _ => None,
166        }
167    }
168
169    /// Get a list of all enabled exchanges
170    pub fn get_enabled_exchanges(&self) -> Vec<String> {
171        self.exchanges
172            .enabled_exchanges
173            .iter()
174            .filter(|&name| {
175                self.get_exchange_config(name)
176                    .is_some_and(|config| config.enabled)
177            })
178            .cloned()
179            .collect()
180    }
181
182    /// Create base directories if they don't exist
183    pub fn create_directories(&self) -> Result<()> {
184        std::fs::create_dir_all(&self.storage.base_path)
185            .with_context(|| format!("Failed to create base path {:?}", self.storage.base_path))?;
186        std::fs::create_dir_all(&self.storage.market_data_path).with_context(|| {
187            format!(
188                "Failed to create market data path {:?}",
189                self.storage.market_data_path
190            )
191        })?;
192        if self.development.enable_profiling
193            && let Some(parent) = self.development.profile_output_path.parent()
194        {
195            std::fs::create_dir_all(parent).with_context(|| {
196                format!("Failed to create profiling output directory {parent:?}")
197            })?;
198        }
199        if let Some(parent) = self.logging.file_path.parent() {
200            std::fs::create_dir_all(parent)
201                .with_context(|| format!("Failed to create log directory {parent:?}"))?;
202        }
203        Ok(())
204    }
205}
206
207impl Default for MonitorConfig {
208    fn default() -> Self {
209        Self {
210            general: GeneralConfig {
211                name: "rusty-monitor".into(),
212                version: "0.1.0".into(),
213                collect_trades: true,
214                collect_orderbooks: true,
215                auto_discover_symbols: false,
216                max_concurrent_connections: 100,
217                buffer_size: 1_000_000,
218                batch_size: 10000,
219                flush_interval_ms: 1000,
220            },
221            storage: StorageConfig {
222                base_path: "data".into(),
223                market_data_path: "data/market_data".into(),
224                file_prefix: "market_data".into(),
225                file_extension: "bin".into(),
226                date_format: "%Y%m%d".into(),
227                rotate_at_midnight: true,
228                enable_compression: true,
229                compression_algorithm: "zstd".into(),
230                compression_level: 3,
231                compress_after_hours: 24,
232                delete_raw_after_compression: true,
233                max_file_size_mb: 256,
234                max_batch_size: 10000,
235                max_records_per_file: 1_000_000,
236                retention_days: 7,
237                rotation_check_interval_s: 60,
238            },
239            exchanges: ExchangesConfig {
240                enabled_exchanges: vec!["binance".into(), "upbit".into()],
241                binance: Some(exchange::ExchangeConfig {
242                    enabled: true,
243                    spot_enabled: Some(true),
244                    futures_enabled: Some(true),
245                    max_symbols: 100,
246                    rate_limit_per_second: 1200,
247                    reconnect_delay_ms: 5000,
248                    max_reconnect_attempts: 10,
249                    include_symbols: vec![],
250                    exclude_symbols: vec![],
251                }),
252                upbit: Some(exchange::ExchangeConfig {
253                    enabled: true,
254                    spot_enabled: Some(true),
255                    futures_enabled: Some(false),
256                    max_symbols: 0,
257                    rate_limit_per_second: 1200,
258                    reconnect_delay_ms: 5000,
259                    max_reconnect_attempts: 10,
260                    include_symbols: vec![],
261                    exclude_symbols: vec![],
262                }),
263                bybit: None,
264                coinbase: None,
265                bithumb: None,
266            },
267            monitoring: MonitoringConfig {
268                enable_metrics: true,
269                log_level: "info".into(),
270                log_format: "json".into(),
271                track_latency: true,
272                track_throughput: true,
273                track_memory_usage: true,
274                track_disk_usage: true,
275                max_latency_ms: 500,
276                min_throughput_per_second: 100,
277                max_memory_usage_mb: 1024,
278                max_disk_usage_percent: 90,
279                metrics_interval_seconds: 15,
280                health_check_interval_seconds: 30,
281            },
282            logging: LoggerConfig {
283                level: "info".into(),
284                format: "text".into(),
285                output: "stdout".into(),
286                file_path: "logs/monitor.log".into(),
287                max_file_size_mb: 10,
288                max_files: 5,
289                rotate_daily: true,
290                log_trades: false,
291                log_orderbooks: false,
292                log_connections: true,
293                log_errors: true,
294                log_performance: false,
295            },
296            network: NetworkConfig {
297                connect_timeout_ms: 5000,
298                read_timeout_ms: 30000,
299                write_timeout_ms: 5000,
300                keepalive_interval_ms: 10000,
301                max_retries: 3,
302                retry_delay_ms: 1000,
303                exponential_backoff: true,
304                max_retry_delay_ms: 60000,
305            },
306            development: DevelopmentConfig {
307                debug_mode: false,
308                save_raw_messages: false,
309                validate_schemas: false,
310                enable_profiling: false,
311                profile_output_path: "profiles/".into(),
312                test_mode: false,
313                test_duration_seconds: 60,
314                test_symbols: vec![],
315                test_exchanges: vec![],
316            },
317        }
318    }
319}
320
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration carries the expected name and a Binance section.
    #[test]
    fn test_default_config() {
        let defaults = MonitorConfig::default();
        assert_eq!(defaults.general.name, "rusty-monitor");
        assert!(defaults.exchanges.binance.is_some());
    }

    /// Validation rejects a zero buffer size and an out-of-range compression level.
    #[test]
    fn test_config_validation() {
        let zero_buffer = {
            let mut cfg = MonitorConfig::default();
            cfg.general.buffer_size = 0;
            cfg
        };
        assert!(zero_buffer.validate().is_err());

        let bad_compression = {
            let mut cfg = MonitorConfig::default();
            cfg.storage.compression_level = 23;
            cfg
        };
        assert!(bad_compression.validate().is_err());
    }

    /// The default configuration reports Binance and Upbit as enabled, in that order.
    #[test]
    fn test_get_enabled_exchanges() {
        let names = MonitorConfig::default().get_enabled_exchanges();
        assert_eq!(names, vec!["binance", "upbit"]);
    }
}