1use crate::risk_manager::{RiskError, RiskManager};
7use async_trait::async_trait;
9use rusty_common::SmartString;
10use rusty_model::trading_order::Order;
11use tokio::sync::mpsc;
12
13#[derive(Debug, Clone)]
15pub enum ExecutionEvent {
16 OrderAccepted(Order),
18 OrderRejected(Order, SmartString),
20 OrderFilled(Order),
22 PartialFill(Order, u64),
24}
25
26#[async_trait]
28pub trait Exchange: Send + Sync {
29 async fn send_order(&self, order: Order) -> crate::Result<()>;
31 async fn cancel_order(&self, order_id: std::string::String) -> crate::Result<()>;
33 async fn get_order_status(
35 &self,
36 order_id: std::string::String,
37 ) -> crate::Result<std::string::String>;
38}
39
40pub struct ExecutionEngine<E: Exchange> {
42 risk_manager: RiskManager,
44 event_sender: mpsc::Sender<ExecutionEvent>,
46 exchange: E,
48}
49
50impl<E: Exchange> ExecutionEngine<E> {
51 #[must_use]
53 pub const fn new(
54 risk_manager: RiskManager,
55 event_sender: mpsc::Sender<ExecutionEvent>,
56 exchange: E,
57 ) -> Self {
58 Self {
59 risk_manager,
60 event_sender,
61 exchange,
62 }
63 }
64
65 pub async fn process_order(&mut self, order: Order) -> Result<(), RiskError> {
72 if let Err(risk_error) = self.risk_manager.validate_order(&order) {
74 self.event_sender
76 .send(ExecutionEvent::OrderRejected(
77 order,
78 SmartString::from(risk_error.to_string()),
79 ))
80 .await
81 .expect("Failed to send order rejected event");
82 return Err(risk_error);
83 }
84
85 match self.exchange.send_order(order.clone()).await {
87 Ok(()) => {
88 self.event_sender
90 .send(ExecutionEvent::OrderAccepted(order))
91 .await
92 .expect("Failed to send order accepted event");
93 Ok(())
94 }
95 Err(e) => {
96 self.event_sender
97 .send(ExecutionEvent::OrderRejected(
98 order,
99 SmartString::from(e.to_string()),
100 ))
101 .await
102 .expect("Failed to send order rejected event");
103 Ok(())
104 }
105 }
106 }
107}
108
109#[cfg(test)]
110mod tests {
111 use super::*;
112 use crate::risk_manager::RiskManager;
113 use rust_decimal_macros::dec;
114 use rusty_model::{
115 enums::{OrderSide, OrderType},
116 venues::Venue,
117 };
118 use std::sync::Arc;
119 use tokio::sync::Mutex;
120
121 struct MockExchange {
123 send_order_calls: Arc<Mutex<Vec<Order>>>,
125 should_fail: bool,
127 error_message: String,
129 }
130
131 impl MockExchange {
132 fn new(should_fail: bool, error_message: String) -> Self {
133 Self {
134 send_order_calls: Arc::new(Mutex::new(Vec::new())),
135 should_fail,
136 error_message,
137 }
138 }
139
140 #[allow(dead_code)]
141 async fn get_send_order_calls(&self) -> Vec<Order> {
142 let calls = self.send_order_calls.lock().await;
143 calls.clone()
144 }
145 }
146
147 #[async_trait]
148 impl Exchange for MockExchange {
149 async fn send_order(&self, order: Order) -> crate::Result<()> {
150 let mut calls = self.send_order_calls.lock().await;
152 calls.push(order);
153
154 if self.should_fail {
155 Err(crate::OmsError::Exchange(self.error_message.clone().into()))
156 } else {
157 Ok(())
158 }
159 }
160
161 async fn cancel_order(&self, _order_id: String) -> crate::Result<()> {
162 Ok(())
164 }
165
166 async fn get_order_status(&self, _order_id: String) -> crate::Result<String> {
167 Ok("FILLED".into())
169 }
170 }
171
172 fn create_test_order() -> Order {
173 Order::new(
174 Venue::Test,
175 "BTCUSDT",
176 OrderSide::Buy,
177 OrderType::Limit,
178 dec!(1.0),
179 Some(dec!(50000.0)),
180 rusty_model::types::ClientId::new("test_client"),
181 )
182 }
183
184 #[tokio::test]
185 async fn test_process_order_success() {
186 let mock_exchange = MockExchange::new(false, std::string::String::new());
188 let send_order_calls = mock_exchange.send_order_calls.clone();
189
190 let (event_sender, mut event_receiver) = mpsc::channel(10);
192
193 let risk_manager = RiskManager::new(dec!(100.0), dec!(1.0), dec!(1000000.0));
195
196 let mut execution_engine = ExecutionEngine::new(risk_manager, event_sender, mock_exchange);
198
199 let order = create_test_order();
201 let result = execution_engine.process_order(order.clone()).await;
202
203 assert!(result.is_ok());
205
206 let calls = send_order_calls.lock().await;
208 assert_eq!(calls.len(), 1);
209 assert_eq!(calls[0].id, order.id);
210
211 if let Ok(event) = event_receiver.try_recv() {
213 match event {
214 ExecutionEvent::OrderAccepted(accepted_order) => {
215 assert_eq!(accepted_order.id, order.id);
216 }
217 _ => panic!("Expected OrderAccepted event"),
218 }
219 } else {
220 panic!("Expected an event");
221 }
222 }
223
224 #[tokio::test]
225 async fn test_process_order_exchange_failure() {
226 let error_message = "Exchange error".to_string();
228 let mock_exchange = MockExchange::new(true, error_message.clone());
229 let send_order_calls = mock_exchange.send_order_calls.clone();
230
231 let (event_sender, mut event_receiver) = mpsc::channel(10);
233
234 let risk_manager = RiskManager::new(dec!(100.0), dec!(1.0), dec!(1000000.0));
236
237 let mut execution_engine = ExecutionEngine::new(risk_manager, event_sender, mock_exchange);
239
240 let order = create_test_order();
242 let result = execution_engine.process_order(order.clone()).await;
243
244 assert!(result.is_ok());
246
247 let calls = send_order_calls.lock().await;
249 assert_eq!(calls.len(), 1);
250 assert_eq!(calls[0].id, order.id);
251
252 if let Ok(event) = event_receiver.try_recv() {
254 match event {
255 ExecutionEvent::OrderRejected(rejected_order, reason) => {
256 assert_eq!(rejected_order.id, order.id);
257 assert!(reason.contains(&error_message));
258 }
259 _ => panic!("Expected OrderRejected event"),
260 }
261 } else {
262 panic!("Expected an event");
263 }
264 }
265
266 #[tokio::test]
267 async fn test_process_order_risk_failure() {
268 let mock_exchange = MockExchange::new(false, std::string::String::new());
270
271 let (event_sender, mut event_receiver) = mpsc::channel(10);
273
274 let risk_manager = RiskManager::new(dec!(0.5), dec!(1.0), dec!(1000000.0));
276
277 let mut execution_engine = ExecutionEngine::new(risk_manager, event_sender, mock_exchange);
279
280 let order = create_test_order(); let result = execution_engine.process_order(order.clone()).await;
283
284 assert!(result.is_err());
286 match result {
287 Err(RiskError::QuantityTooLarge) => (),
288 _ => panic!("Expected QuantityTooLarge error"),
289 }
290
291 if let Ok(event) = event_receiver.try_recv() {
293 match event {
294 ExecutionEvent::OrderRejected(rejected_order, reason) => {
295 assert_eq!(rejected_order.id, order.id);
296 assert_eq!(reason, "Order quantity exceeds maximum allowed");
297 }
298 _ => panic!("Expected OrderRejected event"),
299 }
300 } else {
301 panic!("Expected an OrderRejected event");
302 }
303 }
304
305 #[tokio::test]
306 async fn test_process_order_price_risk_failure() {
307 let mock_exchange = MockExchange::new(false, std::string::String::new());
309
310 let (event_sender, mut event_receiver) = mpsc::channel(10);
312
313 let risk_manager = RiskManager::new(dec!(10.0), dec!(60000.0), dec!(100000.0));
315
316 let mut execution_engine = ExecutionEngine::new(risk_manager, event_sender, mock_exchange);
318
319 let order = Order::new(
321 Venue::Test,
322 "BTCUSDT",
323 OrderSide::Buy,
324 OrderType::Limit,
325 dec!(1.0),
326 Some(dec!(50000.0)), rusty_model::types::ClientId::new("test_client"),
328 );
329
330 let result = execution_engine.process_order(order.clone()).await;
331
332 assert!(result.is_err());
334 match result {
335 Err(RiskError::PriceOutOfRange) => (),
336 _ => panic!("Expected PriceOutOfRange error"),
337 }
338
339 if let Ok(event) = event_receiver.try_recv() {
341 match event {
342 ExecutionEvent::OrderRejected(rejected_order, reason) => {
343 assert_eq!(rejected_order.id, order.id);
344 assert_eq!(reason, "Order price outside allowed range");
345 }
346 _ => panic!("Expected OrderRejected event"),
347 }
348 } else {
349 panic!("Expected an OrderRejected event");
350 }
351 }
352}