1use rusty_common::collections::FxHashMap;
2use std::sync::Arc;
3
4use anyhow::Result;
5use async_trait::async_trait;
6use flume::Sender;
7use log;
8use parking_lot::RwLock;
9use quanta::Clock;
10use rust_decimal::Decimal;
11use rusty_common::SmartString;
12use rusty_common::utils::id_generation;
13use smallvec::SmallVec;
14
15use rusty_model::{
16 enums::{OrderStatus, OrderType},
17 instruments::InstrumentId,
18 trading_order::Order,
19 venues::Venue,
20};
21
22use crate::execution_engine::{Exchange, ExecutionReport};
23
24pub struct TestExchange {
26 orders: Arc<RwLock<FxHashMap<String, Order>>>,
28
29 clock: Clock,
31
32 is_connected: Arc<RwLock<bool>>,
34}
35
36impl Default for TestExchange {
37 fn default() -> Self {
38 Self::new()
39 }
40}
41
42impl TestExchange {
43 #[must_use]
45 pub fn new() -> Self {
46 Self {
47 orders: Arc::new(RwLock::new(FxHashMap::default())),
48 clock: Clock::new(),
49 is_connected: Arc::new(RwLock::new(false)),
50 }
51 }
52}
53
54#[async_trait]
55impl Exchange for TestExchange {
56 fn venue(&self) -> Venue {
57 Venue::Test
58 }
59
60 async fn place_order(
61 &self,
62 order: Order,
63 report_sender: Sender<ExecutionReport>,
64 ) -> Result<()> {
65 self.orders
67 .write()
68 .insert(order.id.to_string(), order.clone());
69
70 let ack_report = ExecutionReport {
72 id: id_generation::generate_acknowledgment_id(&order.id.to_string()),
73 order_id: order.id.to_string().into(),
74 exchange_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
75 system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
76 instrument_id: InstrumentId::new(order.symbol.clone(), order.venue),
77 status: OrderStatus::New,
78 filled_quantity: Decimal::ZERO,
79 remaining_quantity: order.quantity,
80 execution_price: None,
81 reject_reason: None,
82 exchange_execution_id: None,
83 is_final: false,
84 };
85
86 report_sender.send_async(ack_report).await?;
87
88 if order.order_type == OrderType::Market {
90 let fill_report = ExecutionReport {
92 id: id_generation::generate_fill_id(&order.id.to_string()),
93 order_id: order.id.to_string().into(),
94 exchange_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
95 system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
96 instrument_id: InstrumentId::new(order.symbol.clone(), order.venue),
97 status: OrderStatus::Filled,
98 filled_quantity: order.quantity,
99 remaining_quantity: Decimal::ZERO,
100 execution_price: order.price,
101 reject_reason: None,
102 exchange_execution_id: Some(id_generation::generate_execution_id(
103 &order.id.to_string(),
104 )),
105 is_final: true,
106 };
107
108 report_sender.send_async(fill_report).await?;
109
110 }
115
116 Ok(())
117 }
118
119 async fn cancel_order(
120 &self,
121 order_id: SmartString,
122 report_sender: Sender<ExecutionReport>,
123 ) -> Result<()> {
124 let order_exists;
125 let mut order_snapshot_for_report: Option<Order> = None;
126
127 {
128 let orders_guard = self.orders.read();
129 if let Some(o) = orders_guard.get(order_id.as_str()) {
130 order_exists = true;
131 order_snapshot_for_report = Some(o.clone());
132 } else {
133 order_exists = false;
134 }
135 }
137
138 if !order_exists {
139 let reject_report = ExecutionReport {
140 id: id_generation::generate_rejection_id(&format!("cancel_{order_id}")),
141 order_id: order_id.clone(),
142 exchange_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
143 system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
144 instrument_id: InstrumentId::new("UNKNOWN", Venue::Test),
145 status: OrderStatus::Rejected,
146 filled_quantity: Decimal::ZERO,
147 remaining_quantity: Decimal::ZERO,
148 execution_price: None,
149 reject_reason: Some("Order not found for cancellation".into()),
150 exchange_execution_id: None,
151 is_final: true,
152 };
153 report_sender.send_async(reject_report).await?;
154 return Ok(());
155 }
156
157 self.orders.write().remove(&order_id.to_string()); if let Some(order_ref) = order_snapshot_for_report {
161 let cancel_report = ExecutionReport {
162 id: id_generation::generate_cancel_id(&order_id),
163 order_id: order_id.clone(),
164 exchange_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
165 system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
166 instrument_id: InstrumentId::new(order_ref.symbol.clone(), order_ref.venue),
167 status: OrderStatus::Cancelled,
168 filled_quantity: Decimal::ZERO,
169 remaining_quantity: order_ref.quantity,
170 execution_price: None,
171 reject_reason: None,
172 exchange_execution_id: None,
173 is_final: true,
174 };
175 report_sender.send_async(cancel_report).await?;
176 }
177
178 Ok(())
179 }
180
181 async fn modify_order(
182 &self,
183 order_id: SmartString,
184 new_price: Option<Decimal>,
185 new_quantity: Option<Decimal>,
186 report_sender: Sender<ExecutionReport>,
187 ) -> Result<()> {
188 let mut order_to_modify_and_report: Option<Order> = None;
189 let mut order_found = false;
190
191 {
192 let mut orders_guard = self.orders.write(); if let Some(stored_order) = orders_guard.get_mut(&order_id.to_string()) {
194 if let Some(price) = new_price {
195 stored_order.price = Some(price);
196 }
197 if let Some(quantity) = new_quantity {
198 stored_order.quantity = quantity;
199 }
200 order_to_modify_and_report = Some(stored_order.clone());
201 order_found = true;
202 }
203 }
205
206 if !order_found {
207 let reject_report = ExecutionReport {
208 id: id_generation::generate_rejection_id(&format!("modify_{order_id}")),
209 order_id: order_id.clone(),
210 exchange_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
211 system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
212 instrument_id: InstrumentId::new("UNKNOWN", Venue::Test),
213 status: OrderStatus::Rejected,
214 filled_quantity: Decimal::ZERO,
215 remaining_quantity: Decimal::ZERO,
216 execution_price: None,
217 reject_reason: Some("Order not found for modification".into()),
218 exchange_execution_id: None,
219 is_final: true,
220 };
221 report_sender.send_async(reject_report).await?;
222 return Ok(());
223 }
224
225 if let Some(order_snapshot) = order_to_modify_and_report {
226 let ack_report = ExecutionReport {
227 id: id_generation::generate_acknowledgment_id(&format!("modify_{order_id}")),
228 order_id: order_id.clone(),
229 exchange_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
230 system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
231 instrument_id: InstrumentId::new(
232 order_snapshot.symbol.clone(),
233 order_snapshot.venue,
234 ),
235 status: OrderStatus::New, filled_quantity: Decimal::ZERO,
237 remaining_quantity: order_snapshot.quantity,
238 execution_price: None,
239 reject_reason: None,
240 exchange_execution_id: None,
241 is_final: false, };
243 report_sender.send_async(ack_report).await?;
244 }
245
246 Ok(())
247 }
248
249 async fn cancel_all_orders(
250 &self,
251 instrument_id_filter: Option<InstrumentId>,
252 report_sender: Sender<ExecutionReport>,
253 ) -> Result<()> {
254 let mut orders_to_cancel_ids = Vec::new();
255 let mut orders_snapshots_for_reporting = Vec::new();
256
257 {
258 let orders_guard = self.orders.read();
259 for (id, order) in orders_guard.iter() {
260 if instrument_id_filter.as_ref().is_none_or(|filter_id| {
262 filter_id == &InstrumentId::new(order.symbol.clone(), order.venue)
263 }) {
264 orders_to_cancel_ids.push(id.clone());
270 orders_snapshots_for_reporting.push(order.clone());
271 }
272 }
273 }
275
276 let mut orders_actually_removed_ids = Vec::new();
277 if !orders_to_cancel_ids.is_empty() {
278 let mut orders_write_guard = self.orders.write();
279 for id_to_remove in &orders_to_cancel_ids {
280 if orders_write_guard.remove(id_to_remove).is_some() {
281 orders_actually_removed_ids.push(id_to_remove.clone());
282 }
283 }
284 }
286
287 for order_snapshot in orders_snapshots_for_reporting {
288 if orders_to_cancel_ids.contains(&order_snapshot.id.to_string())
291 && orders_actually_removed_ids.contains(&order_snapshot.id.to_string())
292 {
293 let cancel_report = ExecutionReport {
294 id: id_generation::generate_cancel_id(&format!("all_{}", order_snapshot.id)),
295 order_id: order_snapshot.id.to_string().into(),
296 exchange_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
297 system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
298 instrument_id: InstrumentId::new(
299 order_snapshot.symbol.clone(),
300 order_snapshot.venue,
301 ),
302 status: OrderStatus::Cancelled,
303 filled_quantity: Decimal::ZERO, remaining_quantity: Decimal::ZERO, execution_price: None,
306 reject_reason: None,
307 exchange_execution_id: None,
308 is_final: true,
309 };
310 if let Err(e) = report_sender.send_async(cancel_report).await {
312 eprintln!(
313 "Failed to send cancel_all report for order {}: {}",
314 order_snapshot.id, e
315 );
316 }
317 }
318 }
319 Ok(())
320 }
321
322 async fn get_order_status(&self, order_id: &str) -> Result<OrderStatus> {
323 let orders_guard = self.orders.read();
324 if let Some(_order) = orders_guard.get(order_id) {
325 Ok(OrderStatus::New) } else {
333 Ok(OrderStatus::Rejected) }
335 }
336
337 async fn connect(&self, _report_sender: Sender<ExecutionReport>) -> Result<()> {
338 *self.is_connected.write() = true;
339 Ok(())
340 }
341
342 async fn disconnect(&self) -> Result<()> {
343 *self.is_connected.write() = false;
344 Ok(())
345 }
346
347 async fn is_connected(&self) -> bool {
348 *self.is_connected.read()
349 }
350
351 async fn get_instruments(&self) -> Result<SmallVec<[InstrumentId; 32]>> {
352 let instruments = {
353 let orders_guard = self.orders.read();
354 orders_guard
355 .values()
356 .map(|order| InstrumentId::new(order.symbol.clone(), order.venue))
357 .collect::<SmallVec<[InstrumentId; 32]>>()
358 };
360 Ok(instruments)
361 }
362
363 async fn send_fix_message(&self, message: Vec<u8>) -> Result<()> {
364 log::info!("TestExchange: Sending FIX message: {:?}", message);
365 Ok(())
366 }
367
368 async fn receive_fix_message(&self) -> Result<Vec<u8>> {
369 log::info!("TestExchange: Receiving FIX message (placeholder).");
370 Ok(Vec::new())
371 }
372}