1use rusty_common::collections::FxHashMap;
2use std::sync::Arc;
3
4use anyhow::Result;
5use async_trait::async_trait;
6use flume::Sender;
7use parking_lot::RwLock;
8use quanta::Clock;
9use rust_decimal::Decimal;
10use rusty_common::SmartString;
11use rusty_common::utils::id_generation;
12use smallvec::SmallVec;
13
14use rusty_model::{
15 enums::{OrderStatus, OrderType},
16 instruments::InstrumentId,
17 trading_order::Order,
18 venues::Venue,
19};
20
21use crate::execution_engine::{Exchange, ExecutionReport};
22
23pub struct TestExchange {
26 orders: Arc<RwLock<FxHashMap<String, Order>>>,
28
29 clock: Clock,
31
32 is_connected: Arc<RwLock<bool>>,
34}
35
36impl Default for TestExchange {
37 fn default() -> Self {
38 Self::new()
39 }
40}
41
42impl TestExchange {
43 #[must_use]
45 pub fn new() -> Self {
46 Self {
47 orders: Arc::new(RwLock::new(FxHashMap::default())),
48 clock: Clock::new(),
49 is_connected: Arc::new(RwLock::new(false)),
50 }
51 }
52}
53
54#[async_trait]
55impl Exchange for TestExchange {
56 fn venue(&self) -> Venue {
57 Venue::Test
58 }
59
60 async fn place_order(&self, order: Order, report_tx: Sender<ExecutionReport>) -> Result<()> {
61 self.orders
63 .write()
64 .insert(order.id.to_string(), order.clone());
65
66 let ack_report = ExecutionReport {
68 id: id_generation::generate_acknowledgment_id(&order.id.to_string()),
69 order_id: order.id.to_string().into(),
70 exchange_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
71 system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
72 instrument_id: InstrumentId::new(order.symbol.clone(), order.venue),
73 status: OrderStatus::New,
74 filled_quantity: Decimal::ZERO,
75 remaining_quantity: order.quantity,
76 execution_price: None,
77 reject_reason: None,
78 exchange_execution_id: None,
79 is_final: false,
80 };
81
82 report_tx.send(ack_report)?;
83
84 if order.order_type == OrderType::Market {
86 let fill_report = ExecutionReport {
88 id: id_generation::generate_fill_id(&order.id.to_string()),
89 order_id: order.id.to_string().into(),
90 exchange_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
91 system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
92 instrument_id: InstrumentId::new(order.symbol.clone(), order.venue),
93 status: OrderStatus::Filled,
94 filled_quantity: order.quantity,
95 remaining_quantity: Decimal::ZERO,
96 execution_price: order.price,
97 reject_reason: None,
98 exchange_execution_id: Some(id_generation::generate_execution_id(
99 &order.id.to_string(),
100 )),
101 is_final: true,
102 };
103
104 report_tx.send(fill_report)?;
105
106 }
110
111 Ok(())
112 }
113
114 async fn cancel_order(
115 &self,
116 order_id: SmartString,
117 report_tx: Sender<ExecutionReport>,
118 ) -> Result<()> {
119 let order_exists;
120 let mut order_snapshot_for_report: Option<Order> = None;
121
122 {
123 let orders_guard = self.orders.read();
124 if let Some(order) = orders_guard.get(order_id.as_str()) {
125 order_exists = true;
126 order_snapshot_for_report = Some(order.clone());
127 } else {
128 order_exists = false;
129 }
130 }
132
133 if !order_exists {
134 let reject_report = ExecutionReport {
135 id: id_generation::generate_rejection_id(&format!("cancel_{order_id}")),
136 order_id,
137 exchange_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
138 system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
139 instrument_id: InstrumentId::new("UNKNOWN", Venue::Test),
140 status: OrderStatus::Rejected,
141 filled_quantity: Decimal::ZERO,
142 remaining_quantity: Decimal::ZERO,
143 execution_price: None,
144 reject_reason: Some("Order not found for cancellation".into()),
145 exchange_execution_id: None,
146 is_final: true,
147 };
148 report_tx.send(reject_report)?;
149 return Ok(());
150 }
151
152 self.orders.write().remove(&order_id.to_string()); if let Some(order_ref) = order_snapshot_for_report {
156 let cancel_report = ExecutionReport {
157 id: id_generation::generate_cancel_id(&order_id),
158 order_id,
159 exchange_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
160 system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
161 instrument_id: InstrumentId::new(order_ref.symbol.clone(), order_ref.venue),
162 status: OrderStatus::Cancelled,
163 filled_quantity: Decimal::ZERO,
164 remaining_quantity: order_ref.quantity,
165 execution_price: None,
166 reject_reason: None,
167 exchange_execution_id: None,
168 is_final: true,
169 };
170 report_tx.send(cancel_report)?;
171 }
172
173 Ok(())
174 }
175
176 async fn modify_order(
177 &self,
178 order_id: SmartString,
179 new_price: Option<Decimal>,
180 new_quantity: Option<Decimal>,
181 report_tx: Sender<ExecutionReport>,
182 ) -> Result<()> {
183 let mut order_to_modify_and_report: Option<Order> = None;
184 let mut order_found = false;
185
186 {
187 let mut orders_guard = self.orders.write(); if let Some(stored_order) = orders_guard.get_mut(&order_id.to_string()) {
189 if let Some(price) = new_price {
190 stored_order.price = Some(price);
191 }
192 if let Some(quantity) = new_quantity {
193 stored_order.quantity = quantity;
194 }
195 order_to_modify_and_report = Some(stored_order.clone());
196 order_found = true;
197 }
198 }
200
201 if !order_found {
202 let reject_report = ExecutionReport {
203 id: id_generation::generate_rejection_id(&format!("modify_{order_id}")),
204 order_id,
205 exchange_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
206 system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
207 instrument_id: InstrumentId::new("UNKNOWN", Venue::Test),
208 status: OrderStatus::Rejected,
209 filled_quantity: Decimal::ZERO,
210 remaining_quantity: Decimal::ZERO,
211 execution_price: None,
212 reject_reason: Some("Order not found for modification".into()),
213 exchange_execution_id: None,
214 is_final: true,
215 };
216 report_tx.send(reject_report)?;
217 return Ok(());
218 }
219
220 if let Some(order_snapshot) = order_to_modify_and_report {
221 let ack_report = ExecutionReport {
222 id: id_generation::generate_report_id("modify", &order_id),
223 order_id,
224 exchange_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
225 system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
226 instrument_id: InstrumentId::new(
227 order_snapshot.symbol.clone(),
228 order_snapshot.venue,
229 ),
230 status: OrderStatus::Open, filled_quantity: Decimal::ZERO,
232 remaining_quantity: order_snapshot.quantity,
233 execution_price: None,
234 reject_reason: None,
235 exchange_execution_id: None,
236 is_final: false,
237 };
238 report_tx.send(ack_report)?;
239 }
240
241 Ok(())
242 }
243
244 async fn cancel_all_orders(
245 &self,
246 instrument_id_filter: Option<InstrumentId>,
247 report_tx: Sender<ExecutionReport>,
248 ) -> Result<()> {
249 let mut orders_to_cancel_ids = Vec::new();
250 let mut orders_snapshots_for_reporting = Vec::new();
251
252 {
253 let orders_guard = self.orders.read();
254 for (id, order) in orders_guard.iter() {
255 if instrument_id_filter.as_ref().is_none_or(|filter_id| {
257 filter_id == &InstrumentId::new(order.symbol.clone(), order.venue)
258 }) {
259 orders_to_cancel_ids.push(id.clone());
260 orders_snapshots_for_reporting.push(order.clone());
261 }
262 }
263 }
265
266 let mut orders_actually_removed_ids = Vec::new();
267 if !orders_to_cancel_ids.is_empty() {
268 let mut orders_write_guard = self.orders.write();
269 for id_to_remove in &orders_to_cancel_ids {
270 if orders_write_guard.remove(id_to_remove).is_some() {
271 orders_actually_removed_ids.push(id_to_remove.clone());
272 }
273 }
274 }
276
277 for order_snapshot in orders_snapshots_for_reporting {
278 if orders_to_cancel_ids.contains(&order_snapshot.id.to_string())
281 && orders_actually_removed_ids.contains(&order_snapshot.id.to_string())
282 {
283 let cancel_report = ExecutionReport {
284 id: id_generation::generate_cancel_id(&format!("all_{}", order_snapshot.id)),
285 order_id: order_snapshot.id.to_string().into(),
286 exchange_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
287 system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
288 instrument_id: InstrumentId::new(
289 order_snapshot.symbol.clone(),
290 order_snapshot.venue,
291 ),
292 status: OrderStatus::Cancelled,
293 filled_quantity: Decimal::ZERO,
294 remaining_quantity: Decimal::ZERO,
295 execution_price: None,
296 reject_reason: None,
297 exchange_execution_id: None,
298 is_final: true,
299 };
300 if let Err(e) = report_tx.send(cancel_report) {
302 eprintln!(
303 "Failed to send cancel_all report for order {}: {}",
304 order_snapshot.id, e
305 );
306 }
307 }
308 }
309 Ok(())
310 }
311
312 async fn get_order_status(&self, order_id: &str) -> Result<OrderStatus> {
313 let orders_guard = self.orders.read();
314 if let Some(_order) = orders_guard.get(order_id) {
315 Ok(OrderStatus::New)
320 } else {
321 Ok(OrderStatus::Rejected) }
323 }
324
325 async fn connect(&self, _report_sender: Sender<ExecutionReport>) -> Result<()> {
326 *self.is_connected.write() = true;
327 Ok(())
328 }
329
330 async fn disconnect(&self) -> Result<()> {
331 *self.is_connected.write() = false;
332 Ok(())
333 }
334
335 async fn is_connected(&self) -> bool {
336 *self.is_connected.read()
337 }
338
339 async fn get_instruments(&self) -> Result<SmallVec<[InstrumentId; 32]>> {
340 let instruments = {
341 let orders_guard = self.orders.read();
342 let mut instruments = SmallVec::new();
343 for order in orders_guard.values() {
344 if !instruments.iter().any(|i: &InstrumentId| {
345 i == &InstrumentId::new(order.symbol.clone(), order.venue)
346 }) {
347 instruments.push(InstrumentId::new(order.symbol.clone(), order.venue));
348 }
349 }
350 instruments
351 };
352 Ok(instruments)
353 }
354
355 async fn send_fix_message(&self, _message: Vec<u8>) -> Result<()> {
356 anyhow::bail!("FIX protocol not supported in test exchange")
358 }
359
360 async fn receive_fix_message(&self) -> Result<Vec<u8>> {
361 anyhow::bail!("FIX protocol not supported in test exchange")
363 }
364}