rusty_oms/
execution_engine.rs

//! Execution engine module for order processing and exchange interaction.
//!
//! This module coordinates risk validation with exchange communication and
//! emits events describing order lifecycle updates.
5
6use crate::risk_manager::{RiskError, RiskManager};
7// OrderBook import removed - not used
8use async_trait::async_trait;
9use rusty_common::SmartString;
10use rusty_model::trading_order::Order;
11use tokio::sync::mpsc;
12
13/// Events emitted during order execution lifecycle
14#[derive(Debug, Clone)]
15pub enum ExecutionEvent {
16    /// Order was accepted by the exchange
17    OrderAccepted(Order),
18    /// Order was rejected with a reason
19    OrderRejected(Order, SmartString),
20    /// Order was completely filled
21    OrderFilled(Order),
22    /// Order was partially filled with the filled quantity
23    PartialFill(Order, u64),
24}
25
26/// Trait for exchange implementations
27#[async_trait]
28pub trait Exchange: Send + Sync {
29    /// Send an order to the exchange
30    async fn send_order(&self, order: Order) -> crate::Result<()>;
31    /// Cancel an existing order
32    async fn cancel_order(&self, order_id: std::string::String) -> crate::Result<()>;
33    /// Get the current status of an order
34    async fn get_order_status(
35        &self,
36        order_id: std::string::String,
37    ) -> crate::Result<std::string::String>;
38}
39
40/// Engine that processes orders through risk checks and exchange submission
41pub struct ExecutionEngine<E: Exchange> {
42    /// Risk manager for pre-trade validation
43    risk_manager: RiskManager,
44    /// Channel sender for execution events
45    event_sender: mpsc::Sender<ExecutionEvent>,
46    /// Exchange implementation
47    exchange: E,
48}
49
50impl<E: Exchange> ExecutionEngine<E> {
51    /// Create a new execution engine
52    #[must_use]
53    pub const fn new(
54        risk_manager: RiskManager,
55        event_sender: mpsc::Sender<ExecutionEvent>,
56        exchange: E,
57    ) -> Self {
58        Self {
59            risk_manager,
60            event_sender,
61            exchange,
62        }
63    }
64
65    /// Process an order through risk validation and exchange submission
66    ///
67    /// # Errors
68    ///
69    /// Returns `RiskError` if the order fails risk validation.
70    /// Exchange errors are handled by emitting rejection events.
71    pub async fn process_order(&mut self, order: Order) -> Result<(), RiskError> {
72        // First, validate the order through risk management
73        if let Err(risk_error) = self.risk_manager.validate_order(&order) {
74            // Emit OrderRejected event before returning the error
75            self.event_sender
76                .send(ExecutionEvent::OrderRejected(
77                    order,
78                    SmartString::from(risk_error.to_string()),
79                ))
80                .await
81                .expect("Failed to send order rejected event");
82            return Err(risk_error);
83        }
84
85        // Send order to exchange
86        match self.exchange.send_order(order.clone()).await {
87            Ok(()) => {
88                // Send order accepted event
89                self.event_sender
90                    .send(ExecutionEvent::OrderAccepted(order))
91                    .await
92                    .expect("Failed to send order accepted event");
93                Ok(())
94            }
95            Err(e) => {
96                self.event_sender
97                    .send(ExecutionEvent::OrderRejected(
98                        order,
99                        SmartString::from(e.to_string()),
100                    ))
101                    .await
102                    .expect("Failed to send order rejected event");
103                Ok(())
104            }
105        }
106    }
107}
108
109#[cfg(test)]
110mod tests {
111    use super::*;
112    use crate::risk_manager::RiskManager;
113    use rust_decimal_macros::dec;
114    use rusty_model::{
115        enums::{OrderSide, OrderType},
116        venues::Venue,
117    };
118    use std::sync::Arc;
119    use tokio::sync::Mutex;
120
121    // Mock Exchange implementation for testing
122    struct MockExchange {
123        // Track calls to send_order
124        send_order_calls: Arc<Mutex<Vec<Order>>>,
125        // Control whether send_order succeeds or fails
126        should_fail: bool,
127        // Error message to return when should_fail is true
128        error_message: String,
129    }
130
131    impl MockExchange {
132        fn new(should_fail: bool, error_message: String) -> Self {
133            Self {
134                send_order_calls: Arc::new(Mutex::new(Vec::new())),
135                should_fail,
136                error_message,
137            }
138        }
139
140        #[allow(dead_code)]
141        async fn get_send_order_calls(&self) -> Vec<Order> {
142            let calls = self.send_order_calls.lock().await;
143            calls.clone()
144        }
145    }
146
147    #[async_trait]
148    impl Exchange for MockExchange {
149        async fn send_order(&self, order: Order) -> crate::Result<()> {
150            // Record the call
151            let mut calls = self.send_order_calls.lock().await;
152            calls.push(order);
153
154            if self.should_fail {
155                Err(crate::OmsError::Exchange(self.error_message.clone().into()))
156            } else {
157                Ok(())
158            }
159        }
160
161        async fn cancel_order(&self, _order_id: String) -> crate::Result<()> {
162            // Not used in these tests
163            Ok(())
164        }
165
166        async fn get_order_status(&self, _order_id: String) -> crate::Result<String> {
167            // Not used in these tests
168            Ok("FILLED".into())
169        }
170    }
171
172    fn create_test_order() -> Order {
173        Order::new(
174            Venue::Test,
175            "BTCUSDT",
176            OrderSide::Buy,
177            OrderType::Limit,
178            dec!(1.0),
179            Some(dec!(50000.0)),
180            rusty_model::types::ClientId::new("test_client"),
181        )
182    }
183
184    #[tokio::test]
185    async fn test_process_order_success() {
186        // Create a mock exchange that succeeds
187        let mock_exchange = MockExchange::new(false, std::string::String::new());
188        let send_order_calls = mock_exchange.send_order_calls.clone();
189
190        // Create a channel for execution events
191        let (event_sender, mut event_receiver) = mpsc::channel(10);
192
193        // Create a risk manager with generous limits
194        let risk_manager = RiskManager::new(dec!(100.0), dec!(1.0), dec!(1000000.0));
195
196        // Create the execution engine
197        let mut execution_engine = ExecutionEngine::new(risk_manager, event_sender, mock_exchange);
198
199        // Process an order
200        let order = create_test_order();
201        let result = execution_engine.process_order(order.clone()).await;
202
203        // Verify the result
204        assert!(result.is_ok());
205
206        // Verify the order was sent to the exchange
207        let calls = send_order_calls.lock().await;
208        assert_eq!(calls.len(), 1);
209        assert_eq!(calls[0].id, order.id);
210
211        // Verify the event was sent
212        if let Ok(event) = event_receiver.try_recv() {
213            match event {
214                ExecutionEvent::OrderAccepted(accepted_order) => {
215                    assert_eq!(accepted_order.id, order.id);
216                }
217                _ => panic!("Expected OrderAccepted event"),
218            }
219        } else {
220            panic!("Expected an event");
221        }
222    }
223
224    #[tokio::test]
225    async fn test_process_order_exchange_failure() {
226        // Create a mock exchange that fails
227        let error_message = "Exchange error".to_string();
228        let mock_exchange = MockExchange::new(true, error_message.clone());
229        let send_order_calls = mock_exchange.send_order_calls.clone();
230
231        // Create a channel for execution events
232        let (event_sender, mut event_receiver) = mpsc::channel(10);
233
234        // Create a risk manager with generous limits
235        let risk_manager = RiskManager::new(dec!(100.0), dec!(1.0), dec!(1000000.0));
236
237        // Create the execution engine
238        let mut execution_engine = ExecutionEngine::new(risk_manager, event_sender, mock_exchange);
239
240        // Process an order
241        let order = create_test_order();
242        let result = execution_engine.process_order(order.clone()).await;
243
244        // Verify the result - should still be Ok since we handle exchange errors
245        assert!(result.is_ok());
246
247        // Verify the order was sent to the exchange
248        let calls = send_order_calls.lock().await;
249        assert_eq!(calls.len(), 1);
250        assert_eq!(calls[0].id, order.id);
251
252        // Verify the event was sent
253        if let Ok(event) = event_receiver.try_recv() {
254            match event {
255                ExecutionEvent::OrderRejected(rejected_order, reason) => {
256                    assert_eq!(rejected_order.id, order.id);
257                    assert!(reason.contains(&error_message));
258                }
259                _ => panic!("Expected OrderRejected event"),
260            }
261        } else {
262            panic!("Expected an event");
263        }
264    }
265
266    #[tokio::test]
267    async fn test_process_order_risk_failure() {
268        // Create a mock exchange
269        let mock_exchange = MockExchange::new(false, std::string::String::new());
270
271        // Create a channel for execution events
272        let (event_sender, mut event_receiver) = mpsc::channel(10);
273
274        // Create a risk manager with strict limits that will reject the order
275        let risk_manager = RiskManager::new(dec!(0.5), dec!(1.0), dec!(1000000.0));
276
277        // Create the execution engine
278        let mut execution_engine = ExecutionEngine::new(risk_manager, event_sender, mock_exchange);
279
280        // Process an order with quantity that exceeds the risk limit
281        let order = create_test_order(); // Quantity is 1.0, which exceeds the limit of 0.5
282        let result = execution_engine.process_order(order.clone()).await;
283
284        // Verify the result - should be an error
285        assert!(result.is_err());
286        match result {
287            Err(RiskError::QuantityTooLarge) => (),
288            _ => panic!("Expected QuantityTooLarge error"),
289        }
290
291        // Verify that an OrderRejected event was emitted
292        if let Ok(event) = event_receiver.try_recv() {
293            match event {
294                ExecutionEvent::OrderRejected(rejected_order, reason) => {
295                    assert_eq!(rejected_order.id, order.id);
296                    assert_eq!(reason, "Order quantity exceeds maximum allowed");
297                }
298                _ => panic!("Expected OrderRejected event"),
299            }
300        } else {
301            panic!("Expected an OrderRejected event");
302        }
303    }
304
305    #[tokio::test]
306    async fn test_process_order_price_risk_failure() {
307        // Create a mock exchange
308        let mock_exchange = MockExchange::new(false, std::string::String::new());
309
310        // Create a channel for execution events
311        let (event_sender, mut event_receiver) = mpsc::channel(10);
312
313        // Create a risk manager with price limits that will reject the order
314        let risk_manager = RiskManager::new(dec!(10.0), dec!(60000.0), dec!(100000.0));
315
316        // Create the execution engine
317        let mut execution_engine = ExecutionEngine::new(risk_manager, event_sender, mock_exchange);
318
319        // Create an order with price below the minimum
320        let order = Order::new(
321            Venue::Test,
322            "BTCUSDT",
323            OrderSide::Buy,
324            OrderType::Limit,
325            dec!(1.0),
326            Some(dec!(50000.0)), // Price is below the minimum of 60000.0
327            rusty_model::types::ClientId::new("test_client"),
328        );
329
330        let result = execution_engine.process_order(order.clone()).await;
331
332        // Verify the result - should be an error
333        assert!(result.is_err());
334        match result {
335            Err(RiskError::PriceOutOfRange) => (),
336            _ => panic!("Expected PriceOutOfRange error"),
337        }
338
339        // Verify that an OrderRejected event was emitted
340        if let Ok(event) = event_receiver.try_recv() {
341            match event {
342                ExecutionEvent::OrderRejected(rejected_order, reason) => {
343                    assert_eq!(rejected_order.id, order.id);
344                    assert_eq!(reason, "Order price outside allowed range");
345                }
346                _ => panic!("Expected OrderRejected event"),
347            }
348        } else {
349            panic!("Expected an OrderRejected event");
350        }
351    }
352}