rusty_ems/factory/
config.rs1use std::env;
2use std::fs::File;
3use std::io::Read;
4use std::path::Path;
5use std::time::Duration;
6
7use anyhow::{Context, Result, anyhow};
8use rusty_common::collections::SmallVec;
9use rusty_model::venues::Venue;
10use serde::{Deserialize, Serialize};
11
12use crate::protocol::{ExchangeConfig, RateLimit, ReconnectSettings};
13
14#[derive(Debug, Clone, Serialize, Deserialize)]
16pub struct EMSConfig {
17 pub exchanges: SmallVec<[ExchangeConfig; 8]>,
19
20 pub global: GlobalSettings,
22}
23
24#[derive(Debug, Clone, Serialize, Deserialize)]
26pub struct GlobalSettings {
27 pub execution_threads: usize,
29
30 pub max_queue_size: usize,
32
33 pub default_timeout_ms: u64,
35
36 pub log_api_calls: bool,
38}
39
40impl Default for GlobalSettings {
41 fn default() -> Self {
42 Self {
43 execution_threads: 10,
44 max_queue_size: 1000,
45 default_timeout_ms: 5000,
46 log_api_calls: false,
47 }
48 }
49}
50
51pub struct ConfigLoader;
53
54impl ConfigLoader {
55 pub fn from_env() -> Result<EMSConfig> {
57 let mut exchanges = SmallVec::new();
58
59 if let Ok(api_key) = env::var("BINANCE_API_KEY") {
61 let api_secret = env::var("BINANCE_API_SECRET")
62 .context("BINANCE_API_SECRET must be set if BINANCE_API_KEY is set")?;
63
64 let rest_url =
65 env::var("BINANCE_REST_URL").unwrap_or_else(|_| "https://api.binance.com".into());
66
67 let ws_url = env::var("BINANCE_WS_URL")
68 .unwrap_or_else(|_| "wss://stream.binance.com:9443/ws".into());
69
70 let timeout_ms = env::var("BINANCE_TIMEOUT_MS")
71 .unwrap_or_else(|_| "10000".into())
72 .parse::<u64>()
73 .context("BINANCE_TIMEOUT_MS must be a valid integer")?;
74
75 let config = ExchangeConfig {
76 venue: Venue::Binance,
77 rest_url: rest_url.into(),
78 ws_url: ws_url.into(),
79 api_key: api_key.into(),
80 api_secret: api_secret.into(),
81 timeout: Duration::from_millis(timeout_ms),
82 rate_limits: Self::binance_rate_limits(),
83 reconnect_settings: ReconnectSettings::default(),
84 };
85
86 exchanges.push(config);
87 }
88
89 if let Ok(api_key) = env::var("BYBIT_API_KEY") {
91 let api_secret = env::var("BYBIT_API_SECRET")
92 .context("BYBIT_API_SECRET must be set if BYBIT_API_KEY is set")?;
93
94 let rest_url =
95 env::var("BYBIT_REST_URL").unwrap_or_else(|_| "https://api.bybit.com".into());
96
97 let ws_url = env::var("BYBIT_WS_URL")
98 .unwrap_or_else(|_| "wss://stream.bybit.com/v5/public".into());
99
100 let timeout_ms = env::var("BYBIT_TIMEOUT_MS")
101 .unwrap_or_else(|_| "10000".into())
102 .parse::<u64>()
103 .context("BYBIT_TIMEOUT_MS must be a valid integer")?;
104
105 let config = ExchangeConfig {
106 venue: Venue::Bybit,
107 rest_url: rest_url.into(),
108 ws_url: ws_url.into(),
109 api_key: api_key.into(),
110 api_secret: api_secret.into(),
111 timeout: Duration::from_millis(timeout_ms),
112 rate_limits: Self::bybit_rate_limits(),
113 reconnect_settings: ReconnectSettings::default(),
114 };
115
116 exchanges.push(config);
117 }
118
119 let execution_threads = env::var("EMS_EXECUTION_THREADS")
121 .unwrap_or_else(|_| "10".into())
122 .parse::<usize>()
123 .context("EMS_EXECUTION_THREADS must be a valid integer")?;
124
125 let max_queue_size = env::var("EMS_MAX_QUEUE_SIZE")
126 .unwrap_or_else(|_| "1000".into())
127 .parse::<usize>()
128 .context("EMS_MAX_QUEUE_SIZE must be a valid integer")?;
129
130 let default_timeout_ms = env::var("EMS_DEFAULT_TIMEOUT_MS")
131 .unwrap_or_else(|_| "5000".into())
132 .parse::<u64>()
133 .context("EMS_DEFAULT_TIMEOUT_MS must be a valid integer")?;
134
135 let log_api_calls = env::var("EMS_LOG_API_CALLS")
136 .unwrap_or_else(|_| "false".into())
137 .parse::<bool>()
138 .context("EMS_LOG_API_CALLS must be a valid boolean")?;
139
140 let global = GlobalSettings {
141 execution_threads,
142 max_queue_size,
143 default_timeout_ms,
144 log_api_calls,
145 };
146
147 if exchanges.is_empty() {
148 return Err(anyhow!(
149 "No exchange configurations found in environment variables"
150 ));
151 }
152
153 Ok(EMSConfig { exchanges, global })
154 }
155
156 pub fn from_file<P: AsRef<Path>>(path: P) -> Result<EMSConfig> {
158 let mut file = File::open(path)?;
159 let mut contents = String::new();
160 file.read_to_string(&mut contents)?;
161
162 let mut contents_bytes = contents.into_bytes();
163 let config: EMSConfig = simd_json::from_slice(&mut contents_bytes)?;
164
165 if config.exchanges.is_empty() {
166 return Err(anyhow!("No exchange configurations found in config file"));
167 }
168
169 Ok(config)
170 }
171
172 fn binance_rate_limits() -> SmallVec<[RateLimit; 8]> {
174 let mut limits = SmallVec::new();
175
176 limits.push(RateLimit {
178 category: "order".into(),
179 max_requests: 50,
180 window_ms: 10000, is_weight_based: false,
182 });
183
184 limits.push(RateLimit {
186 category: "market_data".into(),
187 max_requests: 1200,
188 window_ms: 60000, is_weight_based: true,
190 });
191
192 limits.push(RateLimit {
194 category: "account_data".into(),
195 max_requests: 100,
196 window_ms: 60000, is_weight_based: true,
198 });
199
200 limits
201 }
202
203 fn bybit_rate_limits() -> SmallVec<[RateLimit; 8]> {
205 let mut limits = SmallVec::new();
206
207 limits.push(RateLimit {
209 category: "order".into(),
210 max_requests: 120,
211 window_ms: 60000, is_weight_based: false,
213 });
214
215 limits.push(RateLimit {
217 category: "market_data".into(),
218 max_requests: 120,
219 window_ms: 60000, is_weight_based: false,
221 });
222
223 limits
224 }
225}