rusty_ems/
execution_engine.rs

1use rusty_common::SmartString;
2use rusty_common::collections::FxHashMap;
3use rusty_common::zerocopy::{BufferedStringOps, ZeroCopyStringUtils};
4use std::sync::Arc;
5
6use anyhow::{Result, anyhow};
7use async_trait::async_trait;
8use dashmap::DashMap;
9use flume::{Receiver, Sender};
10use quanta::Clock;
11use rust_decimal::Decimal;
12use smallvec::SmallVec;
13
14use rusty_model::{
15    enums::OrderStatus, instruments::InstrumentId, trading_order::Order, venues::Venue,
16};
17
18/// Events related to order execution
19#[derive(Debug, Clone)]
20pub enum ExecutionEvent {
21    /// Order accepted by the exchange
22    OrderAccepted(Order),
23
24    /// Request to cancel an existing order
25    CancelOrder {
26        /// Order ID to cancel
27        order_id: SmartString,
28
29        /// Original order details
30        original_order: Order,
31    },
32
33    /// Request to modify an existing order
34    ModifyOrder {
35        /// Order ID to modify
36        order_id: SmartString,
37
38        /// Original order details
39        original_order: Order,
40
41        /// New price (if modifying price)
42        new_price: Option<Decimal>,
43
44        /// New quantity (if modifying quantity)
45        new_quantity: Option<Decimal>,
46    },
47
48    /// Request to cancel all open orders for a specific instrument
49    CancelAllOrders(InstrumentId),
50
51    /// Request to cancel all open orders across all instruments
52    CancelAllOrdersGlobally,
53}
54
55/// Execution report from the exchange
56#[derive(Debug, Clone)]
57pub struct ExecutionReport {
58    /// Unique identifier for this execution report
59    pub id: SmartString,
60
61    /// Order ID this report is for
62    pub order_id: SmartString,
63
64    /// Exchange timestamp (in nanoseconds)
65    pub exchange_timestamp: u64,
66
67    /// System timestamp (in nanoseconds)
68    pub system_timestamp: u64,
69
70    /// Instrument ID
71    pub instrument_id: InstrumentId,
72
73    /// New status of the order
74    pub status: OrderStatus,
75
76    /// Filled quantity in this execution (0 for non-fills)
77    pub filled_quantity: Decimal,
78
79    /// Remaining quantity to be filled
80    pub remaining_quantity: Decimal,
81
82    /// Price of this execution (for fills)
83    pub execution_price: Option<Decimal>,
84
85    /// Reason for rejection (if applicable)
86    pub reject_reason: Option<SmartString>,
87
88    /// Exchange-specific execution ID
89    pub exchange_execution_id: Option<SmartString>,
90
91    /// Whether this is the final report for the order
92    pub is_final: bool,
93}
94
95/// Work item for exchange operations
96struct ExchangeWork {
97    exchange: Arc<dyn Exchange>,
98    operation: ExchangeOperation,
99    report_sender: Sender<ExecutionReport>,
100}
101
102/// Types of exchange operations
103#[derive(Debug)]
104enum ExchangeOperation {
105    PlaceOrder(Order),
106    CancelOrder {
107        order_id: SmartString,
108        instrument_id: InstrumentId,
109    },
110    ModifyOrder {
111        order_id: SmartString,
112        instrument_id: InstrumentId,
113        new_price: Option<Decimal>,
114        new_quantity: Option<Decimal>,
115    },
116    CancelAllOrders(Option<InstrumentId>),
117}
118
119/// An execution management engine that routes orders to appropriate exchange connections
120pub struct ExecutionEngine {
121    /// Map of venue to exchange adapter
122    exchanges: FxHashMap<Venue, Arc<dyn Exchange>>,
123
124    /// Receiver for execution events from OMS
125    event_receiver: Receiver<ExecutionEvent>,
126
127    /// Sender for execution reports to OMS
128    report_sender: Sender<ExecutionReport>,
129
130    /// High-precision clock
131    clock: Clock,
132
133    /// Lock-free map of order ID to venue for routing
134    order_venues: Arc<DashMap<SmartString, Venue>>,
135
136    /// Work queue for exchange operations
137    work_sender: Sender<ExchangeWork>,
138    work_receiver: Receiver<ExchangeWork>,
139
140    /// Number of worker threads for processing exchange operations
141    worker_count: usize,
142}
143
144impl ExecutionEngine {
145    /// Create a new execution engine
146    #[must_use]
147    pub fn new(
148        event_receiver: Receiver<ExecutionEvent>,
149        report_sender: Sender<ExecutionReport>,
150    ) -> Self {
151        Self::with_worker_count(event_receiver, report_sender, 4)
152    }
153
154    /// Create a new execution engine with a specific worker count
155    #[must_use]
156    pub fn with_worker_count(
157        event_receiver: Receiver<ExecutionEvent>,
158        report_sender: Sender<ExecutionReport>,
159        worker_count: usize,
160    ) -> Self {
161        let (work_sender, work_receiver) = flume::unbounded();
162
163        Self {
164            exchanges: FxHashMap::default(),
165            event_receiver,
166            report_sender,
167            clock: Clock::new(),
168            order_venues: Arc::new(DashMap::new()),
169            work_sender,
170            work_receiver,
171            worker_count: worker_count.max(1), // Ensure at least one worker
172        }
173    }
174
175    /// Register an exchange adapter with this execution engine
176    pub fn register_exchange(&mut self, exchange: Arc<dyn Exchange>) {
177        let venue = exchange.venue();
178        self.exchanges.insert(venue, exchange);
179    }
180
181    /// Start the execution engine
182    pub async fn start(&self) -> Result<()> {
183        if self.exchanges.is_empty() {
184            return Err(anyhow!("No exchanges registered"));
185        }
186
187        // Clone values needed for the loops
188        let event_receiver = self.event_receiver.clone();
189        let work_sender = self.work_sender.clone();
190        let exchanges = self.exchanges.clone();
191        let order_venues = self.order_venues.clone();
192        let clock = self.clock.clone();
193        let report_sender = self.report_sender.clone();
194
195        // Start dedicated worker pool for exchange operations
196        let work_receiver = self.work_receiver.clone();
197        let worker_count = self.worker_count;
198
199        for worker_id in 0..worker_count {
200            let work_receiver = work_receiver.clone();
201            tokio::spawn(async move {
202                while let Ok(work) = work_receiver.recv_async().await {
203                    match work.operation {
204                        ExchangeOperation::PlaceOrder(order) => {
205                            // Create references for rejection report before moving order
206                            let order_id_for_rejection = order.id;
207                            let symbol_for_rejection = order.symbol.clone();
208                            let venue_for_rejection = order.venue;
209                            let quantity_for_rejection = order.quantity;
210
211                            if let Err(e) = work
212                                .exchange
213                                .place_order(order, work.report_sender.clone())
214                                .await
215                            {
216                                log::error!("Worker {worker_id}: Failed to place order: {e}");
217
218                                // Send rejection ExecutionReport using zero-copy utilities
219                                let rejection = ExecutionReport {
220                                    id: BufferedStringOps::create_rejection_id_buffered(
221                                        &order_id_for_rejection,
222                                    ),
223                                    order_id: ZeroCopyStringUtils::display_to_smart_string(
224                                        &order_id_for_rejection,
225                                    ),
226                                    exchange_timestamp: 0, // No exchange interaction occurred
227                                    system_timestamp: 0,   // Will be set by receiving system
228                                    instrument_id: InstrumentId::new(
229                                        symbol_for_rejection,
230                                        venue_for_rejection,
231                                    ),
232                                    status: OrderStatus::Rejected,
233                                    filled_quantity: Decimal::ZERO,
234                                    remaining_quantity: quantity_for_rejection,
235                                    execution_price: None,
236                                    reject_reason: Some(
237                                        ZeroCopyStringUtils::error_to_smart_string(&e),
238                                    ),
239                                    exchange_execution_id: None,
240                                    is_final: true,
241                                };
242
243                                // Try to send rejection report (ignore send errors to avoid infinite loops)
244                                if let Err(send_err) =
245                                    work.report_sender.send_async(rejection).await
246                                {
247                                    log::warn!(
248                                        "Worker {worker_id}: Failed to send rejection report: {send_err}"
249                                    );
250                                }
251                            }
252                        }
253                        ExchangeOperation::CancelOrder {
254                            order_id,
255                            instrument_id,
256                        } => {
257                            // Create reference for failure report before moving order_id
258                            let order_id_for_failure = order_id.clone();
259
260                            if let Err(e) = work
261                                .exchange
262                                .cancel_order(order_id, work.report_sender.clone())
263                                .await
264                            {
265                                log::error!("Worker {worker_id}: Failed to cancel order: {e}");
266
267                                // Send cancellation failure report using zero-copy utilities
268                                let cancel_failure = ExecutionReport {
269                                    id: {
270                                        let mut id = SmartString::new();
271                                        id.push_str("cancel_fail_");
272                                        id.push_str(&order_id_for_failure);
273                                        id
274                                    },
275                                    order_id: order_id_for_failure,
276                                    exchange_timestamp: 0,
277                                    system_timestamp: 0,
278                                    instrument_id,
279                                    status: OrderStatus::Rejected,
280                                    filled_quantity: Decimal::ZERO,
281                                    remaining_quantity: Decimal::ZERO,
282                                    execution_price: None,
283                                    reject_reason: Some({
284                                        let mut reason = SmartString::new();
285                                        reason.push_str("Cancel failed: ");
286                                        reason.push_str(&e.to_string());
287                                        reason
288                                    }),
289                                    exchange_execution_id: None,
290                                    is_final: false, // Order may still be active
291                                };
292
293                                if let Err(send_err) =
294                                    work.report_sender.send_async(cancel_failure).await
295                                {
296                                    log::warn!(
297                                        "Worker {worker_id}: Failed to send cancel failure report: {send_err}"
298                                    );
299                                }
300                            }
301                        }
302                        ExchangeOperation::ModifyOrder {
303                            order_id,
304                            instrument_id,
305                            new_price,
306                            new_quantity,
307                        } => {
308                            // Create reference for failure report before moving order_id
309                            let order_id_for_failure = order_id.clone();
310
311                            if let Err(e) = work
312                                .exchange
313                                .modify_order(
314                                    order_id,
315                                    new_price,
316                                    new_quantity,
317                                    work.report_sender.clone(),
318                                )
319                                .await
320                            {
321                                log::error!("Worker {worker_id}: Failed to modify order: {e}");
322
323                                // Send modification failure report using zero-copy utilities
324                                let modify_failure = ExecutionReport {
325                                    id: {
326                                        let mut id = SmartString::new();
327                                        id.push_str("modify_fail_");
328                                        id.push_str(&order_id_for_failure);
329                                        id
330                                    },
331                                    order_id: order_id_for_failure,
332                                    exchange_timestamp: 0,
333                                    system_timestamp: 0,
334                                    instrument_id,
335                                    status: OrderStatus::Rejected, // Order modification failed
336                                    filled_quantity: Decimal::ZERO,
337                                    remaining_quantity: Decimal::ZERO,
338                                    execution_price: None,
339                                    reject_reason: Some({
340                                        let mut reason = SmartString::new();
341                                        reason.push_str("Modify failed: ");
342                                        reason.push_str(&e.to_string());
343                                        reason
344                                    }),
345                                    exchange_execution_id: None,
346                                    is_final: false,
347                                };
348
349                                if let Err(send_err) =
350                                    work.report_sender.send_async(modify_failure).await
351                                {
352                                    log::warn!(
353                                        "Worker {worker_id}: Failed to send modify failure report: {send_err}"
354                                    );
355                                }
356                            }
357                        }
358                        ExchangeOperation::CancelAllOrders(instrument_id) => {
359                            if let Err(e) = work
360                                .exchange
361                                .cancel_all_orders(
362                                    instrument_id.clone(),
363                                    work.report_sender.clone(),
364                                )
365                                .await
366                            {
367                                log::error!("Worker {worker_id}: Failed to cancel all orders: {e}");
368
369                                // Send cancel all failure report
370                                let cancel_all_failure = ExecutionReport {
371                                    id: {
372                                        let mut id = SmartString::new();
373                                        id.push_str("cancel_all_fail_");
374                                        id.push_str(
375                                            instrument_id
376                                                .as_ref()
377                                                .map_or("ALL", |i| i.symbol.as_str()),
378                                        );
379                                        id
380                                    },
381                                    order_id: "CANCEL_ALL_FAILED".into(),
382                                    exchange_timestamp: 0,
383                                    system_timestamp: 0,
384                                    instrument_id: instrument_id
385                                        .unwrap_or_else(|| InstrumentId::new("ALL", Venue::Test)),
386                                    status: OrderStatus::Rejected,
387                                    filled_quantity: Decimal::ZERO,
388                                    remaining_quantity: Decimal::ZERO,
389                                    execution_price: None,
390                                    reject_reason: Some({
391                                        let mut reason = SmartString::new();
392                                        reason.push_str("Cancel all failed: ");
393                                        reason.push_str(&e.to_string());
394                                        reason
395                                    }),
396                                    exchange_execution_id: None,
397                                    is_final: false,
398                                };
399
400                                if let Err(send_err) =
401                                    work.report_sender.send_async(cancel_all_failure).await
402                                {
403                                    log::warn!(
404                                        "Worker {worker_id}: Failed to send cancel all failure report: {send_err}"
405                                    );
406                                }
407                            }
408                        }
409                    }
410                }
411            });
412        }
413
414        // Start the main event processing loop
415        tokio::spawn(async move {
416            while let Ok(event) = event_receiver.recv_async().await {
417                match event {
418                    ExecutionEvent::OrderAccepted(order) => {
419                        let venue = order.venue;
420
421                        // Route to the appropriate exchange
422                        if let Some(exchange) = exchanges.get(&venue) {
423                            let order_id_str =
424                                ZeroCopyStringUtils::display_to_smart_string(&order.id);
425
426                            // Store the venue for this order only after confirming exchange exists
427                            order_venues.insert(order_id_str.clone(), venue);
428
429                            let work = ExchangeWork {
430                                exchange: exchange.clone(),
431                                operation: ExchangeOperation::PlaceOrder(order),
432                                report_sender: report_sender.clone(),
433                            };
434
435                            if let Err(e) = work_sender.send_async(work).await {
436                                log::error!("Failed to queue place order work: {e}");
437                                // Remove from venues map if we failed to queue the work
438                                order_venues.remove(&order_id_str);
439                            }
440                        } else {
441                            // Send rejection report if no exchange is available using zero-copy utilities
442                            let report = ExecutionReport {
443                                id: BufferedStringOps::create_rejection_id_buffered(&order.id),
444                                order_id: ZeroCopyStringUtils::display_to_smart_string(&order.id),
445                                exchange_timestamp: 0,
446                                system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
447                                instrument_id: InstrumentId::new(order.symbol, order.venue),
448                                status: OrderStatus::Rejected,
449                                filled_quantity: Decimal::ZERO,
450                                remaining_quantity: order.quantity,
451                                execution_price: None,
452                                reject_reason: Some({
453                                    let mut reason = SmartString::new();
454                                    reason.push_str("No exchange adapter for venue: ");
455                                    reason.push_str(&format!("{venue:?}"));
456                                    reason
457                                }),
458                                exchange_execution_id: None,
459                                is_final: true,
460                            };
461
462                            if let Err(e) = report_sender.send_async(report).await {
463                                log::error!("Failed to send rejection report: {e}");
464                            }
465                        }
466                    }
467
468                    ExecutionEvent::CancelOrder {
469                        order_id,
470                        original_order,
471                    } => {
472                        // Lookup the venue for this order (lock-free)
473                        let venue = if let Some(v) = order_venues.get(&order_id) {
474                            *v
475                        } else {
476                            original_order.venue
477                        };
478
479                        // Route to the appropriate exchange
480                        if let Some(exchange) = exchanges.get(&venue) {
481                            let work = ExchangeWork {
482                                exchange: exchange.clone(),
483                                operation: ExchangeOperation::CancelOrder {
484                                    order_id,
485                                    instrument_id: InstrumentId::new(
486                                        original_order.symbol.clone(),
487                                        original_order.venue,
488                                    ),
489                                },
490                                report_sender: report_sender.clone(),
491                            };
492
493                            if let Err(e) = work_sender.send_async(work).await {
494                                log::error!("Failed to queue cancel order work: {e}");
495                            }
496                        }
497                    }
498
499                    ExecutionEvent::ModifyOrder {
500                        order_id,
501                        original_order,
502                        new_price,
503                        new_quantity,
504                    } => {
505                        // Lookup the venue for this order (lock-free)
506                        let venue = if let Some(v) = order_venues.get(&order_id) {
507                            *v
508                        } else {
509                            original_order.venue
510                        };
511
512                        // Route to the appropriate exchange
513                        if let Some(exchange) = exchanges.get(&venue) {
514                            let work = ExchangeWork {
515                                exchange: exchange.clone(),
516                                operation: ExchangeOperation::ModifyOrder {
517                                    order_id,
518                                    instrument_id: InstrumentId::new(
519                                        original_order.symbol.clone(),
520                                        original_order.venue,
521                                    ),
522                                    new_price,
523                                    new_quantity,
524                                },
525                                report_sender: report_sender.clone(),
526                            };
527
528                            if let Err(e) = work_sender.send_async(work).await {
529                                log::error!("Failed to queue modify order work: {e}");
530                            }
531                        }
532                    }
533
534                    ExecutionEvent::CancelAllOrders(instrument_id) => {
535                        let venue = instrument_id.venue;
536
537                        // Route to the appropriate exchange
538                        if let Some(exchange) = exchanges.get(&venue) {
539                            let work = ExchangeWork {
540                                exchange: exchange.clone(),
541                                operation: ExchangeOperation::CancelAllOrders(Some(instrument_id)),
542                                report_sender: report_sender.clone(),
543                            };
544
545                            if let Err(e) = work_sender.send_async(work).await {
546                                log::error!("Failed to queue cancel all orders work: {e}");
547                            }
548                        }
549                    }
550
551                    ExecutionEvent::CancelAllOrdersGlobally => {
552                        // Send cancel all orders to every registered exchange
553                        for exchange in exchanges.values() {
554                            let work = ExchangeWork {
555                                exchange: exchange.clone(),
556                                operation: ExchangeOperation::CancelAllOrders(None),
557                                report_sender: report_sender.clone(),
558                            };
559
560                            if let Err(e) = work_sender.send_async(work).await {
561                                log::error!("Failed to queue global cancel all orders work: {e}");
562                            }
563                        }
564                    }
565                }
566            }
567        });
568
569        Ok(())
570    }
571
572    /// Get a execution report channel that sends to this engine
573    #[must_use]
574    pub fn report_sender(&self) -> Sender<ExecutionReport> {
575        self.report_sender.clone()
576    }
577
578    /// Get an execution event channel that this engine listens to
579    #[must_use]
580    pub fn event_receiver(&self) -> Receiver<ExecutionEvent> {
581        self.event_receiver.clone()
582    }
583}
584
585/// Interface for connecting to trading venues
586#[async_trait]
587pub trait Exchange: Send + Sync {
588    /// Returns the venue this exchange connects to
589    fn venue(&self) -> Venue;
590
591    /// Places a new order on the exchange
592    async fn place_order(&self, order: Order, report_sender: Sender<ExecutionReport>)
593    -> Result<()>;
594
595    /// Cancels an existing order
596    async fn cancel_order(
597        &self,
598        order_id: SmartString,
599        report_sender: Sender<ExecutionReport>,
600    ) -> Result<()>;
601
602    /// Modifies an existing order
603    async fn modify_order(
604        &self,
605        order_id: SmartString,
606        new_price: Option<Decimal>,
607        new_quantity: Option<Decimal>,
608        report_sender: Sender<ExecutionReport>,
609    ) -> Result<()>;
610
611    /// Cancels all open orders, optionally filtered by instrument
612    async fn cancel_all_orders(
613        &self,
614        instrument_id: Option<InstrumentId>,
615        report_sender: Sender<ExecutionReport>,
616    ) -> Result<()>;
617
618    /// Retrieves the status of an order
619    async fn get_order_status(&self, order_id: &str) -> Result<OrderStatus>;
620
621    /// Connects to the exchange
622    async fn connect(&self, report_sender: Sender<ExecutionReport>) -> Result<()>;
623
624    /// Disconnects from the exchange
625    async fn disconnect(&self) -> Result<()>;
626
627    /// Checks if connected to the exchange
628    async fn is_connected(&self) -> bool;
629
630    /// Gets a list of instruments supported by this exchange
631    async fn get_instruments(&self) -> Result<SmallVec<[InstrumentId; 32]>>;
632
633    /// Sends a FIX message to the exchange
634    async fn send_fix_message(&self, message: Vec<u8>) -> Result<()>;
635
636    /// Receives a FIX message from the exchange
637    async fn receive_fix_message(&self) -> Result<Vec<u8>>;
638}