rusty_ems/factory/
mod.rs

1//! Factory module for creating exchange adapters and execution engines
2//!
3//! This module provides factory functions for creating configured exchange adapters
4//! and execution engines from configuration files. It handles the complexity of
5//! setting up rate limiters, authentication, and other exchange-specific configurations.
6
7/// Configuration structures and loading utilities for the EMS factory
8pub mod config;
9
10use rusty_common::collections::FxHashMap;
11use std::sync::Arc;
12
13use anyhow::{Result, anyhow};
14use flume::Sender;
15use rusty_model::venues::Venue;
16
17use crate::exchanges::TestExchangeOptimized;
18use crate::exchanges::binance::BinanceExchange;
19use crate::execution_engine::{Exchange, ExecutionReport};
20use crate::protocol::{ExchangeConfig, RateLimit};
21use crate::throttle::RateLimiter;
22
23pub use config::{ConfigLoader, EMSConfig, GlobalSettings};
24
/// Factory for creating exchange adapters.
///
/// This is a stateless unit type: every constructor is an associated function
/// on the type itself, so no instance is required to use it. The derived traits
/// are provided purely for ergonomics (logging, embedding in other
/// `Debug`/`Default` types) and carry no runtime cost.
#[derive(Debug, Clone, Copy, Default)]
pub struct ExchangeFactory;
27
28impl ExchangeFactory {
29    /// Creates a new exchange adapter from configuration
30    pub fn create_exchange(config: &ExchangeConfig) -> Result<Arc<dyn Exchange>> {
31        match config.venue {
32            Venue::Binance => {
33                // Create rate limiters for Binance
34                let _rate_limiters = Self::create_rate_limiters(&config.rate_limits);
35
36                // Create the Binance exchange adapter
37                let exchange =
38                    BinanceExchange::new(config.api_key.to_string(), config.api_secret.to_string());
39
40                Ok(Arc::new(exchange))
41            }
42            Venue::Bybit => {
43                // For now, we don't have a Bybit exchange adapter in rusty-ems
44                // We'll implement this later
45                Err(anyhow!("Bybit exchange not yet implemented in rusty-ems"))
46            }
47            Venue::Test => {
48                // Create a test exchange for testing purposes
49                Ok(Arc::new(TestExchangeOptimized::new()))
50            }
51            _ => Err(anyhow!("Unsupported exchange: {:?}", config.venue)),
52        }
53    }
54
55    /// Creates an execution engine with all configured exchanges
56    pub async fn create_execution_engine_with_exchanges(
57        configs: &[ExchangeConfig],
58        execution_tx: Sender<ExecutionReport>,
59    ) -> Result<(
60        crate::execution_engine::ExecutionEngine,
61        Sender<crate::execution_engine::ExecutionEvent>,
62    )> {
63        use crate::execution_engine::ExecutionEngine;
64
65        // Check if there are any exchange configs
66        if configs.is_empty() {
67            return Err(anyhow!("No exchange configurations provided"));
68        }
69
70        // Create channels for the execution engine
71        let (event_tx, event_rx) = flume::unbounded();
72
73        // Create the execution engine
74        let mut engine = ExecutionEngine::new(event_rx, execution_tx);
75
76        // Create and register each exchange
77        for config in configs {
78            let exchange = Self::create_exchange(config)?;
79            engine.register_exchange(exchange);
80        }
81
82        Ok((engine, event_tx))
83    }
84
85    /// Creates rate limiters from configuration
86    fn create_rate_limiters(rate_limits: &[RateLimit]) -> FxHashMap<String, Arc<RateLimiter>> {
87        let mut limiters = FxHashMap::default();
88
89        for limit in rate_limits {
90            let limiter = if limit.is_weight_based {
91                // For weight-based limiters, we use the standard RateLimiter for now
92                // In a real system, we would use the WeightBasedRateLimiter
93                Arc::new(RateLimiter::new(limit.max_requests, limit.window_ms))
94            } else {
95                Arc::new(RateLimiter::new(limit.max_requests, limit.window_ms))
96            };
97
98            limiters.insert(limit.category.to_string(), limiter);
99        }
100
101        limiters
102    }
103}