rusty_ems/execution_engine_optimized.rs
//! Optimized execution engine for high-frequency trading
//!
//! This module provides a batched, cache-aware implementation of the
//! execution engine, designed for high-frequency trading scenarios.

use std::sync::Arc;
use std::time::Duration;

use anyhow::{Result, anyhow};
use async_trait::async_trait;
use flume::{Receiver, Sender};
use log::{debug, error, info, warn};
use parking_lot::RwLock;
use quanta::Clock;
use rust_decimal::Decimal;
use rusty_common::SmartString;
use rusty_common::collections::FxHashMap;
use smallvec::SmallVec;
use tokio::task::JoinHandle;

use crate::error::batch_errors::BatchResult;
use rusty_model::{
    enums::OrderStatus, instruments::InstrumentId, trading_order::Order, venues::Venue,
};

/// Maximum number of events to process in a single batch
const MAX_BATCH_SIZE: usize = 100;

/// Cache-line aligned statistics for execution performance monitoring
#[derive(Debug, Clone, Default)]
#[repr(align(64))]
pub struct ExecutionStats {
    /// Total number of orders placed
    pub orders_placed: u64,

    /// Total number of orders cancelled
    pub orders_cancelled: u64,

    /// Total number of orders modified
    pub orders_modified: u64,

    /// Total number of orders filled
    pub orders_filled: u64,

    /// Total number of orders rejected
    pub orders_rejected: u64,

    /// Average order latency in nanoseconds
    /// (from request to acknowledgement)
    pub avg_order_latency_ns: u64,

    /// Maximum order latency in nanoseconds
    pub max_order_latency_ns: u64,

    /// Minimum order latency in nanoseconds
    pub min_order_latency_ns: u64,

    /// Number of latency samples collected
    pub latency_samples: u64,

    /// Last order timestamp in nanoseconds
    pub last_order_timestamp: u64,
}
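
// A minimal helper sketch (not currently called elsewhere in this module)
// showing how the latency fields above are intended to be maintained: an
// incremental mean plus running min/max. The u128 intermediate guards the
// mean against overflow.
impl ExecutionStats {
    /// Record one request-to-acknowledgement latency sample in nanoseconds.
    pub fn record_latency(&mut self, latency_ns: u64) {
        self.latency_samples += 1;
        let n = self.latency_samples;

        // Incremental mean: new_avg = (old_avg * (n - 1) + sample) / n
        let total =
            u128::from(self.avg_order_latency_ns) * u128::from(n - 1) + u128::from(latency_ns);
        self.avg_order_latency_ns = (total / u128::from(n)) as u64;

        self.max_order_latency_ns = self.max_order_latency_ns.max(latency_ns);
        self.min_order_latency_ns = if n == 1 {
            latency_ns
        } else {
            self.min_order_latency_ns.min(latency_ns)
        };
    }
}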

/// Events related to order execution
#[derive(Debug, Clone)]
pub enum ExecutionEvent {
    /// New order submitted to the exchange
    NewOrder(Order),

    /// Request to cancel an existing order
    CancelOrder {
        /// Order ID to cancel
        order_id: SmartString,

        /// Original order details
        original_order: Order,
    },

    /// Request to modify an existing order
    ModifyOrder {
        /// Order ID to modify
        order_id: SmartString,

        /// Original order details
        original_order: Order,

        /// New price (if modifying price)
        new_price: Option<Decimal>,

        /// New quantity (if modifying quantity)
        new_quantity: Option<Decimal>,
    },

    /// Request to cancel all open orders for a specific instrument
    CancelAllOrders(InstrumentId),

    /// Request to cancel all open orders across all instruments
    CancelAllOrdersGlobally,

    /// Request to bulk place multiple orders (for improved throughput)
    BulkNewOrders(Box<SmallVec<[Order; 8]>>),

    /// Request to bulk cancel multiple orders (for improved throughput)
    BulkCancelOrders(Box<SmallVec<[(SmartString, Order); 8]>>),
}
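
// Sketch of the intended producer-side pattern (hypothetical caller code,
// assuming an `event_tx: flume::Sender<ExecutionEvent>` and a collected
// `cancels: Vec<(SmartString, Order)>`): batching into one event amortizes
// channel overhead compared with sending one `CancelOrder` per order.
//
//     let batch: Box<SmallVec<[(SmartString, Order); 8]>> =
//         Box::new(cancels.into_iter().collect());
//     event_tx.send_async(ExecutionEvent::BulkCancelOrders(batch)).await?;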

/// Execution report from the exchange
#[derive(Debug, Clone)]
pub struct ExecutionReport {
    /// Unique identifier for this execution report
    pub id: SmartString,

    /// Order ID this report is for
    pub order_id: SmartString,

    /// Exchange timestamp (in nanoseconds)
    pub exchange_timestamp: u64,

    /// System timestamp (in nanoseconds)
    pub system_timestamp: u64,

    /// Instrument ID
    pub instrument_id: InstrumentId,

    /// New status of the order
    pub status: OrderStatus,

    /// Filled quantity in this execution (0 for non-fills)
    pub filled_quantity: Decimal,

    /// Remaining quantity to be filled
    pub remaining_quantity: Decimal,

    /// Price of this execution (for fills)
    pub execution_price: Option<Decimal>,

    /// Reason for rejection (if applicable)
    pub reject_reason: Option<SmartString>,

    /// Exchange-specific execution ID
    pub exchange_execution_id: Option<SmartString>,

    /// Whether this is the final report for the order
    pub is_final: bool,

    /// Request timestamp when the order was originally submitted (in nanoseconds)
    /// Used for latency calculation
    pub request_timestamp: Option<u64>,
}
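
// Sketch of a fill report as an adapter would emit one (hypothetical values;
// `instrument_id`, `clock`, and the original `request_timestamp` are assumed
// in scope, and `OrderStatus::Filled` is an assumed variant of the
// `rusty_model` enum):
//
//     let report = ExecutionReport {
//         id: "exec_report_1".into(),
//         order_id: "order_1".into(),
//         exchange_timestamp: venue_ts_ns,
//         system_timestamp: clock.raw(),
//         instrument_id,
//         status: OrderStatus::Filled,
//         filled_quantity: Decimal::new(1, 0),
//         remaining_quantity: Decimal::ZERO,
//         execution_price: Some(Decimal::new(50_000, 0)),
//         reject_reason: None,
//         exchange_execution_id: Some("venue-exec-42".into()),
//         is_final: true,
//         request_timestamp: Some(request_timestamp),
//     };
//     report_tx.send_async(report).await?;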

/// Optimized, cache-aware execution engine for high-frequency trading
#[repr(align(64))]
pub struct OptimizedExecutionEngine {
    /// Map of venue to exchange adapter
    exchanges: FxHashMap<Venue, Arc<dyn Exchange>>,

    /// Receiver for execution events from the OMS
    event_rx: Receiver<ExecutionEvent>,

    /// Sender for execution reports to the OMS
    report_tx: Sender<ExecutionReport>,

    /// High-precision clock
    clock: Clock,

    /// Map of order ID to venue for routing
    order_venues: Arc<RwLock<FxHashMap<SmartString, Venue>>>,

    /// Execution statistics for performance monitoring
    stats: Arc<RwLock<ExecutionStats>>,

    /// Batch size for processing events
    batch_size: usize,

    /// Maximum number of concurrent tasks
    max_concurrent_tasks: usize,

    /// Active task handles
    active_tasks: Arc<RwLock<Vec<JoinHandle<()>>>>,

    /// Whether the engine is running
    running: Arc<RwLock<bool>>,
}

impl OptimizedExecutionEngine {
    /// Create a new execution engine
    #[must_use]
    pub fn new(event_rx: Receiver<ExecutionEvent>, report_tx: Sender<ExecutionReport>) -> Self {
        Self {
            exchanges: FxHashMap::default(),
            event_rx,
            report_tx,
            clock: Clock::new(),
            order_venues: Arc::new(RwLock::new(FxHashMap::default())),
            stats: Arc::new(RwLock::new(ExecutionStats::default())),
            batch_size: 32,            // Default batch size, can be tuned
            max_concurrent_tasks: 100, // Default max concurrent tasks
            active_tasks: Arc::new(RwLock::new(Vec::new())),
            running: Arc::new(RwLock::new(false)),
        }
    }

    /// Create a new execution engine with custom batch size and concurrency
    #[must_use]
    pub fn with_options(
        event_rx: Receiver<ExecutionEvent>,
        report_tx: Sender<ExecutionReport>,
        batch_size: usize,
        max_concurrent_tasks: usize,
    ) -> Self {
        Self {
            exchanges: FxHashMap::default(),
            event_rx,
            report_tx,
            clock: Clock::new(),
            order_venues: Arc::new(RwLock::new(FxHashMap::default())),
            stats: Arc::new(RwLock::new(ExecutionStats::default())),
            // Enforce the documented upper bound on events per batch
            batch_size: batch_size.min(MAX_BATCH_SIZE),
            max_concurrent_tasks,
            active_tasks: Arc::new(RwLock::new(Vec::new())),
            running: Arc::new(RwLock::new(false)),
        }
    }
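
    // Construction sketch (hypothetical usage; assumes unbounded flume channels):
    //
    //     let (event_tx, event_rx) = flume::unbounded::<ExecutionEvent>();
    //     let (report_tx, report_rx) = flume::unbounded::<ExecutionReport>();
    //     let engine = OptimizedExecutionEngine::with_options(event_rx, report_tx, 64, 200);
    //     // `event_tx` feeds events in; `report_rx` is where the OMS consumes reports.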

    /// Register an exchange adapter with this execution engine
    pub fn register_exchange(&mut self, exchange: Arc<dyn Exchange>) {
        let venue = exchange.venue();
        self.exchanges.insert(venue, exchange);
    }

    /// Process a batch of execution events for improved throughput.
    ///
    /// Implemented as an associated function (no `&self`) so that the
    /// execution loop spawned in [`Self::start`] can call it from a `'static`
    /// task using the handles it has already cloned.
    async fn process_event_batch(
        exchanges: &FxHashMap<Venue, Arc<dyn Exchange>>,
        order_venues: &Arc<RwLock<FxHashMap<SmartString, Venue>>>,
        report_tx: &Sender<ExecutionReport>,
        stats: &Arc<RwLock<ExecutionStats>>,
        clock: &Clock,
        events: Vec<ExecutionEvent>,
    ) -> Result<()> {
        // Group events by venue for better batching
        let mut new_orders_by_venue: FxHashMap<Venue, SmallVec<[Order; 8]>> = FxHashMap::default();
        let mut cancel_orders_by_venue: FxHashMap<Venue, SmallVec<[(SmartString, Order); 8]>> =
            FxHashMap::default();

        // Process each event appropriately
        for event in events {
            match event {
                ExecutionEvent::NewOrder(order) => {
                    // Store the venue for this order for later lookups
                    let venue = order.venue;
                    order_venues
                        .write()
                        .insert(order.id.into_uuid().to_string().into(), venue);

                    // Queue the order for batched processing
                    new_orders_by_venue.entry(venue).or_default().push(order);
                }
                ExecutionEvent::BulkNewOrders(orders) => {
                    for order in orders.into_iter() {
                        // Store the venue for this order for later lookups
                        let venue = order.venue;
                        order_venues
                            .write()
                            .insert(order.id.into_uuid().to_string().into(), venue);

                        // Queue the order for batched processing
                        new_orders_by_venue.entry(venue).or_default().push(order);
                    }
                }
                ExecutionEvent::CancelOrder {
                    order_id,
                    original_order,
                } => {
                    // Look up the venue for this order
                    let venue = if let Some(v) = order_venues.read().get(&order_id) {
                        *v
                    } else {
                        original_order.venue
                    };

                    // Queue the cancellation for batched processing
                    cancel_orders_by_venue
                        .entry(venue)
                        .or_default()
                        .push((order_id, original_order));
                }
                ExecutionEvent::BulkCancelOrders(orders) => {
                    for (order_id, original_order) in orders.into_iter() {
                        // Look up the venue for this order
                        let venue = if let Some(v) = order_venues.read().get(&order_id) {
                            *v
                        } else {
                            original_order.venue
                        };

                        // Queue the cancellation for batched processing
                        cancel_orders_by_venue
                            .entry(venue)
                            .or_default()
                            .push((order_id, original_order));
                    }
                }
                ExecutionEvent::ModifyOrder {
                    order_id,
                    original_order,
                    new_price,
                    new_quantity,
                } => {
                    // Look up the venue for this order
                    let venue = if let Some(v) = order_venues.read().get(&order_id) {
                        *v
                    } else {
                        original_order.venue
                    };

                    // Route to the appropriate exchange
                    if let Some(exchange) = exchanges.get(&venue) {
                        let exchange_clone = exchange.clone();
                        let report_tx_clone = report_tx.clone();
                        let order_id_clone = order_id.clone();

                        // Spawn a task to handle the order modification
                        tokio::spawn(async move {
                            if let Err(e) = exchange_clone
                                .modify_order(
                                    order_id_clone,
                                    new_price,
                                    new_quantity,
                                    report_tx_clone,
                                )
                                .await
                            {
                                error!("Failed to modify order: {e}");
                            }
                        });
                    }
                }
                ExecutionEvent::CancelAllOrders(instrument_id) => {
                    let venue = instrument_id.venue;

                    // Route to the appropriate exchange
                    if let Some(exchange) = exchanges.get(&venue) {
                        let exchange_clone = exchange.clone();
                        let report_tx_clone = report_tx.clone();
                        let instrument_id_clone = instrument_id.clone();

                        // Spawn a task to handle the cancellation
                        tokio::spawn(async move {
                            if let Err(e) = exchange_clone
                                .cancel_all_orders(Some(instrument_id_clone), report_tx_clone)
                                .await
                            {
                                error!("Failed to cancel all orders for instrument: {e}");
                            }
                        });
                    }
                }
                ExecutionEvent::CancelAllOrdersGlobally => {
                    // Send a cancel-all request to every registered exchange
                    for exchange in exchanges.values() {
                        let exchange_clone = exchange.clone();
                        let report_tx_clone = report_tx.clone();

                        // Spawn a task to handle the cancellation
                        tokio::spawn(async move {
                            if let Err(e) = exchange_clone
                                .cancel_all_orders(None, report_tx_clone)
                                .await
                            {
                                error!("Failed to cancel all orders globally: {e}");
                            }
                        });
                    }
                }
            }
        }

        // Process batched new orders
        for (venue, orders) in new_orders_by_venue {
            if let Some(exchange) = exchanges.get(&venue) {
                // Check if the exchange supports batch order placement
                if exchange.supports_batch_orders() {
                    let exchange_clone = exchange.clone();
                    let report_tx_clone = report_tx.clone();
                    let stats_clone = stats.clone();
                    let clock_clone = clock.clone();

                    // Capture request timestamp
                    let request_timestamp = clock_clone.raw();

                    // Spawn a task to handle the batched orders
                    tokio::spawn(async move {
                        // Log the timestamp for latency measurement
                        debug!("Batch order request timestamp: {request_timestamp}");

                        match exchange_clone
                            .place_batch_orders(orders.clone(), report_tx_clone.clone())
                            .await
                        {
                            Ok(batch_result) => {
                                // Handle the batch result using the shared helper
                                Self::handle_batch_result_place_orders(
                                    batch_result,
                                    orders,
                                    exchange_clone.clone(),
                                    report_tx_clone.clone(),
                                    stats_clone.clone(),
                                    clock_clone.clone(),
                                )
                                .await;
                            }
                            Err(e) => {
                                error!("Failed to place batch orders: {e}");

                                // Fall back to placing individual orders
                                for order in orders {
                                    if let Err(e) = exchange_clone
                                        .place_order(order, report_tx_clone.clone())
                                        .await
                                    {
                                        error!("Failed to place order: {e}");
                                    }

                                    // Update statistics
                                    {
                                        let mut stats = stats_clone.write();
                                        stats.orders_placed += 1;
                                        stats.last_order_timestamp = clock_clone.raw();
                                    }
                                }
                            }
                        }
                    });
                } else {
                    // Exchange doesn't support batch orders; place them individually
                    for order in orders {
                        // Per-order clones so each spawned task owns its handles
                        let exchange_clone = exchange.clone();
                        let report_tx_clone = report_tx.clone();
                        let stats_clone = stats.clone();
                        let clock_clone = clock.clone();

                        // Capture request timestamp
                        let request_timestamp = clock_clone.raw();

                        // Spawn a task to handle the order
                        tokio::spawn(async move {
                            // The request timestamp is currently only logged; per-order
                            // latency accounting can be derived from it later.
                            debug!("Order request timestamp: {request_timestamp}");

                            if let Err(e) = exchange_clone.place_order(order, report_tx_clone).await
                            {
                                error!("Failed to place order: {e}");
                            }

                            // Update statistics
                            let mut stats = stats_clone.write();
                            stats.orders_placed += 1;
                            stats.last_order_timestamp = clock_clone.raw();
                        });
                    }
                }
            }
        }

        // Process batched cancel orders
        for (venue, orders) in cancel_orders_by_venue {
            if let Some(exchange) = exchanges.get(&venue) {
                // Check if the exchange supports batch order cancellation
                if exchange.supports_batch_orders() {
                    let exchange_clone = exchange.clone();
                    let report_tx_clone = report_tx.clone();
                    let stats_clone = stats.clone();

                    // Extract order IDs for batch cancellation
                    let order_ids: SmallVec<[SmartString; 8]> =
                        orders.iter().map(|(id, _)| id.clone()).collect();

                    // Spawn a task to handle the batched cancellations
                    tokio::spawn(async move {
                        match exchange_clone
                            .cancel_batch_orders(order_ids, report_tx_clone.clone())
                            .await
                        {
                            Ok(batch_result) => {
                                // Handle the batch result using the shared helper
                                Self::handle_batch_result_cancel_orders(
                                    batch_result,
                                    orders,
                                    exchange_clone.clone(),
                                    report_tx_clone.clone(),
                                    stats_clone.clone(),
                                )
                                .await;
                            }
                            Err(e) => {
                                error!("Failed to cancel batch orders: {e}");

                                // Fall back to cancelling individual orders
                                for (order_id, _) in orders {
                                    if let Err(e) = exchange_clone
                                        .cancel_order(order_id, report_tx_clone.clone())
                                        .await
                                    {
                                        error!("Failed to cancel order: {e}");
                                    }

                                    // Update statistics
                                    {
                                        let mut stats = stats_clone.write();
                                        stats.orders_cancelled += 1;
                                    }
                                }
                            }
                        }
                    });
                } else {
                    // Exchange doesn't support batch cancellation; cancel individually
                    for (order_id, _) in orders {
                        let exchange_clone = exchange.clone();
                        let report_tx_clone = report_tx.clone();
                        let stats_clone = stats.clone();

                        // Spawn a task to handle the cancellation
                        tokio::spawn(async move {
                            if let Err(e) =
                                exchange_clone.cancel_order(order_id, report_tx_clone).await
                            {
                                error!("Failed to cancel order: {e}");
                            }

                            // Update statistics
                            let mut stats = stats_clone.write();
                            stats.orders_cancelled += 1;
                        });
                    }
                }
            }
        }

        Ok(())
    }

    /// Start the execution engine
    pub async fn start(&self) -> Result<()> {
        if self.exchanges.is_empty() {
            return Err(anyhow!("No exchanges registered"));
        }

        // Set running flag
        *self.running.write() = true;

        // Clone values needed for the execution loop
        let event_rx = self.event_rx.clone();
        let report_tx = self.report_tx.clone();
        let exchanges = self.exchanges.clone();
        let order_venues = self.order_venues.clone();
        let stats = self.stats.clone();
        let clock = self.clock.clone();
        let batch_size = self.batch_size;
        let running = self.running.clone();

        // Start the batched execution loop
        tokio::spawn(async move {
            info!("Starting execution engine with batch size: {batch_size}");

            let mut events = Vec::with_capacity(batch_size);
            let mut event_timeout = Duration::from_millis(10); // initial batch-collection timeout

            'main: while *running.read() {
                // Collect events up to batch_size or until the timeout elapses
                let start_collection = tokio::time::Instant::now();

                while events.len() < batch_size {
                    let remaining_timeout =
                        event_timeout.saturating_sub(start_collection.elapsed());

                    match tokio::time::timeout(remaining_timeout, event_rx.recv_async()).await {
                        Ok(Ok(event)) => {
                            events.push(event);
                        }
                        Ok(Err(_)) => {
                            // Channel closed
                            warn!("Event channel closed, stopping execution engine");
                            break 'main;
                        }
                        Err(_) => {
                            // Timed out collecting events - process what we have
                            break;
                        }
                    }
                }

                // Process collected events, if any
                if !events.is_empty() {
                    let collected = events.len();
                    let batch = std::mem::replace(&mut events, Vec::with_capacity(batch_size));

                    if let Err(e) = Self::process_event_batch(
                        &exchanges,
                        &order_venues,
                        &report_tx,
                        &stats,
                        &clock,
                        batch,
                    )
                    .await
                    {
                        error!("Failed to process event batch: {e}");
                    }

                    // Dynamically adjust the collection timeout based on load:
                    // shrink it when batches fill up, grow it when traffic is light.
                    if collected >= batch_size {
                        event_timeout = Duration::from_millis(5);
                    } else if collected <= batch_size / 4 {
                        event_timeout = Duration::from_millis(20);
                    }
                }
            }

            info!("Execution engine stopped");
        });

        Ok(())
    }

    /// Stop the execution engine
    pub async fn stop(&self) -> Result<()> {
        // Clear the running flag; the execution loop exits on its next iteration
        *self.running.write() = false;

        Ok(())
    }

    /// Get a sender for publishing execution reports on this engine's report channel
    #[must_use]
    pub fn report_sender(&self) -> Sender<ExecutionReport> {
        self.report_tx.clone()
    }

    /// Get the execution event receiver that this engine consumes from
    #[must_use]
    pub fn event_receiver(&self) -> Receiver<ExecutionEvent> {
        self.event_rx.clone()
    }

    /// Get a snapshot of the current execution statistics
    #[must_use]
    pub fn get_stats(&self) -> ExecutionStats {
        self.stats.read().clone()
    }
}
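
// End-to-end usage sketch (hypothetical; assumes an adapter type `MyExchange`
// implementing the `Exchange` trait below, a pre-built `order: Order`, and an
// async caller):
//
//     let (event_tx, event_rx) = flume::unbounded();
//     let (report_tx, report_rx) = flume::unbounded();
//     let mut engine = OptimizedExecutionEngine::new(event_rx, report_tx);
//     engine.register_exchange(Arc::new(MyExchange::default()));
//     engine.start().await?;
//     event_tx.send_async(ExecutionEvent::NewOrder(order)).await?;
//     while let Ok(report) = report_rx.recv_async().await {
//         // route execution reports back into the OMS
//     }
//     engine.stop().await?;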

/// Interface for connecting to trading venues
#[async_trait]
pub trait Exchange: Send + Sync + std::any::Any {
    /// Returns the venue this exchange connects to
    fn venue(&self) -> Venue;

    /// Places a new order on the exchange
    async fn place_order(&self, order: Order, report_tx: Sender<ExecutionReport>) -> Result<()>;

    /// Cancels an existing order
    async fn cancel_order(
        &self,
        order_id: SmartString,
        report_tx: Sender<ExecutionReport>,
    ) -> Result<()>;

    /// Modifies an existing order
    async fn modify_order(
        &self,
        order_id: SmartString,
        new_price: Option<Decimal>,
        new_quantity: Option<Decimal>,
        report_tx: Sender<ExecutionReport>,
    ) -> Result<()>;

    /// Cancels all open orders, optionally filtered by instrument
    async fn cancel_all_orders(
        &self,
        instrument_id: Option<InstrumentId>,
        report_tx: Sender<ExecutionReport>,
    ) -> Result<()>;

    /// Retrieves the status of an order
    async fn get_order_status(&self, order_id: &str) -> Result<OrderStatus>;

    /// Connects to the exchange
    async fn connect(&self) -> Result<()>;

    /// Disconnects from the exchange
    async fn disconnect(&self) -> Result<()>;

    /// Checks if connected to the exchange
    async fn is_connected(&self) -> bool;

    /// Gets a list of instruments supported by this exchange
    async fn get_instruments(&self) -> Result<SmallVec<[InstrumentId; 32]>>;

    /// Check if this exchange supports batch orders
    fn supports_batch_orders(&self) -> bool {
        false
    }

    /// Place multiple orders in a single batch with improved error handling
    ///
    /// Returns a `BatchResult` that separates transport-level errors from per-order errors.
    /// This allows for better error handling strategies and more accurate reporting.
    async fn place_batch_orders(
        &self,
        _orders: SmallVec<[Order; 8]>,
        _report_tx: Sender<ExecutionReport>,
    ) -> Result<BatchResult<()>> {
        Err(anyhow!("Batch orders not supported"))
    }

    /// Cancel multiple orders in a single batch with improved error handling
    ///
    /// Returns a `BatchResult` that separates transport-level errors from per-order errors.
    /// This allows for better error handling strategies and more accurate reporting.
    async fn cancel_batch_orders(
        &self,
        _order_ids: SmallVec<[SmartString; 8]>,
        _report_tx: Sender<ExecutionReport>,
    ) -> Result<BatchResult<()>> {
        Err(anyhow!("Batch cancellation not supported"))
    }

    // No `as_any` downcasting hook is provided; adapters are used only through
    // this trait object interface.
}
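
// A minimal adapter sketch (hypothetical `StubExchange`, useful as a test
// double) showing what implementing the `Exchange` trait involves. Every body
// is a `todo!()` placeholder; a real adapter would wire these methods to the
// venue's API and push `ExecutionReport`s through the provided `report_tx`.
#[allow(dead_code)]
struct StubExchange {
    venue: Venue,
}

#[async_trait]
impl Exchange for StubExchange {
    fn venue(&self) -> Venue {
        self.venue
    }

    async fn place_order(&self, _order: Order, _report_tx: Sender<ExecutionReport>) -> Result<()> {
        todo!("submit the order, then emit an acknowledgement ExecutionReport")
    }

    async fn cancel_order(
        &self,
        _order_id: SmartString,
        _report_tx: Sender<ExecutionReport>,
    ) -> Result<()> {
        todo!("request the cancel, then emit a cancellation ExecutionReport")
    }

    async fn modify_order(
        &self,
        _order_id: SmartString,
        _new_price: Option<Decimal>,
        _new_quantity: Option<Decimal>,
        _report_tx: Sender<ExecutionReport>,
    ) -> Result<()> {
        todo!("amend the order in place, or cancel and replace it")
    }

    async fn cancel_all_orders(
        &self,
        _instrument_id: Option<InstrumentId>,
        _report_tx: Sender<ExecutionReport>,
    ) -> Result<()> {
        todo!("cancel every open order, optionally filtered by instrument")
    }

    async fn get_order_status(&self, _order_id: &str) -> Result<OrderStatus> {
        todo!("query the venue for the order's current status")
    }

    async fn connect(&self) -> Result<()> {
        todo!("establish the venue session")
    }

    async fn disconnect(&self) -> Result<()> {
        todo!("tear down the venue session")
    }

    async fn is_connected(&self) -> bool {
        todo!("report the session state")
    }

    async fn get_instruments(&self) -> Result<SmallVec<[InstrumentId; 32]>> {
        todo!("list tradable instruments")
    }
}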

// The BatchExchange trait is no longer needed since its functionality has been
// incorporated into the main Exchange trait with default implementations.

/// Helper functions for batch result handling to reduce code duplication
impl OptimizedExecutionEngine {
    /// Handle batch result with retry logic for order placement
    async fn handle_batch_result_place_orders(
        batch_result: BatchResult<()>,
        orders: SmallVec<[Order; 8]>,
        exchange: Arc<dyn Exchange>,
        report_tx: Sender<ExecutionReport>,
        stats: Arc<RwLock<ExecutionStats>>,
        clock: Clock,
    ) {
        match batch_result.status {
            crate::error::batch_errors::BatchStatus::AllSucceeded => {
                info!(
                    "All {} orders in batch succeeded",
                    batch_result.summary.total_orders
                );
                // Update statistics
                let mut stats = stats.write();
                stats.orders_placed += batch_result.summary.successful_orders as u64;
                stats.last_order_timestamp = clock.raw();
            }
            crate::error::batch_errors::BatchStatus::PartialSuccess => {
                warn!(
                    "Partial batch success: {}/{} orders succeeded",
                    batch_result.summary.successful_orders, batch_result.summary.total_orders
                );

                // Update statistics
                {
                    let mut stats = stats.write();
                    stats.orders_placed += batch_result.summary.successful_orders as u64;
                    stats.orders_rejected += batch_result.summary.failed_orders as u64;
                    stats.last_order_timestamp = clock.raw();
                }

                // Retry failed orders individually if they're retryable
                if batch_result.has_retryable_orders() {
                    Self::retry_orders_individually(
                        batch_result.get_retryable_orders(),
                        exchange,
                        report_tx,
                    )
                    .await;
                }
            }
            crate::error::batch_errors::BatchStatus::AllFailed => {
                error!(
                    "All {} orders in batch failed",
                    batch_result.summary.total_orders
                );

                // Update statistics
                {
                    let mut stats = stats.write();
                    stats.orders_rejected += batch_result.summary.failed_orders as u64;
                }

                // Retry orders individually if they're retryable
                if batch_result.has_retryable_orders() {
                    Self::retry_orders_individually(
                        batch_result.get_retryable_orders(),
                        exchange,
                        report_tx,
                    )
                    .await;
                }
            }
            crate::error::batch_errors::BatchStatus::TransportFailure => {
                error!(
                    "Transport failure affected entire batch of {} orders",
                    batch_result.summary.total_orders
                );

                // Update statistics
                {
                    let mut stats = stats.write();
                    stats.orders_rejected += batch_result.summary.failed_orders as u64;
                }

                // For transport failures, fall back to placing orders individually
                if batch_result.has_retryable_orders() {
                    warn!("Transport error is retryable, falling back to individual orders");

                    for order in orders {
                        if let Err(e) = exchange.place_order(order, report_tx.clone()).await {
                            error!("Failed to place order in fallback: {e}");
                        }
                    }
                }
            }
        }
    }

    /// Handle batch result with retry/fallback logic for order cancellation
    async fn handle_batch_result_cancel_orders(
        batch_result: BatchResult<()>,
        orders: SmallVec<[(SmartString, Order); 8]>,
        exchange: Arc<dyn Exchange>,
        report_tx: Sender<ExecutionReport>,
        stats: Arc<RwLock<ExecutionStats>>,
    ) {
        match batch_result.status {
            crate::error::batch_errors::BatchStatus::AllSucceeded => {
                info!(
                    "All {} order cancellations in batch succeeded",
                    batch_result.summary.total_orders
                );
                // Update statistics
                let mut stats = stats.write();
                stats.orders_cancelled += batch_result.summary.successful_orders as u64;
            }
            crate::error::batch_errors::BatchStatus::PartialSuccess => {
                warn!(
                    "Partial batch cancellation success: {}/{} orders cancelled",
                    batch_result.summary.successful_orders, batch_result.summary.total_orders
                );

                // Update statistics
                {
                    let mut stats = stats.write();
                    stats.orders_cancelled += batch_result.summary.successful_orders as u64;
                }
            }
            crate::error::batch_errors::BatchStatus::AllFailed => {
                error!(
                    "All {} order cancellations in batch failed",
                    batch_result.summary.total_orders
                );

                // Retry cancellations individually if they're retryable; failed
                // cancellations do not update `orders_cancelled`
                if batch_result.has_retryable_orders() {
                    warn!(
                        "Retrying {} retryable cancellations individually",
                        batch_result.summary.retryable_orders
                    );

                    for (order_id, _) in orders {
                        if let Err(e) = exchange.cancel_order(order_id, report_tx.clone()).await {
                            error!("Failed to cancel retried order: {e}");
                        }
                    }
                }
            }
            crate::error::batch_errors::BatchStatus::TransportFailure => {
                error!(
                    "Transport failure affected entire batch of {} order cancellations",
                    batch_result.summary.total_orders
                );

                // For transport failures, fall back to individual cancellations
                if batch_result.has_retryable_orders() {
                    warn!("Transport error is retryable, falling back to individual cancellations");

                    for (order_id, _) in orders {
                        if let Err(e) = exchange.cancel_order(order_id, report_tx.clone()).await {
                            error!("Failed to cancel order in fallback: {e}");
                        }
                    }
                }
            }
        }
    }

    /// Retry orders individually
    async fn retry_orders_individually(
        orders: SmallVec<[Order; 8]>,
        exchange: Arc<dyn Exchange>,
        report_tx: Sender<ExecutionReport>,
    ) {
        info!("Retrying {} retryable orders individually", orders.len());

        for order in orders {
            if let Err(e) = exchange.place_order(order, report_tx.clone()).await {
                error!("Failed to place retried order: {e}");
            }
        }
    }
}

/// Type alias for backward compatibility.
/// The optimized execution engine is now the primary implementation.
pub type ExecutionEngine = OptimizedExecutionEngine;

#[cfg(test)]
mod tests {
    use rusty_common::SmartString;
    use smallvec::SmallVec;

    #[test]
    fn test_smartstring_clone_vs_as_str_into() {
        // This test demonstrates the inefficiency of .as_str().into() vs .clone()
        let original_ids: SmallVec<[SmartString; 8]> = smallvec::smallvec![
            "order_1".into(),
            "order_2".into(),
            "order_3".into(),
            "order_4".into(),
        ];

        // Inefficient way: .as_str().into() - creates unnecessary allocations
        let inefficient_ids: SmallVec<[SmartString; 8]> =
            original_ids.iter().map(|id| id.as_str().into()).collect();

        // Efficient way: .clone() - reuses existing allocation when possible
        let efficient_ids: SmallVec<[SmartString; 8]> = original_ids.iter().cloned().collect();

        // Both should produce the same result
        assert_eq!(inefficient_ids, efficient_ids);
        assert_eq!(inefficient_ids.len(), 4);
        assert_eq!(efficient_ids.len(), 4);

        // The efficient version avoids unnecessary string allocations
        for (orig, cloned) in original_ids.iter().zip(efficient_ids.iter()) {
            assert_eq!(orig, cloned);
        }
    }
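
    // Additional sketch tests for this module (assumptions: unbounded flume
    // channels; no exchange I/O is exercised).
    #[test]
    fn test_new_engine_starts_with_zeroed_stats() {
        let (_event_tx, event_rx) = flume::unbounded::<super::ExecutionEvent>();
        let (report_tx, _report_rx) = flume::unbounded::<super::ExecutionReport>();

        let engine = super::OptimizedExecutionEngine::new(event_rx, report_tx);
        let stats = engine.get_stats();

        assert_eq!(stats.orders_placed, 0);
        assert_eq!(stats.orders_cancelled, 0);
        assert_eq!(stats.latency_samples, 0);
    }

    // Exercises the illustrative `ExecutionStats::record_latency` helper added
    // above: incremental mean plus running min/max.
    #[test]
    fn test_record_latency_updates_min_max_avg() {
        let mut stats = super::ExecutionStats::default();

        stats.record_latency(100);
        stats.record_latency(300);

        assert_eq!(stats.latency_samples, 2);
        assert_eq!(stats.avg_order_latency_ns, 200);
        assert_eq!(stats.min_order_latency_ns, 100);
        assert_eq!(stats.max_order_latency_ns, 300);
    }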
}