1use std::sync::{
7 Arc,
8 atomic::{AtomicBool, AtomicUsize, Ordering},
9};
10
11use anyhow::{Result, anyhow};
12use crossbeam::queue::SegQueue;
13use dashmap::DashMap;
14use flume::{Receiver, Sender};
15use quanta::Clock;
16use rust_decimal::Decimal;
17use rusty_common::SmartString;
18
19use rusty_model::{
20 enums::OrderStatus, instruments::InstrumentId, trading_order::Order, venues::Venue,
21};
22
23use crate::{Exchange, ExecutionEvent, ExecutionReport};
24
25struct ExchangeWork {
27 exchange: Arc<dyn Exchange>,
28 operation: ExchangeOperation,
29 report_sender: Sender<ExecutionReport>,
30}
31
32#[derive(Debug)]
34enum ExchangeOperation {
35 PlaceOrder(Order),
36 CancelOrder {
37 order_id: SmartString,
38 },
39 ModifyOrder {
40 order_id: SmartString,
41 new_price: Option<Decimal>,
42 new_quantity: Option<Decimal>,
43 },
44 CancelAllOrders(Option<InstrumentId>),
45}
46
47pub struct LockFreeExecutionEngine {
55 exchanges: Arc<DashMap<Venue, Arc<dyn Exchange>>>,
57
58 event_receiver: Receiver<ExecutionEvent>,
60
61 report_sender: Sender<ExecutionReport>,
63
64 clock: Clock,
66
67 order_venues: Arc<DashMap<SmartString, Venue>>,
69
70 work_queue: Arc<SegQueue<ExchangeWork>>,
72
73 shutdown: Arc<AtomicBool>,
75
76 worker_count: Arc<AtomicUsize>,
78}
79
80impl LockFreeExecutionEngine {
81 #[must_use]
83 pub fn new(
84 event_receiver: Receiver<ExecutionEvent>,
85 report_sender: Sender<ExecutionReport>,
86 ) -> Self {
87 Self {
88 exchanges: Arc::new(DashMap::new()),
89 event_receiver,
90 report_sender,
91 clock: Clock::new(),
92 order_venues: Arc::new(DashMap::new()),
93 work_queue: Arc::new(SegQueue::new()),
94 shutdown: Arc::new(AtomicBool::new(false)),
95 worker_count: Arc::new(AtomicUsize::new(0)),
96 }
97 }
98
99 pub fn register_exchange(&self, exchange: Arc<dyn Exchange>) {
101 let venue = exchange.venue();
102 self.exchanges.insert(venue, exchange);
103 }
104
105 pub async fn start(&self) -> Result<()> {
107 if self.exchanges.is_empty() {
108 return Err(anyhow!("No exchanges registered"));
109 }
110
111 let event_receiver = self.event_receiver.clone();
113 let work_queue = self.work_queue.clone();
114 let exchanges = self.exchanges.clone();
115 let order_venues = self.order_venues.clone();
116 let clock = self.clock.clone();
117 let report_sender = self.report_sender.clone();
118 let shutdown = self.shutdown.clone();
119 let worker_count = self.worker_count.clone();
120
121 let worker_threads = std::env::var("EMS_WORKER_COUNT")
123 .ok()
124 .and_then(|s| s.parse::<usize>().ok())
125 .unwrap_or(4)
126 .max(1); for worker_id in 0..worker_threads {
129 let work_queue = work_queue.clone();
130 let shutdown = shutdown.clone();
131 let worker_count = worker_count.clone();
132
133 worker_count.fetch_add(1, Ordering::SeqCst);
135
136 tokio::spawn(async move {
137 loop {
139 if shutdown.load(Ordering::Relaxed) {
141 break;
142 }
143
144 if let Some(work) = work_queue.pop() {
146 match work.operation {
148 ExchangeOperation::PlaceOrder(order) => {
149 if let Err(e) =
150 work.exchange.place_order(order, work.report_sender).await
151 {
152 log::error!("Worker {worker_id}: Failed to place order: {e}");
153 }
154 }
155 ExchangeOperation::CancelOrder { order_id } => {
156 if let Err(e) = work
157 .exchange
158 .cancel_order(order_id, work.report_sender)
159 .await
160 {
161 log::error!("Worker {worker_id}: Failed to cancel order: {e}");
162 }
163 }
164 ExchangeOperation::ModifyOrder {
165 order_id,
166 new_price,
167 new_quantity,
168 } => {
169 if let Err(e) = work
170 .exchange
171 .modify_order(
172 order_id,
173 new_price,
174 new_quantity,
175 work.report_sender,
176 )
177 .await
178 {
179 log::error!("Worker {worker_id}: Failed to modify order: {e}");
180 }
181 }
182 ExchangeOperation::CancelAllOrders(instrument_id) => {
183 if let Err(e) = work
184 .exchange
185 .cancel_all_orders(instrument_id, work.report_sender)
186 .await
187 {
188 log::error!(
189 "Worker {worker_id}: Failed to cancel all orders: {e}"
190 );
191 }
192 }
193 }
194 } else {
195 tokio::time::sleep(tokio::time::Duration::from_micros(10)).await;
197 }
198 }
199
200 worker_count.fetch_sub(1, Ordering::SeqCst);
202 log::info!("Worker {worker_id} shutting down");
203 });
204 }
205
206 tokio::spawn(async move {
208 while let Ok(event) = event_receiver.recv_async().await {
209 if shutdown.load(Ordering::Relaxed) {
211 break;
212 }
213
214 match event {
215 ExecutionEvent::OrderAccepted(order) => {
216 let venue = order.venue;
217
218 if let Some(exchange) = exchanges.get(&venue) {
220 order_venues.insert(order.id.to_string().into(), venue);
222
223 let work = ExchangeWork {
224 exchange: exchange.clone(),
225 operation: ExchangeOperation::PlaceOrder(order),
226 report_sender: report_sender.clone(),
227 };
228
229 work_queue.push(work);
231 } else {
234 let report = ExecutionReport {
236 id: {
237 let mut id = SmartString::new();
238 id.push_str("rej_");
239 id.push_str(&order.id.to_string());
240 id
241 },
242 order_id: order.id.to_string().into(),
243 exchange_timestamp: 0,
244 system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
245 instrument_id: InstrumentId::new(order.symbol.clone(), order.venue),
246 status: OrderStatus::Rejected,
247 filled_quantity: Decimal::ZERO,
248 remaining_quantity: order.quantity,
249 execution_price: None,
250 reject_reason: Some(
251 format!("No exchange adapter for venue: {venue:?}").into(),
252 ),
253 exchange_execution_id: None,
254 is_final: true,
255 };
256
257 if let Err(e) = report_sender.send_async(report).await {
258 log::error!("Failed to send rejection report: {e}");
259 }
260 }
261 }
262
263 ExecutionEvent::CancelOrder {
264 order_id,
265 original_order,
266 } => {
267 let venue = order_venues
269 .get(&order_id)
270 .map_or(original_order.venue, |v| *v);
271
272 if let Some(exchange) = exchanges.get(&venue) {
274 let work = ExchangeWork {
275 exchange: exchange.clone(),
276 operation: ExchangeOperation::CancelOrder { order_id },
277 report_sender: report_sender.clone(),
278 };
279
280 work_queue.push(work);
282 }
283 }
284
285 ExecutionEvent::ModifyOrder {
286 order_id,
287 original_order,
288 new_price,
289 new_quantity,
290 } => {
291 let venue = order_venues
293 .get(&order_id)
294 .map_or(original_order.venue, |v| *v);
295
296 if let Some(exchange) = exchanges.get(&venue) {
298 let work = ExchangeWork {
299 exchange: exchange.clone(),
300 operation: ExchangeOperation::ModifyOrder {
301 order_id,
302 new_price,
303 new_quantity,
304 },
305 report_sender: report_sender.clone(),
306 };
307
308 work_queue.push(work);
310 }
311 }
312
313 ExecutionEvent::CancelAllOrders(instrument_id) => {
314 let venue = instrument_id.venue;
315
316 if let Some(exchange) = exchanges.get(&venue) {
318 let work = ExchangeWork {
319 exchange: exchange.clone(),
320 operation: ExchangeOperation::CancelAllOrders(Some(instrument_id)),
321 report_sender: report_sender.clone(),
322 };
323
324 work_queue.push(work);
326 }
327 }
328
329 ExecutionEvent::CancelAllOrdersGlobally => {
330 for entry in exchanges.iter() {
332 let work = ExchangeWork {
333 exchange: entry.value().clone(),
334 operation: ExchangeOperation::CancelAllOrders(None),
335 report_sender: report_sender.clone(),
336 };
337
338 work_queue.push(work);
340 }
341 }
342 }
343 }
344
345 log::info!("Event processor shutting down");
346 });
347
348 Ok(())
349 }
350
351 pub async fn shutdown(&self) {
353 self.shutdown.store(true, Ordering::SeqCst);
355
356 let mut retries = 0;
358 while self.worker_count.load(Ordering::SeqCst) > 0 && retries < 100 {
359 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
360 retries += 1;
361 }
362
363 if retries >= 100 {
364 log::warn!("Timeout waiting for workers to shutdown");
365 }
366 }
367
368 #[must_use]
370 pub fn queue_depth(&self) -> usize {
371 self.work_queue.len()
372 }
373
374 #[must_use]
376 pub fn report_sender(&self) -> Sender<ExecutionReport> {
377 self.report_sender.clone()
378 }
379
380 #[must_use]
382 pub fn event_receiver(&self) -> Receiver<ExecutionEvent> {
383 self.event_receiver.clone()
384 }
385}
386
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly constructed engine has no work queued.
    #[tokio::test]
    async fn test_lock_free_engine_creation() {
        // The unused channel halves stay bound so neither channel disconnects mid-test.
        let (_events_in, events_out) = flume::unbounded();
        let (reports_in, _reports_out) = flume::unbounded();

        let engine = LockFreeExecutionEngine::new(events_out, reports_in);
        let pending = engine.queue_depth();

        assert_eq!(pending, 0);
    }
}