rusty_ems/exchanges/
test_optimized.rs

1use rusty_common::collections::FxHashMap;
2use std::sync::Arc;
3
4use anyhow::Result;
5use async_trait::async_trait;
6use flume::Sender;
7use parking_lot::RwLock;
8use quanta::Clock;
9use rust_decimal::Decimal;
10use rusty_common::SmartString;
11use rusty_common::utils::id_generation;
12use smallvec::SmallVec;
13
14use rusty_model::{
15    enums::{OrderStatus, OrderType},
16    instruments::InstrumentId,
17    trading_order::Order,
18    venues::Venue,
19};
20
21use crate::execution_engine::{Exchange, ExecutionReport};
22
23/// A test exchange implementation for testing and simulation
24/// that implements the optimized Exchange trait
25pub struct TestExchange {
26    /// Active orders managed by this exchange
27    orders: Arc<RwLock<FxHashMap<String, Order>>>,
28
29    /// High-precision clock
30    clock: Clock,
31
32    /// Whether the exchange is connected
33    is_connected: Arc<RwLock<bool>>,
34}
35
36impl Default for TestExchange {
37    fn default() -> Self {
38        Self::new()
39    }
40}
41
42impl TestExchange {
43    /// Creates a new test exchange
44    #[must_use]
45    pub fn new() -> Self {
46        Self {
47            orders: Arc::new(RwLock::new(FxHashMap::default())),
48            clock: Clock::new(),
49            is_connected: Arc::new(RwLock::new(false)),
50        }
51    }
52}
53
54#[async_trait]
55impl Exchange for TestExchange {
56    fn venue(&self) -> Venue {
57        Venue::Test
58    }
59
60    async fn place_order(&self, order: Order, report_tx: Sender<ExecutionReport>) -> Result<()> {
61        // Store the order
62        self.orders
63            .write()
64            .insert(order.id.to_string(), order.clone());
65
66        // Send an acknowledgment report
67        let ack_report = ExecutionReport {
68            id: id_generation::generate_acknowledgment_id(&order.id.to_string()),
69            order_id: order.id.to_string().into(),
70            exchange_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
71            system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
72            instrument_id: InstrumentId::new(order.symbol.clone(), order.venue),
73            status: OrderStatus::New,
74            filled_quantity: Decimal::ZERO,
75            remaining_quantity: order.quantity,
76            execution_price: None,
77            reject_reason: None,
78            exchange_execution_id: None,
79            is_final: false,
80        };
81
82        report_tx.send(ack_report)?;
83
84        // For test purposes, immediately fill market orders
85        if order.order_type == OrderType::Market {
86            // Send a fill report
87            let fill_report = ExecutionReport {
88                id: id_generation::generate_fill_id(&order.id.to_string()),
89                order_id: order.id.to_string().into(),
90                exchange_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
91                system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
92                instrument_id: InstrumentId::new(order.symbol.clone(), order.venue),
93                status: OrderStatus::Filled,
94                filled_quantity: order.quantity,
95                remaining_quantity: Decimal::ZERO,
96                execution_price: order.price,
97                reject_reason: None,
98                exchange_execution_id: Some(id_generation::generate_execution_id(
99                    &order.id.to_string(),
100                )),
101                is_final: true,
102            };
103
104            report_tx.send(fill_report)?;
105
106            // Update order status in storage
107            // We don't currently track status in Order directly
108            // Status is inferred from ExecutionReport flow
109        }
110
111        Ok(())
112    }
113
114    async fn cancel_order(
115        &self,
116        order_id: SmartString,
117        report_tx: Sender<ExecutionReport>,
118    ) -> Result<()> {
119        let order_exists;
120        let mut order_snapshot_for_report: Option<Order> = None;
121
122        {
123            let orders_guard = self.orders.read();
124            if let Some(order) = orders_guard.get(order_id.as_str()) {
125                order_exists = true;
126                order_snapshot_for_report = Some(order.clone());
127            } else {
128                order_exists = false;
129            }
130            // orders_guard is dropped here
131        }
132
133        if !order_exists {
134            let reject_report = ExecutionReport {
135                id: id_generation::generate_rejection_id(&format!("cancel_{order_id}")),
136                order_id,
137                exchange_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
138                system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
139                instrument_id: InstrumentId::new("UNKNOWN", Venue::Test),
140                status: OrderStatus::Rejected,
141                filled_quantity: Decimal::ZERO,
142                remaining_quantity: Decimal::ZERO,
143                execution_price: None,
144                reject_reason: Some("Order not found for cancellation".into()),
145                exchange_execution_id: None,
146                is_final: true,
147            };
148            report_tx.send(reject_report)?;
149            return Ok(());
150        }
151
152        // Process cancellation if order exists
153        self.orders.write().remove(&order_id.to_string()); // Remove the order
154
155        if let Some(order_ref) = order_snapshot_for_report {
156            let cancel_report = ExecutionReport {
157                id: id_generation::generate_cancel_id(&order_id),
158                order_id,
159                exchange_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
160                system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
161                instrument_id: InstrumentId::new(order_ref.symbol.clone(), order_ref.venue),
162                status: OrderStatus::Cancelled,
163                filled_quantity: Decimal::ZERO,
164                remaining_quantity: order_ref.quantity,
165                execution_price: None,
166                reject_reason: None,
167                exchange_execution_id: None,
168                is_final: true,
169            };
170            report_tx.send(cancel_report)?;
171        }
172
173        Ok(())
174    }
175
176    async fn modify_order(
177        &self,
178        order_id: SmartString,
179        new_price: Option<Decimal>,
180        new_quantity: Option<Decimal>,
181        report_tx: Sender<ExecutionReport>,
182    ) -> Result<()> {
183        let mut order_to_modify_and_report: Option<Order> = None;
184        let mut order_found = false;
185
186        {
187            let mut orders_guard = self.orders.write(); // Need write lock to modify
188            if let Some(stored_order) = orders_guard.get_mut(&order_id.to_string()) {
189                if let Some(price) = new_price {
190                    stored_order.price = Some(price);
191                }
192                if let Some(quantity) = new_quantity {
193                    stored_order.quantity = quantity;
194                }
195                order_to_modify_and_report = Some(stored_order.clone());
196                order_found = true;
197            }
198            // orders_guard is dropped here
199        }
200
201        if !order_found {
202            let reject_report = ExecutionReport {
203                id: id_generation::generate_rejection_id(&format!("modify_{order_id}")),
204                order_id,
205                exchange_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
206                system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
207                instrument_id: InstrumentId::new("UNKNOWN", Venue::Test),
208                status: OrderStatus::Rejected,
209                filled_quantity: Decimal::ZERO,
210                remaining_quantity: Decimal::ZERO,
211                execution_price: None,
212                reject_reason: Some("Order not found for modification".into()),
213                exchange_execution_id: None,
214                is_final: true,
215            };
216            report_tx.send(reject_report)?;
217            return Ok(());
218        }
219
220        if let Some(order_snapshot) = order_to_modify_and_report {
221            let ack_report = ExecutionReport {
222                id: id_generation::generate_report_id("modify", &order_id),
223                order_id,
224                exchange_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
225                system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
226                instrument_id: InstrumentId::new(
227                    order_snapshot.symbol.clone(),
228                    order_snapshot.venue,
229                ),
230                status: OrderStatus::Open, // Modified orders are now open
231                filled_quantity: Decimal::ZERO,
232                remaining_quantity: order_snapshot.quantity,
233                execution_price: None,
234                reject_reason: None,
235                exchange_execution_id: None,
236                is_final: false,
237            };
238            report_tx.send(ack_report)?;
239        }
240
241        Ok(())
242    }
243
244    async fn cancel_all_orders(
245        &self,
246        instrument_id_filter: Option<InstrumentId>,
247        report_tx: Sender<ExecutionReport>,
248    ) -> Result<()> {
249        let mut orders_to_cancel_ids = Vec::new();
250        let mut orders_snapshots_for_reporting = Vec::new();
251
252        {
253            let orders_guard = self.orders.read();
254            for (id, order) in orders_guard.iter() {
255                // Filter by instrument_id if provided
256                if instrument_id_filter.as_ref().is_none_or(|filter_id| {
257                    filter_id == &InstrumentId::new(order.symbol.clone(), order.venue)
258                }) {
259                    orders_to_cancel_ids.push(id.clone());
260                    orders_snapshots_for_reporting.push(order.clone());
261                }
262            }
263            // orders_guard is dropped here
264        }
265
266        let mut orders_actually_removed_ids = Vec::new();
267        if !orders_to_cancel_ids.is_empty() {
268            let mut orders_write_guard = self.orders.write();
269            for id_to_remove in &orders_to_cancel_ids {
270                if orders_write_guard.remove(id_to_remove).is_some() {
271                    orders_actually_removed_ids.push(id_to_remove.clone());
272                }
273            }
274            // orders_write_guard is dropped here
275        }
276
277        for order_snapshot in orders_snapshots_for_reporting {
278            // Only send cancel report if it was actually found and intended for cancellation initially
279            // And further, ensure it was part of the batch successfully removed.
280            if orders_to_cancel_ids.contains(&order_snapshot.id.to_string())
281                && orders_actually_removed_ids.contains(&order_snapshot.id.to_string())
282            {
283                let cancel_report = ExecutionReport {
284                    id: id_generation::generate_cancel_id(&format!("all_{}", order_snapshot.id)),
285                    order_id: order_snapshot.id.to_string().into(),
286                    exchange_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
287                    system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
288                    instrument_id: InstrumentId::new(
289                        order_snapshot.symbol.clone(),
290                        order_snapshot.venue,
291                    ),
292                    status: OrderStatus::Cancelled,
293                    filled_quantity: Decimal::ZERO,
294                    remaining_quantity: Decimal::ZERO,
295                    execution_price: None,
296                    reject_reason: None,
297                    exchange_execution_id: None,
298                    is_final: true,
299                };
300                // Handle potential send error
301                if let Err(e) = report_tx.send(cancel_report) {
302                    eprintln!(
303                        "Failed to send cancel_all report for order {}: {}",
304                        order_snapshot.id, e
305                    );
306                }
307            }
308        }
309        Ok(())
310    }
311
312    async fn get_order_status(&self, order_id: &str) -> Result<OrderStatus> {
313        let orders_guard = self.orders.read();
314        if let Some(_order) = orders_guard.get(order_id) {
315            // TestExchange doesn't explicitly store OrderStatus per order in its map.
316            // It sends reports. For a get_order_status, it might need to infer or simulate.
317            // For now, if order exists, assume it's New (if not filled/cancelled by other logic).
318            // This is a simplification for TestExchange.
319            Ok(OrderStatus::New)
320        } else {
321            Ok(OrderStatus::Rejected) // Or an error like NotFound
322        }
323    }
324
325    async fn connect(&self, _report_sender: Sender<ExecutionReport>) -> Result<()> {
326        *self.is_connected.write() = true;
327        Ok(())
328    }
329
330    async fn disconnect(&self) -> Result<()> {
331        *self.is_connected.write() = false;
332        Ok(())
333    }
334
335    async fn is_connected(&self) -> bool {
336        *self.is_connected.read()
337    }
338
339    async fn get_instruments(&self) -> Result<SmallVec<[InstrumentId; 32]>> {
340        let instruments = {
341            let orders_guard = self.orders.read();
342            let mut instruments = SmallVec::new();
343            for order in orders_guard.values() {
344                if !instruments.iter().any(|i: &InstrumentId| {
345                    i == &InstrumentId::new(order.symbol.clone(), order.venue)
346                }) {
347                    instruments.push(InstrumentId::new(order.symbol.clone(), order.venue));
348                }
349            }
350            instruments
351        };
352        Ok(instruments)
353    }
354
355    async fn send_fix_message(&self, _message: Vec<u8>) -> Result<()> {
356        // Test exchange doesn't support FIX protocol
357        anyhow::bail!("FIX protocol not supported in test exchange")
358    }
359
360    async fn receive_fix_message(&self) -> Result<Vec<u8>> {
361        // Test exchange doesn't support FIX protocol
362        anyhow::bail!("FIX protocol not supported in test exchange")
363    }
364}