rusty_ems/exchanges/
test.rs

1use rusty_common::collections::FxHashMap;
2use std::sync::Arc;
3
4use anyhow::Result;
5use async_trait::async_trait;
6use flume::Sender;
7use log;
8use parking_lot::RwLock;
9use quanta::Clock;
10use rust_decimal::Decimal;
11use rusty_common::SmartString;
12use rusty_common::utils::id_generation;
13use smallvec::SmallVec;
14
15use rusty_model::{
16    enums::{OrderStatus, OrderType},
17    instruments::InstrumentId,
18    trading_order::Order,
19    venues::Venue,
20};
21
22use crate::execution_engine::{Exchange, ExecutionReport};
23
24/// A test exchange implementation for testing and simulation
25pub struct TestExchange {
26    /// Active orders managed by this exchange
27    orders: Arc<RwLock<FxHashMap<String, Order>>>,
28
29    /// High-precision clock
30    clock: Clock,
31
32    /// Whether the exchange is connected
33    is_connected: Arc<RwLock<bool>>,
34}
35
36impl Default for TestExchange {
37    fn default() -> Self {
38        Self::new()
39    }
40}
41
42impl TestExchange {
43    /// Creates a new test exchange
44    #[must_use]
45    pub fn new() -> Self {
46        Self {
47            orders: Arc::new(RwLock::new(FxHashMap::default())),
48            clock: Clock::new(),
49            is_connected: Arc::new(RwLock::new(false)),
50        }
51    }
52}
53
54#[async_trait]
55impl Exchange for TestExchange {
56    fn venue(&self) -> Venue {
57        Venue::Test
58    }
59
60    async fn place_order(
61        &self,
62        order: Order,
63        report_sender: Sender<ExecutionReport>,
64    ) -> Result<()> {
65        // Store the order
66        self.orders
67            .write()
68            .insert(order.id.to_string(), order.clone());
69
70        // Send an acknowledgment report
71        let ack_report = ExecutionReport {
72            id: id_generation::generate_acknowledgment_id(&order.id.to_string()),
73            order_id: order.id.to_string().into(),
74            exchange_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
75            system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
76            instrument_id: InstrumentId::new(order.symbol.clone(), order.venue),
77            status: OrderStatus::New,
78            filled_quantity: Decimal::ZERO,
79            remaining_quantity: order.quantity,
80            execution_price: None,
81            reject_reason: None,
82            exchange_execution_id: None,
83            is_final: false,
84        };
85
86        report_sender.send_async(ack_report).await?;
87
88        // For test purposes, immediately fill market orders
89        if order.order_type == OrderType::Market {
90            // Send a fill report
91            let fill_report = ExecutionReport {
92                id: id_generation::generate_fill_id(&order.id.to_string()),
93                order_id: order.id.to_string().into(),
94                exchange_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
95                system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
96                instrument_id: InstrumentId::new(order.symbol.clone(), order.venue),
97                status: OrderStatus::Filled,
98                filled_quantity: order.quantity,
99                remaining_quantity: Decimal::ZERO,
100                execution_price: order.price,
101                reject_reason: None,
102                exchange_execution_id: Some(id_generation::generate_execution_id(
103                    &order.id.to_string(),
104                )),
105                is_final: true,
106            };
107
108            report_sender.send_async(fill_report).await?;
109
110            // Update order status in storage
111            // We don't currently track status in Order directly
112            // Status is inferred from ExecutionReport flow
113            // TODO: Revisit how TestExchange tracks mutable status of Order if necessary
114        }
115
116        Ok(())
117    }
118
119    async fn cancel_order(
120        &self,
121        order_id: SmartString,
122        report_sender: Sender<ExecutionReport>,
123    ) -> Result<()> {
124        let order_exists;
125        let mut order_snapshot_for_report: Option<Order> = None;
126
127        {
128            let orders_guard = self.orders.read();
129            if let Some(o) = orders_guard.get(order_id.as_str()) {
130                order_exists = true;
131                order_snapshot_for_report = Some(o.clone());
132            } else {
133                order_exists = false;
134            }
135            // orders_guard is dropped here
136        }
137
138        if !order_exists {
139            let reject_report = ExecutionReport {
140                id: id_generation::generate_rejection_id(&format!("cancel_{order_id}")),
141                order_id: order_id.clone(),
142                exchange_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
143                system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
144                instrument_id: InstrumentId::new("UNKNOWN", Venue::Test),
145                status: OrderStatus::Rejected,
146                filled_quantity: Decimal::ZERO,
147                remaining_quantity: Decimal::ZERO,
148                execution_price: None,
149                reject_reason: Some("Order not found for cancellation".into()),
150                exchange_execution_id: None,
151                is_final: true,
152            };
153            report_sender.send_async(reject_report).await?;
154            return Ok(());
155        }
156
157        // Process cancellation if order exists
158        self.orders.write().remove(&order_id.to_string()); // Remove the order
159
160        if let Some(order_ref) = order_snapshot_for_report {
161            let cancel_report = ExecutionReport {
162                id: id_generation::generate_cancel_id(&order_id),
163                order_id: order_id.clone(),
164                exchange_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
165                system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
166                instrument_id: InstrumentId::new(order_ref.symbol.clone(), order_ref.venue),
167                status: OrderStatus::Cancelled,
168                filled_quantity: Decimal::ZERO,
169                remaining_quantity: order_ref.quantity,
170                execution_price: None,
171                reject_reason: None,
172                exchange_execution_id: None,
173                is_final: true,
174            };
175            report_sender.send_async(cancel_report).await?;
176        }
177
178        Ok(())
179    }
180
181    async fn modify_order(
182        &self,
183        order_id: SmartString,
184        new_price: Option<Decimal>,
185        new_quantity: Option<Decimal>,
186        report_sender: Sender<ExecutionReport>,
187    ) -> Result<()> {
188        let mut order_to_modify_and_report: Option<Order> = None;
189        let mut order_found = false;
190
191        {
192            let mut orders_guard = self.orders.write(); // Need write lock to modify
193            if let Some(stored_order) = orders_guard.get_mut(&order_id.to_string()) {
194                if let Some(price) = new_price {
195                    stored_order.price = Some(price);
196                }
197                if let Some(quantity) = new_quantity {
198                    stored_order.quantity = quantity;
199                }
200                order_to_modify_and_report = Some(stored_order.clone());
201                order_found = true;
202            }
203            // orders_guard is dropped here
204        }
205
206        if !order_found {
207            let reject_report = ExecutionReport {
208                id: id_generation::generate_rejection_id(&format!("modify_{order_id}")),
209                order_id: order_id.clone(),
210                exchange_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
211                system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
212                instrument_id: InstrumentId::new("UNKNOWN", Venue::Test),
213                status: OrderStatus::Rejected,
214                filled_quantity: Decimal::ZERO,
215                remaining_quantity: Decimal::ZERO,
216                execution_price: None,
217                reject_reason: Some("Order not found for modification".into()),
218                exchange_execution_id: None,
219                is_final: true,
220            };
221            report_sender.send_async(reject_report).await?;
222            return Ok(());
223        }
224
225        if let Some(order_snapshot) = order_to_modify_and_report {
226            let ack_report = ExecutionReport {
227                id: id_generation::generate_acknowledgment_id(&format!("modify_{order_id}")),
228                order_id: order_id.clone(),
229                exchange_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
230                system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
231                instrument_id: InstrumentId::new(
232                    order_snapshot.symbol.clone(),
233                    order_snapshot.venue,
234                ),
235                status: OrderStatus::New, // Or Replaced if such status is added
236                filled_quantity: Decimal::ZERO,
237                remaining_quantity: order_snapshot.quantity,
238                execution_price: None,
239                reject_reason: None,
240                exchange_execution_id: None,
241                is_final: false, // Typically a modify ack is not final
242            };
243            report_sender.send_async(ack_report).await?;
244        }
245
246        Ok(())
247    }
248
249    async fn cancel_all_orders(
250        &self,
251        instrument_id_filter: Option<InstrumentId>,
252        report_sender: Sender<ExecutionReport>,
253    ) -> Result<()> {
254        let mut orders_to_cancel_ids = Vec::new();
255        let mut orders_snapshots_for_reporting = Vec::new();
256
257        {
258            let orders_guard = self.orders.read();
259            for (id, order) in orders_guard.iter() {
260                // Filter by instrument_id if provided
261                if instrument_id_filter.as_ref().is_none_or(|filter_id| {
262                    filter_id == &InstrumentId::new(order.symbol.clone(), order.venue)
263                }) {
264                    // And if order is in a cancellable state (e.g., not already Filled/Cancelled/Rejected)
265                    // For TestExchange, we assume any order present in the map and matching instrument is cancellable.
266                    // A more robust check would involve checking a status field if Order had one,
267                    // or by cross-referencing with past execution reports.
268                    // Here, we simplify: if it's in the map and matches instrument, it's considered for cancellation.
269                    orders_to_cancel_ids.push(id.clone());
270                    orders_snapshots_for_reporting.push(order.clone());
271                }
272            }
273            // orders_guard is dropped here
274        }
275
276        let mut orders_actually_removed_ids = Vec::new();
277        if !orders_to_cancel_ids.is_empty() {
278            let mut orders_write_guard = self.orders.write();
279            for id_to_remove in &orders_to_cancel_ids {
280                if orders_write_guard.remove(id_to_remove).is_some() {
281                    orders_actually_removed_ids.push(id_to_remove.clone());
282                }
283            }
284            // orders_write_guard is dropped here
285        }
286
287        for order_snapshot in orders_snapshots_for_reporting {
288            // Only send cancel report if it was actually found and intended for cancellation initially
289            // And further, ensure it was part of the batch successfully removed.
290            if orders_to_cancel_ids.contains(&order_snapshot.id.to_string())
291                && orders_actually_removed_ids.contains(&order_snapshot.id.to_string())
292            {
293                let cancel_report = ExecutionReport {
294                    id: id_generation::generate_cancel_id(&format!("all_{}", order_snapshot.id)),
295                    order_id: order_snapshot.id.to_string().into(),
296                    exchange_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
297                    system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
298                    instrument_id: InstrumentId::new(
299                        order_snapshot.symbol.clone(),
300                        order_snapshot.venue,
301                    ),
302                    status: OrderStatus::Cancelled,
303                    filled_quantity: Decimal::ZERO, // Assuming full cancel
304                    remaining_quantity: Decimal::ZERO, // Assuming full cancel means 0 remaining from this order
305                    execution_price: None,
306                    reject_reason: None,
307                    exchange_execution_id: None,
308                    is_final: true,
309                };
310                // Need to handle potential send error if OMS is no longer listening
311                if let Err(e) = report_sender.send_async(cancel_report).await {
312                    eprintln!(
313                        "Failed to send cancel_all report for order {}: {}",
314                        order_snapshot.id, e
315                    );
316                }
317            }
318        }
319        Ok(())
320    }
321
322    async fn get_order_status(&self, order_id: &str) -> Result<OrderStatus> {
323        let orders_guard = self.orders.read();
324        if let Some(_order) = orders_guard.get(order_id) {
325            // _order is Order
326            // TestExchange doesn't explicitly store OrderStatus per order in its map.
327            // It sends reports. For a get_order_status, it might need to infer or simulate.
328            // Returning a default or looking up the last sent report status would be more complex.
329            // For now, if order exists, assume it's New (if not filled/cancelled by other logic).
330            // This is a simplification for TestExchange.
331            Ok(OrderStatus::New) // Simplified: if it exists, it's New. A real exchange would have definite states.
332        } else {
333            Ok(OrderStatus::Rejected) // Or an error like NotFound
334        }
335    }
336
337    async fn connect(&self, _report_sender: Sender<ExecutionReport>) -> Result<()> {
338        *self.is_connected.write() = true;
339        Ok(())
340    }
341
342    async fn disconnect(&self) -> Result<()> {
343        *self.is_connected.write() = false;
344        Ok(())
345    }
346
347    async fn is_connected(&self) -> bool {
348        *self.is_connected.read()
349    }
350
351    async fn get_instruments(&self) -> Result<SmallVec<[InstrumentId; 32]>> {
352        let instruments = {
353            let orders_guard = self.orders.read();
354            orders_guard
355                .values()
356                .map(|order| InstrumentId::new(order.symbol.clone(), order.venue))
357                .collect::<SmallVec<[InstrumentId; 32]>>()
358            // orders_guard is dropped here
359        };
360        Ok(instruments)
361    }
362
363    async fn send_fix_message(&self, message: Vec<u8>) -> Result<()> {
364        log::info!("TestExchange: Sending FIX message: {:?}", message);
365        Ok(())
366    }
367
368    async fn receive_fix_message(&self) -> Result<Vec<u8>> {
369        log::info!("TestExchange: Receiving FIX message (placeholder).");
370        Ok(Vec::new())
371    }
372}