rusty_ems/
execution_engine_lockfree.rs

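//! Lock-free execution engine implementation using `crossbeam::queue::SegQueue`.
//!
//! Exchange operations are handed from the event loop to a pool of worker tasks through a
//! lock-free `SegQueue`, keeping lock contention off that hand-off. Venue routing goes
//! through sharded `DashMap`s and events/reports travel over `flume` channels, so not every
//! structure on the execution path is strictly lock-free.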
5
6use std::sync::{
7    Arc,
8    atomic::{AtomicBool, AtomicUsize, Ordering},
9};
10
11use anyhow::{Result, anyhow};
12use crossbeam::queue::SegQueue;
13use dashmap::DashMap;
14use flume::{Receiver, Sender};
15use quanta::Clock;
16use rust_decimal::Decimal;
17use rusty_common::SmartString;
18
19use rusty_model::{
20    enums::OrderStatus, instruments::InstrumentId, trading_order::Order, venues::Venue,
21};
22
23use crate::{Exchange, ExecutionEvent, ExecutionReport};
24
25/// Work item for exchange operations with lock-free passing
26struct ExchangeWork {
27    exchange: Arc<dyn Exchange>,
28    operation: ExchangeOperation,
29    report_sender: Sender<ExecutionReport>,
30}
31
32/// Types of exchange operations
33#[derive(Debug)]
34enum ExchangeOperation {
35    PlaceOrder(Order),
36    CancelOrder {
37        order_id: SmartString,
38    },
39    ModifyOrder {
40        order_id: SmartString,
41        new_price: Option<Decimal>,
42        new_quantity: Option<Decimal>,
43    },
44    CancelAllOrders(Option<InstrumentId>),
45}
46
47/// Lock-free execution engine using `crossbeam::queue::SegQueue`
48///
49/// This implementation provides:
50/// - Zero contention on the critical path
51/// - Lock-free work distribution
52/// - Efficient batch processing
53/// - Cache-friendly memory layout
54pub struct LockFreeExecutionEngine {
55    /// Map of venue to exchange adapter
56    exchanges: Arc<DashMap<Venue, Arc<dyn Exchange>>>,
57
58    /// Receiver for execution events from OMS
59    event_receiver: Receiver<ExecutionEvent>,
60
61    /// Sender for execution reports to OMS
62    report_sender: Sender<ExecutionReport>,
63
64    /// High-precision clock
65    clock: Clock,
66
67    /// Lock-free map of order ID to venue for routing
68    order_venues: Arc<DashMap<SmartString, Venue>>,
69
70    /// Lock-free work queue for exchange operations
71    work_queue: Arc<SegQueue<ExchangeWork>>,
72
73    /// Atomic flag to signal shutdown
74    shutdown: Arc<AtomicBool>,
75
76    /// Worker thread count
77    worker_count: Arc<AtomicUsize>,
78}
79
80impl LockFreeExecutionEngine {
81    /// Create a new lock-free execution engine
82    #[must_use]
83    pub fn new(
84        event_receiver: Receiver<ExecutionEvent>,
85        report_sender: Sender<ExecutionReport>,
86    ) -> Self {
87        Self {
88            exchanges: Arc::new(DashMap::new()),
89            event_receiver,
90            report_sender,
91            clock: Clock::new(),
92            order_venues: Arc::new(DashMap::new()),
93            work_queue: Arc::new(SegQueue::new()),
94            shutdown: Arc::new(AtomicBool::new(false)),
95            worker_count: Arc::new(AtomicUsize::new(0)),
96        }
97    }
98
99    /// Register an exchange adapter with this execution engine
100    pub fn register_exchange(&self, exchange: Arc<dyn Exchange>) {
101        let venue = exchange.venue();
102        self.exchanges.insert(venue, exchange);
103    }
104
105    /// Start the lock-free execution engine
106    pub async fn start(&self) -> Result<()> {
107        if self.exchanges.is_empty() {
108            return Err(anyhow!("No exchanges registered"));
109        }
110
111        // Clone values needed for the loops
112        let event_receiver = self.event_receiver.clone();
113        let work_queue = self.work_queue.clone();
114        let exchanges = self.exchanges.clone();
115        let order_venues = self.order_venues.clone();
116        let clock = self.clock.clone();
117        let report_sender = self.report_sender.clone();
118        let shutdown = self.shutdown.clone();
119        let worker_count = self.worker_count.clone();
120
121        // Start dedicated worker pool for exchange operations
122        let worker_threads = std::env::var("EMS_WORKER_COUNT")
123            .ok()
124            .and_then(|s| s.parse::<usize>().ok())
125            .unwrap_or(4)
126            .max(1); // Ensure at least one worker
127
128        for worker_id in 0..worker_threads {
129            let work_queue = work_queue.clone();
130            let shutdown = shutdown.clone();
131            let worker_count = worker_count.clone();
132
133            // Increment worker count
134            worker_count.fetch_add(1, Ordering::SeqCst);
135
136            tokio::spawn(async move {
137                // Worker main loop - process work items from lock-free queue
138                loop {
139                    // Check shutdown flag
140                    if shutdown.load(Ordering::Relaxed) {
141                        break;
142                    }
143
144                    // Try to get work from the lock-free queue
145                    if let Some(work) = work_queue.pop() {
146                        // Process the work item
147                        match work.operation {
148                            ExchangeOperation::PlaceOrder(order) => {
149                                if let Err(e) =
150                                    work.exchange.place_order(order, work.report_sender).await
151                                {
152                                    log::error!("Worker {worker_id}: Failed to place order: {e}");
153                                }
154                            }
155                            ExchangeOperation::CancelOrder { order_id } => {
156                                if let Err(e) = work
157                                    .exchange
158                                    .cancel_order(order_id, work.report_sender)
159                                    .await
160                                {
161                                    log::error!("Worker {worker_id}: Failed to cancel order: {e}");
162                                }
163                            }
164                            ExchangeOperation::ModifyOrder {
165                                order_id,
166                                new_price,
167                                new_quantity,
168                            } => {
169                                if let Err(e) = work
170                                    .exchange
171                                    .modify_order(
172                                        order_id,
173                                        new_price,
174                                        new_quantity,
175                                        work.report_sender,
176                                    )
177                                    .await
178                                {
179                                    log::error!("Worker {worker_id}: Failed to modify order: {e}");
180                                }
181                            }
182                            ExchangeOperation::CancelAllOrders(instrument_id) => {
183                                if let Err(e) = work
184                                    .exchange
185                                    .cancel_all_orders(instrument_id, work.report_sender)
186                                    .await
187                                {
188                                    log::error!(
189                                        "Worker {worker_id}: Failed to cancel all orders: {e}"
190                                    );
191                                }
192                            }
193                        }
194                    } else {
195                        // No work available, yield to prevent busy-waiting
196                        tokio::time::sleep(tokio::time::Duration::from_micros(10)).await;
197                    }
198                }
199
200                // Decrement worker count on exit
201                worker_count.fetch_sub(1, Ordering::SeqCst);
202                log::info!("Worker {worker_id} shutting down");
203            });
204        }
205
206        // Start the main event processing loop
207        tokio::spawn(async move {
208            while let Ok(event) = event_receiver.recv_async().await {
209                // Check shutdown flag
210                if shutdown.load(Ordering::Relaxed) {
211                    break;
212                }
213
214                match event {
215                    ExecutionEvent::OrderAccepted(order) => {
216                        let venue = order.venue;
217
218                        // Route to the appropriate exchange
219                        if let Some(exchange) = exchanges.get(&venue) {
220                            // Store the venue for this order only after confirming exchange exists
221                            order_venues.insert(order.id.to_string().into(), venue);
222
223                            let work = ExchangeWork {
224                                exchange: exchange.clone(),
225                                operation: ExchangeOperation::PlaceOrder(order),
226                                report_sender: report_sender.clone(),
227                            };
228
229                            // Push to lock-free queue
230                            work_queue.push(work);
231                            // Note: With lock-free queue, we can't check if push failed
232                            // as push() always succeeds (unbounded queue)
233                        } else {
234                            // Send rejection report if no exchange is available
235                            let report = ExecutionReport {
236                                id: {
237                                    let mut id = SmartString::new();
238                                    id.push_str("rej_");
239                                    id.push_str(&order.id.to_string());
240                                    id
241                                },
242                                order_id: order.id.to_string().into(),
243                                exchange_timestamp: 0,
244                                system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
245                                instrument_id: InstrumentId::new(order.symbol.clone(), order.venue),
246                                status: OrderStatus::Rejected,
247                                filled_quantity: Decimal::ZERO,
248                                remaining_quantity: order.quantity,
249                                execution_price: None,
250                                reject_reason: Some(
251                                    format!("No exchange adapter for venue: {venue:?}").into(),
252                                ),
253                                exchange_execution_id: None,
254                                is_final: true,
255                            };
256
257                            if let Err(e) = report_sender.send_async(report).await {
258                                log::error!("Failed to send rejection report: {e}");
259                            }
260                        }
261                    }
262
263                    ExecutionEvent::CancelOrder {
264                        order_id,
265                        original_order,
266                    } => {
267                        // Lookup the venue for this order (lock-free)
268                        let venue = order_venues
269                            .get(&order_id)
270                            .map_or(original_order.venue, |v| *v);
271
272                        // Route to the appropriate exchange
273                        if let Some(exchange) = exchanges.get(&venue) {
274                            let work = ExchangeWork {
275                                exchange: exchange.clone(),
276                                operation: ExchangeOperation::CancelOrder { order_id },
277                                report_sender: report_sender.clone(),
278                            };
279
280                            // Push to lock-free queue
281                            work_queue.push(work);
282                        }
283                    }
284
285                    ExecutionEvent::ModifyOrder {
286                        order_id,
287                        original_order,
288                        new_price,
289                        new_quantity,
290                    } => {
291                        // Lookup the venue for this order (lock-free)
292                        let venue = order_venues
293                            .get(&order_id)
294                            .map_or(original_order.venue, |v| *v);
295
296                        // Route to the appropriate exchange
297                        if let Some(exchange) = exchanges.get(&venue) {
298                            let work = ExchangeWork {
299                                exchange: exchange.clone(),
300                                operation: ExchangeOperation::ModifyOrder {
301                                    order_id,
302                                    new_price,
303                                    new_quantity,
304                                },
305                                report_sender: report_sender.clone(),
306                            };
307
308                            // Push to lock-free queue
309                            work_queue.push(work);
310                        }
311                    }
312
313                    ExecutionEvent::CancelAllOrders(instrument_id) => {
314                        let venue = instrument_id.venue;
315
316                        // Route to the appropriate exchange
317                        if let Some(exchange) = exchanges.get(&venue) {
318                            let work = ExchangeWork {
319                                exchange: exchange.clone(),
320                                operation: ExchangeOperation::CancelAllOrders(Some(instrument_id)),
321                                report_sender: report_sender.clone(),
322                            };
323
324                            // Push to lock-free queue
325                            work_queue.push(work);
326                        }
327                    }
328
329                    ExecutionEvent::CancelAllOrdersGlobally => {
330                        // Send cancel all orders to every registered exchange
331                        for entry in exchanges.iter() {
332                            let work = ExchangeWork {
333                                exchange: entry.value().clone(),
334                                operation: ExchangeOperation::CancelAllOrders(None),
335                                report_sender: report_sender.clone(),
336                            };
337
338                            // Push to lock-free queue
339                            work_queue.push(work);
340                        }
341                    }
342                }
343            }
344
345            log::info!("Event processor shutting down");
346        });
347
348        Ok(())
349    }
350
351    /// Shutdown the execution engine gracefully
352    pub async fn shutdown(&self) {
353        // Signal shutdown
354        self.shutdown.store(true, Ordering::SeqCst);
355
356        // Wait for all workers to finish
357        let mut retries = 0;
358        while self.worker_count.load(Ordering::SeqCst) > 0 && retries < 100 {
359            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
360            retries += 1;
361        }
362
363        if retries >= 100 {
364            log::warn!("Timeout waiting for workers to shutdown");
365        }
366    }
367
368    /// Get queue depth for monitoring
369    #[must_use]
370    pub fn queue_depth(&self) -> usize {
371        self.work_queue.len()
372    }
373
374    /// Get a execution report channel that sends to this engine
375    #[must_use]
376    pub fn report_sender(&self) -> Sender<ExecutionReport> {
377        self.report_sender.clone()
378    }
379
380    /// Get an execution event channel that this engine listens to
381    #[must_use]
382    pub fn event_receiver(&self) -> Receiver<ExecutionEvent> {
383        self.event_receiver.clone()
384    }
385}
386
#[cfg(test)]
mod tests {
    use super::*;

    /// Build an engine plus the channel ends the tests must keep alive.
    fn new_engine() -> (
        Sender<ExecutionEvent>,
        Receiver<ExecutionReport>,
        LockFreeExecutionEngine,
    ) {
        let (event_tx, event_rx) = flume::unbounded();
        let (report_tx, report_rx) = flume::unbounded();
        let engine = LockFreeExecutionEngine::new(event_rx, report_tx);
        (event_tx, report_rx, engine)
    }

    #[tokio::test]
    async fn test_lock_free_engine_creation() {
        let (_event_tx, _report_rx, engine) = new_engine();
        assert_eq!(engine.queue_depth(), 0);
    }

    #[tokio::test]
    async fn test_start_without_exchanges_fails() {
        let (_event_tx, _report_rx, engine) = new_engine();

        let err = engine
            .start()
            .await
            .expect_err("start must fail when no exchange is registered");
        assert!(err.to_string().contains("No exchanges registered"));
        assert_eq!(engine.queue_depth(), 0);
    }

    #[tokio::test]
    async fn test_shutdown_without_start_returns() {
        let (_event_tx, _report_rx, engine) = new_engine();

        // No workers were ever spawned, so shutdown must not wait on anything.
        engine.shutdown().await;
        assert_eq!(engine.queue_depth(), 0);
    }
}