rusty_ems/execution_engine.rs
1use rusty_common::SmartString;
2use rusty_common::collections::FxHashMap;
3use rusty_common::zerocopy::{BufferedStringOps, ZeroCopyStringUtils};
4use std::sync::Arc;
5
6use anyhow::{Result, anyhow};
7use async_trait::async_trait;
8use dashmap::DashMap;
9use flume::{Receiver, Sender};
10use quanta::Clock;
11use rust_decimal::Decimal;
12use smallvec::SmallVec;
13
14use rusty_model::{
15 enums::OrderStatus, instruments::InstrumentId, trading_order::Order, venues::Venue,
16};
17
18/// Events related to order execution
19#[derive(Debug, Clone)]
20pub enum ExecutionEvent {
21 /// Order accepted by the exchange
22 OrderAccepted(Order),
23
24 /// Request to cancel an existing order
25 CancelOrder {
26 /// Order ID to cancel
27 order_id: SmartString,
28
29 /// Original order details
30 original_order: Order,
31 },
32
33 /// Request to modify an existing order
34 ModifyOrder {
35 /// Order ID to modify
36 order_id: SmartString,
37
38 /// Original order details
39 original_order: Order,
40
41 /// New price (if modifying price)
42 new_price: Option<Decimal>,
43
44 /// New quantity (if modifying quantity)
45 new_quantity: Option<Decimal>,
46 },
47
48 /// Request to cancel all open orders for a specific instrument
49 CancelAllOrders(InstrumentId),
50
51 /// Request to cancel all open orders across all instruments
52 CancelAllOrdersGlobally,
53}
54
55/// Execution report from the exchange
56#[derive(Debug, Clone)]
57pub struct ExecutionReport {
58 /// Unique identifier for this execution report
59 pub id: SmartString,
60
61 /// Order ID this report is for
62 pub order_id: SmartString,
63
64 /// Exchange timestamp (in nanoseconds)
65 pub exchange_timestamp: u64,
66
67 /// System timestamp (in nanoseconds)
68 pub system_timestamp: u64,
69
70 /// Instrument ID
71 pub instrument_id: InstrumentId,
72
73 /// New status of the order
74 pub status: OrderStatus,
75
76 /// Filled quantity in this execution (0 for non-fills)
77 pub filled_quantity: Decimal,
78
79 /// Remaining quantity to be filled
80 pub remaining_quantity: Decimal,
81
82 /// Price of this execution (for fills)
83 pub execution_price: Option<Decimal>,
84
85 /// Reason for rejection (if applicable)
86 pub reject_reason: Option<SmartString>,
87
88 /// Exchange-specific execution ID
89 pub exchange_execution_id: Option<SmartString>,
90
91 /// Whether this is the final report for the order
92 pub is_final: bool,
93}
94
95/// Work item for exchange operations
96struct ExchangeWork {
97 exchange: Arc<dyn Exchange>,
98 operation: ExchangeOperation,
99 report_sender: Sender<ExecutionReport>,
100}
101
102/// Types of exchange operations
103#[derive(Debug)]
104enum ExchangeOperation {
105 PlaceOrder(Order),
106 CancelOrder {
107 order_id: SmartString,
108 instrument_id: InstrumentId,
109 },
110 ModifyOrder {
111 order_id: SmartString,
112 instrument_id: InstrumentId,
113 new_price: Option<Decimal>,
114 new_quantity: Option<Decimal>,
115 },
116 CancelAllOrders(Option<InstrumentId>),
117}
118
119/// An execution management engine that routes orders to appropriate exchange connections
120pub struct ExecutionEngine {
121 /// Map of venue to exchange adapter
122 exchanges: FxHashMap<Venue, Arc<dyn Exchange>>,
123
124 /// Receiver for execution events from OMS
125 event_receiver: Receiver<ExecutionEvent>,
126
127 /// Sender for execution reports to OMS
128 report_sender: Sender<ExecutionReport>,
129
130 /// High-precision clock
131 clock: Clock,
132
133 /// Lock-free map of order ID to venue for routing
134 order_venues: Arc<DashMap<SmartString, Venue>>,
135
136 /// Work queue for exchange operations
137 work_sender: Sender<ExchangeWork>,
138 work_receiver: Receiver<ExchangeWork>,
139
140 /// Number of worker threads for processing exchange operations
141 worker_count: usize,
142}
143
144impl ExecutionEngine {
145 /// Create a new execution engine
146 #[must_use]
147 pub fn new(
148 event_receiver: Receiver<ExecutionEvent>,
149 report_sender: Sender<ExecutionReport>,
150 ) -> Self {
151 Self::with_worker_count(event_receiver, report_sender, 4)
152 }
153
154 /// Create a new execution engine with a specific worker count
155 #[must_use]
156 pub fn with_worker_count(
157 event_receiver: Receiver<ExecutionEvent>,
158 report_sender: Sender<ExecutionReport>,
159 worker_count: usize,
160 ) -> Self {
161 let (work_sender, work_receiver) = flume::unbounded();
162
163 Self {
164 exchanges: FxHashMap::default(),
165 event_receiver,
166 report_sender,
167 clock: Clock::new(),
168 order_venues: Arc::new(DashMap::new()),
169 work_sender,
170 work_receiver,
171 worker_count: worker_count.max(1), // Ensure at least one worker
172 }
173 }
174
175 /// Register an exchange adapter with this execution engine
176 pub fn register_exchange(&mut self, exchange: Arc<dyn Exchange>) {
177 let venue = exchange.venue();
178 self.exchanges.insert(venue, exchange);
179 }
180
181 /// Start the execution engine
182 pub async fn start(&self) -> Result<()> {
183 if self.exchanges.is_empty() {
184 return Err(anyhow!("No exchanges registered"));
185 }
186
187 // Clone values needed for the loops
188 let event_receiver = self.event_receiver.clone();
189 let work_sender = self.work_sender.clone();
190 let exchanges = self.exchanges.clone();
191 let order_venues = self.order_venues.clone();
192 let clock = self.clock.clone();
193 let report_sender = self.report_sender.clone();
194
195 // Start dedicated worker pool for exchange operations
196 let work_receiver = self.work_receiver.clone();
197 let worker_count = self.worker_count;
198
199 for worker_id in 0..worker_count {
200 let work_receiver = work_receiver.clone();
201 tokio::spawn(async move {
202 while let Ok(work) = work_receiver.recv_async().await {
203 match work.operation {
204 ExchangeOperation::PlaceOrder(order) => {
205 // Create references for rejection report before moving order
206 let order_id_for_rejection = order.id;
207 let symbol_for_rejection = order.symbol.clone();
208 let venue_for_rejection = order.venue;
209 let quantity_for_rejection = order.quantity;
210
211 if let Err(e) = work
212 .exchange
213 .place_order(order, work.report_sender.clone())
214 .await
215 {
216 log::error!("Worker {worker_id}: Failed to place order: {e}");
217
218 // Send rejection ExecutionReport using zero-copy utilities
219 let rejection = ExecutionReport {
220 id: BufferedStringOps::create_rejection_id_buffered(
221 &order_id_for_rejection,
222 ),
223 order_id: ZeroCopyStringUtils::display_to_smart_string(
224 &order_id_for_rejection,
225 ),
226 exchange_timestamp: 0, // No exchange interaction occurred
227 system_timestamp: 0, // Will be set by receiving system
228 instrument_id: InstrumentId::new(
229 symbol_for_rejection,
230 venue_for_rejection,
231 ),
232 status: OrderStatus::Rejected,
233 filled_quantity: Decimal::ZERO,
234 remaining_quantity: quantity_for_rejection,
235 execution_price: None,
236 reject_reason: Some(
237 ZeroCopyStringUtils::error_to_smart_string(&e),
238 ),
239 exchange_execution_id: None,
240 is_final: true,
241 };
242
243 // Try to send rejection report (ignore send errors to avoid infinite loops)
244 if let Err(send_err) =
245 work.report_sender.send_async(rejection).await
246 {
247 log::warn!(
248 "Worker {worker_id}: Failed to send rejection report: {send_err}"
249 );
250 }
251 }
252 }
253 ExchangeOperation::CancelOrder {
254 order_id,
255 instrument_id,
256 } => {
257 // Create reference for failure report before moving order_id
258 let order_id_for_failure = order_id.clone();
259
260 if let Err(e) = work
261 .exchange
262 .cancel_order(order_id, work.report_sender.clone())
263 .await
264 {
265 log::error!("Worker {worker_id}: Failed to cancel order: {e}");
266
267 // Send cancellation failure report using zero-copy utilities
268 let cancel_failure = ExecutionReport {
269 id: {
270 let mut id = SmartString::new();
271 id.push_str("cancel_fail_");
272 id.push_str(&order_id_for_failure);
273 id
274 },
275 order_id: order_id_for_failure,
276 exchange_timestamp: 0,
277 system_timestamp: 0,
278 instrument_id,
279 status: OrderStatus::Rejected,
280 filled_quantity: Decimal::ZERO,
281 remaining_quantity: Decimal::ZERO,
282 execution_price: None,
283 reject_reason: Some({
284 let mut reason = SmartString::new();
285 reason.push_str("Cancel failed: ");
286 reason.push_str(&e.to_string());
287 reason
288 }),
289 exchange_execution_id: None,
290 is_final: false, // Order may still be active
291 };
292
293 if let Err(send_err) =
294 work.report_sender.send_async(cancel_failure).await
295 {
296 log::warn!(
297 "Worker {worker_id}: Failed to send cancel failure report: {send_err}"
298 );
299 }
300 }
301 }
302 ExchangeOperation::ModifyOrder {
303 order_id,
304 instrument_id,
305 new_price,
306 new_quantity,
307 } => {
308 // Create reference for failure report before moving order_id
309 let order_id_for_failure = order_id.clone();
310
311 if let Err(e) = work
312 .exchange
313 .modify_order(
314 order_id,
315 new_price,
316 new_quantity,
317 work.report_sender.clone(),
318 )
319 .await
320 {
321 log::error!("Worker {worker_id}: Failed to modify order: {e}");
322
323 // Send modification failure report using zero-copy utilities
324 let modify_failure = ExecutionReport {
325 id: {
326 let mut id = SmartString::new();
327 id.push_str("modify_fail_");
328 id.push_str(&order_id_for_failure);
329 id
330 },
331 order_id: order_id_for_failure,
332 exchange_timestamp: 0,
333 system_timestamp: 0,
334 instrument_id,
335 status: OrderStatus::Rejected, // Order modification failed
336 filled_quantity: Decimal::ZERO,
337 remaining_quantity: Decimal::ZERO,
338 execution_price: None,
339 reject_reason: Some({
340 let mut reason = SmartString::new();
341 reason.push_str("Modify failed: ");
342 reason.push_str(&e.to_string());
343 reason
344 }),
345 exchange_execution_id: None,
346 is_final: false,
347 };
348
349 if let Err(send_err) =
350 work.report_sender.send_async(modify_failure).await
351 {
352 log::warn!(
353 "Worker {worker_id}: Failed to send modify failure report: {send_err}"
354 );
355 }
356 }
357 }
358 ExchangeOperation::CancelAllOrders(instrument_id) => {
359 if let Err(e) = work
360 .exchange
361 .cancel_all_orders(
362 instrument_id.clone(),
363 work.report_sender.clone(),
364 )
365 .await
366 {
367 log::error!("Worker {worker_id}: Failed to cancel all orders: {e}");
368
369 // Send cancel all failure report
370 let cancel_all_failure = ExecutionReport {
371 id: {
372 let mut id = SmartString::new();
373 id.push_str("cancel_all_fail_");
374 id.push_str(
375 instrument_id
376 .as_ref()
377 .map_or("ALL", |i| i.symbol.as_str()),
378 );
379 id
380 },
381 order_id: "CANCEL_ALL_FAILED".into(),
382 exchange_timestamp: 0,
383 system_timestamp: 0,
384 instrument_id: instrument_id
385 .unwrap_or_else(|| InstrumentId::new("ALL", Venue::Test)),
386 status: OrderStatus::Rejected,
387 filled_quantity: Decimal::ZERO,
388 remaining_quantity: Decimal::ZERO,
389 execution_price: None,
390 reject_reason: Some({
391 let mut reason = SmartString::new();
392 reason.push_str("Cancel all failed: ");
393 reason.push_str(&e.to_string());
394 reason
395 }),
396 exchange_execution_id: None,
397 is_final: false,
398 };
399
400 if let Err(send_err) =
401 work.report_sender.send_async(cancel_all_failure).await
402 {
403 log::warn!(
404 "Worker {worker_id}: Failed to send cancel all failure report: {send_err}"
405 );
406 }
407 }
408 }
409 }
410 }
411 });
412 }
413
414 // Start the main event processing loop
415 tokio::spawn(async move {
416 while let Ok(event) = event_receiver.recv_async().await {
417 match event {
418 ExecutionEvent::OrderAccepted(order) => {
419 let venue = order.venue;
420
421 // Route to the appropriate exchange
422 if let Some(exchange) = exchanges.get(&venue) {
423 let order_id_str =
424 ZeroCopyStringUtils::display_to_smart_string(&order.id);
425
426 // Store the venue for this order only after confirming exchange exists
427 order_venues.insert(order_id_str.clone(), venue);
428
429 let work = ExchangeWork {
430 exchange: exchange.clone(),
431 operation: ExchangeOperation::PlaceOrder(order),
432 report_sender: report_sender.clone(),
433 };
434
435 if let Err(e) = work_sender.send_async(work).await {
436 log::error!("Failed to queue place order work: {e}");
437 // Remove from venues map if we failed to queue the work
438 order_venues.remove(&order_id_str);
439 }
440 } else {
441 // Send rejection report if no exchange is available using zero-copy utilities
442 let report = ExecutionReport {
443 id: BufferedStringOps::create_rejection_id_buffered(&order.id),
444 order_id: ZeroCopyStringUtils::display_to_smart_string(&order.id),
445 exchange_timestamp: 0,
446 system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
447 instrument_id: InstrumentId::new(order.symbol, order.venue),
448 status: OrderStatus::Rejected,
449 filled_quantity: Decimal::ZERO,
450 remaining_quantity: order.quantity,
451 execution_price: None,
452 reject_reason: Some({
453 let mut reason = SmartString::new();
454 reason.push_str("No exchange adapter for venue: ");
455 reason.push_str(&format!("{venue:?}"));
456 reason
457 }),
458 exchange_execution_id: None,
459 is_final: true,
460 };
461
462 if let Err(e) = report_sender.send_async(report).await {
463 log::error!("Failed to send rejection report: {e}");
464 }
465 }
466 }
467
468 ExecutionEvent::CancelOrder {
469 order_id,
470 original_order,
471 } => {
472 // Lookup the venue for this order (lock-free)
473 let venue = if let Some(v) = order_venues.get(&order_id) {
474 *v
475 } else {
476 original_order.venue
477 };
478
479 // Route to the appropriate exchange
480 if let Some(exchange) = exchanges.get(&venue) {
481 let work = ExchangeWork {
482 exchange: exchange.clone(),
483 operation: ExchangeOperation::CancelOrder {
484 order_id,
485 instrument_id: InstrumentId::new(
486 original_order.symbol.clone(),
487 original_order.venue,
488 ),
489 },
490 report_sender: report_sender.clone(),
491 };
492
493 if let Err(e) = work_sender.send_async(work).await {
494 log::error!("Failed to queue cancel order work: {e}");
495 }
496 }
497 }
498
499 ExecutionEvent::ModifyOrder {
500 order_id,
501 original_order,
502 new_price,
503 new_quantity,
504 } => {
505 // Lookup the venue for this order (lock-free)
506 let venue = if let Some(v) = order_venues.get(&order_id) {
507 *v
508 } else {
509 original_order.venue
510 };
511
512 // Route to the appropriate exchange
513 if let Some(exchange) = exchanges.get(&venue) {
514 let work = ExchangeWork {
515 exchange: exchange.clone(),
516 operation: ExchangeOperation::ModifyOrder {
517 order_id,
518 instrument_id: InstrumentId::new(
519 original_order.symbol.clone(),
520 original_order.venue,
521 ),
522 new_price,
523 new_quantity,
524 },
525 report_sender: report_sender.clone(),
526 };
527
528 if let Err(e) = work_sender.send_async(work).await {
529 log::error!("Failed to queue modify order work: {e}");
530 }
531 }
532 }
533
534 ExecutionEvent::CancelAllOrders(instrument_id) => {
535 let venue = instrument_id.venue;
536
537 // Route to the appropriate exchange
538 if let Some(exchange) = exchanges.get(&venue) {
539 let work = ExchangeWork {
540 exchange: exchange.clone(),
541 operation: ExchangeOperation::CancelAllOrders(Some(instrument_id)),
542 report_sender: report_sender.clone(),
543 };
544
545 if let Err(e) = work_sender.send_async(work).await {
546 log::error!("Failed to queue cancel all orders work: {e}");
547 }
548 }
549 }
550
551 ExecutionEvent::CancelAllOrdersGlobally => {
552 // Send cancel all orders to every registered exchange
553 for exchange in exchanges.values() {
554 let work = ExchangeWork {
555 exchange: exchange.clone(),
556 operation: ExchangeOperation::CancelAllOrders(None),
557 report_sender: report_sender.clone(),
558 };
559
560 if let Err(e) = work_sender.send_async(work).await {
561 log::error!("Failed to queue global cancel all orders work: {e}");
562 }
563 }
564 }
565 }
566 }
567 });
568
569 Ok(())
570 }
571
572 /// Get a execution report channel that sends to this engine
573 #[must_use]
574 pub fn report_sender(&self) -> Sender<ExecutionReport> {
575 self.report_sender.clone()
576 }
577
578 /// Get an execution event channel that this engine listens to
579 #[must_use]
580 pub fn event_receiver(&self) -> Receiver<ExecutionEvent> {
581 self.event_receiver.clone()
582 }
583}
584
585/// Interface for connecting to trading venues
586#[async_trait]
587pub trait Exchange: Send + Sync {
588 /// Returns the venue this exchange connects to
589 fn venue(&self) -> Venue;
590
591 /// Places a new order on the exchange
592 async fn place_order(&self, order: Order, report_sender: Sender<ExecutionReport>)
593 -> Result<()>;
594
595 /// Cancels an existing order
596 async fn cancel_order(
597 &self,
598 order_id: SmartString,
599 report_sender: Sender<ExecutionReport>,
600 ) -> Result<()>;
601
602 /// Modifies an existing order
603 async fn modify_order(
604 &self,
605 order_id: SmartString,
606 new_price: Option<Decimal>,
607 new_quantity: Option<Decimal>,
608 report_sender: Sender<ExecutionReport>,
609 ) -> Result<()>;
610
611 /// Cancels all open orders, optionally filtered by instrument
612 async fn cancel_all_orders(
613 &self,
614 instrument_id: Option<InstrumentId>,
615 report_sender: Sender<ExecutionReport>,
616 ) -> Result<()>;
617
618 /// Retrieves the status of an order
619 async fn get_order_status(&self, order_id: &str) -> Result<OrderStatus>;
620
621 /// Connects to the exchange
622 async fn connect(&self, report_sender: Sender<ExecutionReport>) -> Result<()>;
623
624 /// Disconnects from the exchange
625 async fn disconnect(&self) -> Result<()>;
626
627 /// Checks if connected to the exchange
628 async fn is_connected(&self) -> bool;
629
630 /// Gets a list of instruments supported by this exchange
631 async fn get_instruments(&self) -> Result<SmallVec<[InstrumentId; 32]>>;
632
633 /// Sends a FIX message to the exchange
634 async fn send_fix_message(&self, message: Vec<u8>) -> Result<()>;
635
636 /// Receives a FIX message from the exchange
637 async fn receive_fix_message(&self) -> Result<Vec<u8>>;
638}