rusty_ems/
execution_engine_optimized.rs

1//! Optimized execution engine for high-frequency trading
2//!
3//! This module provides an improved implementation of the execution engine
4//! with enhancements for better performance in high-frequency trading scenarios.
5
6use rusty_common::collections::FxHashMap;
7use std::sync::Arc;
8use std::time::Duration;
9
/// Upper bound, in events, for a single processing batch.
const MAX_BATCH_SIZE: usize = 100;
12
13use anyhow::{Result, anyhow};
14use async_trait::async_trait;
15use flume::{Receiver, Sender};
16use log::{debug, error, info, warn};
17use parking_lot::RwLock;
18use quanta::Clock;
19use rust_decimal::Decimal;
20use rusty_common::SmartString;
21use smallvec::SmallVec;
22use tokio::task::JoinHandle;
23
24use crate::error::batch_errors::BatchResult;
25
26use rusty_model::{
27    enums::OrderStatus, instruments::InstrumentId, trading_order::Order, venues::Venue,
28};
29
/// Counters and latency figures describing execution activity.
///
/// The type is aligned to 64 bytes. All fields start at zero when the value
/// is created through [`Default`].
#[derive(Clone, Debug, Default)]
#[repr(align(64))]
pub struct ExecutionStats {
    /// Count of orders placed.
    pub orders_placed: u64,

    /// Count of orders cancelled.
    pub orders_cancelled: u64,

    /// Count of orders modified.
    pub orders_modified: u64,

    /// Count of orders filled.
    pub orders_filled: u64,

    /// Count of orders rejected.
    pub orders_rejected: u64,

    /// Mean latency between an order request and its acknowledgement,
    /// in nanoseconds.
    pub avg_order_latency_ns: u64,

    /// Largest observed order latency, in nanoseconds.
    pub max_order_latency_ns: u64,

    /// Smallest observed order latency, in nanoseconds.
    pub min_order_latency_ns: u64,

    /// How many latency samples have been recorded.
    pub latency_samples: u64,

    /// Timestamp of the most recent order, in nanoseconds.
    pub last_order_timestamp: u64,
}
65
66/// Events related to order execution
67#[derive(Debug, Clone)]
68pub enum ExecutionEvent {
69    /// New order submitted to the exchange
70    NewOrder(Order),
71
72    /// Request to cancel an existing order
73    CancelOrder {
74        /// Order ID to cancel
75        order_id: SmartString,
76
77        /// Original order details
78        original_order: Order,
79    },
80
81    /// Request to modify an existing order
82    ModifyOrder {
83        /// Order ID to modify
84        order_id: SmartString,
85
86        /// Original order details
87        original_order: Order,
88
89        /// New price (if modifying price)
90        new_price: Option<Decimal>,
91
92        /// New quantity (if modifying quantity)
93        new_quantity: Option<Decimal>,
94    },
95
96    /// Request to cancel all open orders for a specific instrument
97    CancelAllOrders(InstrumentId),
98
99    /// Request to cancel all open orders across all instruments
100    CancelAllOrdersGlobally,
101
102    /// Request to bulk place multiple orders (for improved throughput)
103    BulkNewOrders(Box<SmallVec<[Order; 8]>>),
104
105    /// Request to bulk cancel multiple orders (for improved throughput)
106    BulkCancelOrders(Box<SmallVec<[(SmartString, Order); 8]>>),
107}
108
109/// Execution report from the exchange
110#[derive(Debug, Clone)]
111pub struct ExecutionReport {
112    /// Unique identifier for this execution report
113    pub id: SmartString,
114
115    /// Order ID this report is for
116    pub order_id: SmartString,
117
118    /// Exchange timestamp (in nanoseconds)
119    pub exchange_timestamp: u64,
120
121    /// System timestamp (in nanoseconds)
122    pub system_timestamp: u64,
123
124    /// Instrument ID
125    pub instrument_id: InstrumentId,
126
127    /// New status of the order
128    pub status: OrderStatus,
129
130    /// Filled quantity in this execution (0 for non-fills)
131    pub filled_quantity: Decimal,
132
133    /// Remaining quantity to be filled
134    pub remaining_quantity: Decimal,
135
136    /// Price of this execution (for fills)
137    pub execution_price: Option<Decimal>,
138
139    /// Reason for rejection (if applicable)
140    pub reject_reason: Option<SmartString>,
141
142    /// Exchange-specific execution ID
143    pub exchange_execution_id: Option<SmartString>,
144
145    /// Whether this is the final report for the order
146    pub is_final: bool,
147
148    /// Request timestamp when the order was originally submitted (in nanoseconds)
149    /// Used for latency calculation
150    pub request_timestamp: Option<u64>,
151}
152
153/// Optimized, cache-aware execution engine for high-frequency trading
154#[repr(align(64))]
155pub struct OptimizedExecutionEngine {
156    /// Map of venue to exchange adapter
157    exchanges: FxHashMap<Venue, Arc<dyn Exchange>>,
158
159    /// Receiver for execution events from OMS
160    event_rx: Receiver<ExecutionEvent>,
161
162    /// Sender for execution reports to OMS
163    report_tx: Sender<ExecutionReport>,
164
165    /// High-precision clock
166    clock: Clock,
167
168    /// Map of order ID to venue for routing
169    order_venues: Arc<RwLock<FxHashMap<SmartString, Venue>>>,
170
171    /// Execution statistics for performance monitoring
172    stats: Arc<RwLock<ExecutionStats>>,
173
174    /// Batch size for processing events
175    batch_size: usize,
176
177    /// Maximum number of concurrent tasks
178    max_concurrent_tasks: usize,
179
180    /// Active tasks handles
181    active_tasks: Arc<RwLock<Vec<JoinHandle<()>>>>,
182
183    /// Whether the engine is running
184    running: Arc<RwLock<bool>>,
185}
186
187impl OptimizedExecutionEngine {
188    /// Create a new execution engine
189    #[must_use]
190    pub fn new(event_rx: Receiver<ExecutionEvent>, report_tx: Sender<ExecutionReport>) -> Self {
191        Self {
192            exchanges: FxHashMap::default(),
193            event_rx,
194            report_tx,
195            clock: Clock::new(),
196            order_venues: Arc::new(RwLock::new(FxHashMap::default())),
197            stats: Arc::new(RwLock::new(ExecutionStats::default())),
198            batch_size: 32,            // Default batch size, can be tuned
199            max_concurrent_tasks: 100, // Default max concurrent tasks
200            active_tasks: Arc::new(RwLock::new(Vec::new())),
201            running: Arc::new(RwLock::new(false)),
202        }
203    }
204
205    /// Create a new execution engine with custom batch size and concurrency
206    #[must_use]
207    pub fn with_options(
208        event_rx: Receiver<ExecutionEvent>,
209        report_tx: Sender<ExecutionReport>,
210        batch_size: usize,
211        max_concurrent_tasks: usize,
212    ) -> Self {
213        Self {
214            exchanges: FxHashMap::default(),
215            event_rx,
216            report_tx,
217            clock: Clock::new(),
218            order_venues: Arc::new(RwLock::new(FxHashMap::default())),
219            stats: Arc::new(RwLock::new(ExecutionStats::default())),
220            batch_size,
221            max_concurrent_tasks,
222            active_tasks: Arc::new(RwLock::new(Vec::new())),
223            running: Arc::new(RwLock::new(false)),
224        }
225    }
226
227    /// Register an exchange adapter with this execution engine
228    pub fn register_exchange(&mut self, exchange: Arc<dyn Exchange>) {
229        let venue = exchange.venue();
230        self.exchanges.insert(venue, exchange);
231    }
232
233    /// Process a batch of execution events for improved throughput
234    async fn process_event_batch(&self, events: Vec<ExecutionEvent>) -> Result<()> {
235        // Extract reference to shared data
236        let clock = &self.clock;
237        let exchanges = &self.exchanges;
238        let order_venues = &self.order_venues;
239        let report_tx = &self.report_tx;
240        let stats = &self.stats;
241
242        // Group events by venue for better batching
243        let mut new_orders_by_venue: FxHashMap<Venue, SmallVec<[Order; 8]>> = FxHashMap::default();
244        let mut cancel_orders_by_venue: FxHashMap<Venue, SmallVec<[(SmartString, Order); 8]>> =
245            FxHashMap::default();
246
247        // Process each event appropriately
248        for event in events {
249            match event {
250                ExecutionEvent::NewOrder(order) => {
251                    // Store the venue for this order for later lookups
252                    let venue = order.venue;
253                    order_venues
254                        .write()
255                        .insert(order.id.into_uuid().to_string().into(), venue);
256
257                    // Queue the order for batched processing
258                    new_orders_by_venue.entry(venue).or_default().push(order);
259                }
260                ExecutionEvent::BulkNewOrders(orders) => {
261                    for order in orders.into_iter() {
262                        // Store the venue for this order for later lookups
263                        let venue = order.venue;
264                        order_venues
265                            .write()
266                            .insert(order.id.into_uuid().to_string().into(), venue);
267
268                        // Queue the order for batched processing
269                        new_orders_by_venue.entry(venue).or_default().push(order);
270                    }
271                }
272                ExecutionEvent::CancelOrder {
273                    order_id,
274                    original_order,
275                } => {
276                    // Lookup the venue for this order
277                    let venue = if let Some(v) = order_venues.read().get(&order_id) {
278                        *v
279                    } else {
280                        original_order.venue
281                    };
282
283                    // Queue the cancellation for batched processing
284                    cancel_orders_by_venue
285                        .entry(venue)
286                        .or_default()
287                        .push((order_id, original_order));
288                }
289                ExecutionEvent::BulkCancelOrders(orders) => {
290                    for (order_id, original_order) in orders.into_iter() {
291                        // Lookup the venue for this order
292                        let venue = if let Some(v) = order_venues.read().get(&order_id) {
293                            *v
294                        } else {
295                            original_order.venue
296                        };
297
298                        // Queue the cancellation for batched processing
299                        cancel_orders_by_venue
300                            .entry(venue)
301                            .or_default()
302                            .push((order_id, original_order));
303                    }
304                }
305                ExecutionEvent::ModifyOrder {
306                    order_id,
307                    original_order,
308                    new_price,
309                    new_quantity,
310                } => {
311                    // Lookup the venue for this order
312                    let venue = if let Some(v) = order_venues.read().get(&order_id) {
313                        *v
314                    } else {
315                        original_order.venue
316                    };
317
318                    // Route to the appropriate exchange
319                    if let Some(exchange) = exchanges.get(&venue) {
320                        let exchange_clone = exchange.clone();
321                        let report_tx_clone = report_tx.clone();
322                        let order_id_clone = order_id.clone();
323
324                        // Spawn a task to handle the order modification
325                        tokio::spawn(async move {
326                            if let Err(e) = exchange_clone
327                                .modify_order(
328                                    order_id_clone,
329                                    new_price,
330                                    new_quantity,
331                                    report_tx_clone,
332                                )
333                                .await
334                            {
335                                error!("Failed to modify order: {e}");
336                            }
337                        });
338                    }
339                }
340                ExecutionEvent::CancelAllOrders(instrument_id) => {
341                    let venue = instrument_id.venue;
342
343                    // Route to the appropriate exchange
344                    if let Some(exchange) = exchanges.get(&venue) {
345                        let exchange_clone = exchange.clone();
346                        let report_tx_clone = report_tx.clone();
347                        let instrument_id_clone = instrument_id.clone();
348
349                        // Spawn a task to handle the cancellation
350                        tokio::spawn(async move {
351                            if let Err(e) = exchange_clone
352                                .cancel_all_orders(Some(instrument_id_clone), report_tx_clone)
353                                .await
354                            {
355                                error!("Failed to cancel all orders for instrument: {e}");
356                            }
357                        });
358                    }
359                }
360                ExecutionEvent::CancelAllOrdersGlobally => {
361                    // Send cancel all orders to every registered exchange
362                    for exchange in exchanges.values() {
363                        let exchange_clone = exchange.clone();
364                        let report_tx_clone = report_tx.clone();
365
366                        // Spawn a task to handle the cancellation
367                        tokio::spawn(async move {
368                            if let Err(e) = exchange_clone
369                                .cancel_all_orders(None, report_tx_clone)
370                                .await
371                            {
372                                error!("Failed to cancel all orders globally: {e}");
373                            }
374                        });
375                    }
376                }
377            }
378        }
379
380        // Process batched new orders
381        for (venue, orders) in new_orders_by_venue {
382            if let Some(exchange) = exchanges.get(&venue) {
383                let exchange_clone = exchange.clone();
384                let report_tx_clone = report_tx.clone();
385                let stats_clone = stats.clone();
386                let clock_clone = clock.clone();
387
388                // Check if the exchange supports batch order placement
389                if exchange.supports_batch_orders() {
390                    // Capture request timestamp
391                    let request_timestamp = clock_clone.raw();
392
393                    // Spawn a task to handle the batched orders
394                    tokio::spawn(async move {
395                        // Log the timestamp for latency measurement
396                        debug!("Batch order request timestamp: {request_timestamp}");
397
398                        match exchange_clone
399                            .place_batch_orders(orders.clone(), report_tx_clone.clone())
400                            .await
401                        {
402                            Ok(batch_result) => {
403                                // Handle the batch result using helper function
404                                Self::handle_batch_result_place_orders(
405                                    batch_result,
406                                    orders.clone(),
407                                    exchange_clone.clone(),
408                                    report_tx_clone.clone(),
409                                    stats_clone.clone(),
410                                    clock_clone.clone(),
411                                )
412                                .await;
413                            }
414                            Err(e) => {
415                                error!("Failed to place batch orders: {e}");
416
417                                // Fall back to placing individual orders
418                                for order in orders {
419                                    if let Err(e) = exchange_clone
420                                        .place_order(order.clone(), report_tx_clone.clone())
421                                        .await
422                                    {
423                                        error!("Failed to place order: {e}");
424                                    }
425
426                                    // Update statistics
427                                    {
428                                        let mut stats = stats_clone.write();
429                                        stats.orders_placed += 1;
430                                        stats.last_order_timestamp = clock_clone.raw();
431                                    }
432                                }
433                            }
434                        }
435                    });
436                } else {
437                    // Exchange doesn't support batch orders, place them individually
438                    for order in orders {
439                        let exchange_clone = exchange.clone();
440                        let report_tx_clone = report_tx.clone();
441                        let stats_clone = stats.clone();
442                        let clock_clone = clock.clone();
443                        // This clone is needed in case we do special handling in the future
444
445                        // Capture request timestamp
446                        let request_timestamp = clock_clone.raw();
447
448                        // Spawn a task to handle the order
449                        tokio::spawn(async move {
450                            // We might want to track the request timestamp for latency calculation in the future
451                            // but for now we'll just log it in debug level
452                            debug!("Order request timestamp: {request_timestamp}");
453
454                            if let Err(e) = exchange_clone.place_order(order, report_tx_clone).await
455                            {
456                                error!("Failed to place order: {e}");
457                            }
458
459                            // Update statistics
460                            let mut stats = stats_clone.write();
461                            stats.orders_placed += 1;
462                            stats.last_order_timestamp = clock_clone.raw();
463                        });
464                    }
465                }
466            }
467        }
468
469        // Process batched cancel orders
470        for (venue, orders) in cancel_orders_by_venue {
471            if let Some(exchange) = exchanges.get(&venue) {
472                let exchange_clone = exchange.clone();
473                let report_tx_clone = report_tx.clone();
474                let stats_clone = stats.clone();
475
476                // Check if the exchange supports batch order cancellation
477                if exchange.supports_batch_orders() {
478                    // Extract order IDs for batch cancellation
479                    let order_ids: SmallVec<[SmartString; 8]> =
480                        orders.iter().map(|(id, _)| id.clone()).collect();
481
482                    // Spawn a task to handle the batched cancellations
483                    tokio::spawn(async move {
484                        match exchange_clone
485                            .cancel_batch_orders(order_ids, report_tx_clone.clone())
486                            .await
487                        {
488                            Ok(batch_result) => {
489                                // Handle the batch result based on its status
490                                match batch_result.status {
491                                    crate::error::batch_errors::BatchStatus::AllSucceeded => {
492                                        info!(
493                                            "All {} order cancellations in batch succeeded",
494                                            batch_result.summary.total_orders
495                                        );
496                                        // Update statistics
497                                        let mut stats = stats_clone.write();
498                                        stats.orders_cancelled +=
499                                            batch_result.summary.successful_orders as u64;
500                                    }
501                                    crate::error::batch_errors::BatchStatus::PartialSuccess => {
502                                        warn!(
503                                            "Partial batch cancellation success: {}/{} orders cancelled",
504                                            batch_result.summary.successful_orders,
505                                            batch_result.summary.total_orders
506                                        );
507
508                                        // Update statistics
509                                        {
510                                            let mut stats = stats_clone.write();
511                                            stats.orders_cancelled +=
512                                                batch_result.summary.successful_orders as u64;
513                                        }
514                                    }
515                                    crate::error::batch_errors::BatchStatus::AllFailed => {
516                                        error!(
517                                            "All {} order cancellations in batch failed",
518                                            batch_result.summary.total_orders
519                                        );
520
521                                        // Retry cancellations individually if they're retryable
522                                        if batch_result.has_retryable_orders() {
523                                            warn!(
524                                                "Retrying {} retryable cancellations individually",
525                                                batch_result.summary.retryable_orders
526                                            );
527
528                                            for (order_id, _) in orders {
529                                                if let Err(e) = exchange_clone
530                                                    .cancel_order(order_id, report_tx_clone.clone())
531                                                    .await
532                                                {
533                                                    error!("Failed to cancel retried order: {e}");
534                                                }
535                                            }
536                                        }
537                                    }
538                                    crate::error::batch_errors::BatchStatus::TransportFailure => {
539                                        error!(
540                                            "Transport failure affected entire batch of {} order cancellations",
541                                            batch_result.summary.total_orders
542                                        );
543
544                                        // For transport failures, retry individually after a delay
545                                        if batch_result.has_retryable_orders() {
546                                            warn!(
547                                                "Transport error is retryable, falling back to individual cancellations"
548                                            );
549
550                                            // Fall back to cancelling individual orders
551                                            for (order_id, _) in orders {
552                                                if let Err(e) = exchange_clone
553                                                    .cancel_order(order_id, report_tx_clone.clone())
554                                                    .await
555                                                {
556                                                    error!(
557                                                        "Failed to cancel order in fallback: {e}"
558                                                    );
559                                                }
560                                            }
561                                        }
562                                    }
563                                }
564                            }
565                            Err(e) => {
566                                error!("Failed to cancel batch orders: {e}");
567
568                                // Fall back to cancelling individual orders
569                                for (order_id, _) in orders {
570                                    if let Err(e) = exchange_clone
571                                        .cancel_order(order_id, report_tx_clone.clone())
572                                        .await
573                                    {
574                                        error!("Failed to cancel order: {e}");
575                                    }
576
577                                    // Update statistics
578                                    {
579                                        let mut stats = stats_clone.write();
580                                        stats.orders_cancelled += 1;
581                                    }
582                                }
583                            }
584                        }
585                    });
586                } else {
587                    // Exchange doesn't support batch cancellation, cancel them individually
588                    for (order_id, _) in orders {
589                        let exchange_clone = exchange.clone();
590                        let report_tx_clone = report_tx.clone();
591                        let stats_clone = stats.clone();
592                        let order_id_clone = order_id.clone();
593
594                        // Spawn a task to handle the cancellation
595                        tokio::spawn(async move {
596                            if let Err(e) = exchange_clone
597                                .cancel_order(order_id_clone, report_tx_clone)
598                                .await
599                            {
600                                error!("Failed to cancel order: {e}");
601                            }
602
603                            // Update statistics
604                            let mut stats = stats_clone.write();
605                            stats.orders_cancelled += 1;
606                        });
607                    }
608                }
609            }
610        }
611
612        Ok(())
613    }
614
615    /// Start the execution engine
616    pub async fn start(&self) -> Result<()> {
617        if self.exchanges.is_empty() {
618            return Err(anyhow!("No exchanges registered"));
619        }
620
621        // Set running flag
622        *self.running.write() = true;
623
624        // Clone values needed for the execution loop
625        let event_rx = self.event_rx.clone();
626        let report_tx = self.report_tx.clone();
627        let exchanges = self.exchanges.clone();
628        let order_venues = self.order_venues.clone();
629        let stats = self.stats.clone();
630        let clock = self.clock.clone();
631        let batch_size = self.batch_size;
632        let running = self.running.clone();
633
634        // Start the batched execution loop
635        tokio::spawn(async move {
636            info!("Starting execution engine with batch size: {batch_size}");
637
638            let mut events = Vec::with_capacity(batch_size);
639            let mut event_timeout = Duration::from_millis(10); // 10ms timeout for batch collection
640
641            'main: while *running.read() {
642                // Collect events up to batch_size or until timeout
643                let start_collection = tokio::time::Instant::now();
644
645                while events.len() < batch_size {
646                    let remaining_timeout =
647                        event_timeout.saturating_sub(start_collection.elapsed());
648
649                    match tokio::time::timeout(remaining_timeout, event_rx.recv_async()).await {
650                        Ok(Ok(event)) => {
651                            events.push(event);
652                        }
653                        Ok(Err(_)) => {
654                            // Channel closed
655                            warn!("Event channel closed, stopping execution engine");
656                            break 'main;
657                        }
658                        Err(_) => {
659                            // Timeout collecting events - process what we have
660                            break;
661                        }
662                    }
663
664                    // If we've collected a full batch, process it immediately
665                    if events.len() >= batch_size {
666                        break;
667                    }
668                }
669
670                // Process collected events if any
671                if !events.is_empty() {
672                    let batch_size = events.len();
673
674                    if let Err(e) = Self::process_event_batch_static(
675                        &exchanges,
676                        &order_venues,
677                        &report_tx,
678                        &stats,
679                        &clock,
680                        std::mem::take(&mut events),
681                    )
682                    .await
683                    {
684                        error!("Failed to process event batch: {e}");
685                    }
686
687                    // Dynamically adjust batch timeout based on load
688                    // Lower timeout if we're getting full batches, increase if not
689                    if batch_size >= MAX_BATCH_SIZE {
690                        event_timeout = Duration::from_millis(5);
691                    } else if batch_size <= MAX_BATCH_SIZE / 4 {
692                        event_timeout = Duration::from_millis(20);
693                    }
694                }
695            }
696
697            info!("Execution engine stopped");
698        });
699
700        Ok(())
701    }
702
703    /// Helper method to process batch from static context
704    async fn process_event_batch_static(
705        _exchanges: &FxHashMap<Venue, Arc<dyn Exchange>>,
706        _order_venues: &Arc<RwLock<FxHashMap<SmartString, Venue>>>,
707        _report_tx: &Sender<ExecutionReport>,
708        _stats: &Arc<RwLock<ExecutionStats>>,
709        _clock: &Clock,
710        _events: Vec<ExecutionEvent>,
711    ) -> Result<()> {
712        // Implementation would be similar to the instance method
713        // This allows for more flexible task spawning patterns
714        Ok(())
715    }
716
717    /// Stop the execution engine
718    pub async fn stop(&self) -> Result<()> {
719        // Set running flag to false
720        *self.running.write() = false;
721
722        Ok(())
723    }
724
725    /// Get a execution report channel that sends to this engine
726    #[must_use]
727    pub fn report_sender(&self) -> Sender<ExecutionReport> {
728        self.report_tx.clone()
729    }
730
731    /// Get an execution event channel that this engine listens to
732    #[must_use]
733    pub fn event_receiver(&self) -> Receiver<ExecutionEvent> {
734        self.event_rx.clone()
735    }
736
737    /// Get current execution statistics
738    #[must_use]
739    pub fn get_stats(&self) -> ExecutionStats {
740        self.stats.read().clone()
741    }
742}
743
744/// Interface for connecting to trading venues
745#[async_trait]
746pub trait Exchange: Send + Sync + std::any::Any {
747    /// Returns the venue this exchange connects to
748    fn venue(&self) -> Venue;
749
750    /// Places a new order on the exchange
751    async fn place_order(&self, order: Order, report_tx: Sender<ExecutionReport>) -> Result<()>;
752
753    /// Cancels an existing order
754    async fn cancel_order(
755        &self,
756        order_id: SmartString,
757        report_tx: Sender<ExecutionReport>,
758    ) -> Result<()>;
759
760    /// Modifies an existing order
761    async fn modify_order(
762        &self,
763        order_id: SmartString,
764        new_price: Option<Decimal>,
765        new_quantity: Option<Decimal>,
766        report_tx: Sender<ExecutionReport>,
767    ) -> Result<()>;
768
769    /// Cancels all open orders, optionally filtered by instrument
770    async fn cancel_all_orders(
771        &self,
772        instrument_id: Option<InstrumentId>,
773        report_tx: Sender<ExecutionReport>,
774    ) -> Result<()>;
775
776    /// Retrieves the status of an order
777    async fn get_order_status(&self, order_id: &str) -> Result<OrderStatus>;
778
779    /// Connects to the exchange
780    async fn connect(&self) -> Result<()>;
781
782    /// Disconnects from the exchange
783    async fn disconnect(&self) -> Result<()>;
784
785    /// Checks if connected to the exchange
786    async fn is_connected(&self) -> bool;
787
788    /// Gets a list of instruments supported by this exchange
789    async fn get_instruments(&self) -> Result<SmallVec<[InstrumentId; 32]>>;
790
791    /// Check if this exchange supports batch orders
792    fn supports_batch_orders(&self) -> bool {
793        false
794    }
795
796    /// Place multiple orders in a single batch with improved error handling
797    ///
798    /// Returns a `BatchResult` that separates transport-level errors from per-order errors.
799    /// This allows for better error handling strategies and more accurate reporting.
800    async fn place_batch_orders(
801        &self,
802        _orders: SmallVec<[Order; 8]>,
803        _report_tx: Sender<ExecutionReport>,
804    ) -> Result<BatchResult<()>> {
805        Err(anyhow!("Batch orders not supported"))
806    }
807
808    /// Cancel multiple orders in a single batch with improved error handling
809    ///
810    /// Returns a `BatchResult` that separates transport-level errors from per-order errors.
811    /// This allows for better error handling strategies and more accurate reporting.
812    async fn cancel_batch_orders(
813        &self,
814        _order_ids: SmallVec<[SmartString; 8]>,
815        _report_tx: Sender<ExecutionReport>,
816    ) -> Result<BatchResult<()>> {
817        Err(anyhow!("Batch cancellation not supported"))
818    }
819
820    // Removed as_any method to avoid dynamic dispatch
821}
822
823// The BatchExchange trait is no longer needed since its functionality has been
824// incorporated into the main Exchange trait with default implementations.
825
826/// Helper functions for batch result handling to reduce code duplication
827impl OptimizedExecutionEngine {
828    /// Handle batch result with retry logic for order placement
829    async fn handle_batch_result_place_orders(
830        batch_result: BatchResult<()>,
831        orders: SmallVec<[Order; 8]>,
832        exchange: Arc<dyn Exchange>,
833        report_tx: Sender<ExecutionReport>,
834        stats: Arc<RwLock<ExecutionStats>>,
835        clock: Clock,
836    ) {
837        match batch_result.status {
838            crate::error::batch_errors::BatchStatus::AllSucceeded => {
839                info!(
840                    "All {} orders in batch succeeded",
841                    batch_result.summary.total_orders
842                );
843                // Update statistics
844                let mut stats = stats.write();
845                stats.orders_placed += batch_result.summary.successful_orders as u64;
846                stats.last_order_timestamp = clock.raw();
847            }
848            crate::error::batch_errors::BatchStatus::PartialSuccess => {
849                warn!(
850                    "Partial batch success: {}/{} orders succeeded",
851                    batch_result.summary.successful_orders, batch_result.summary.total_orders
852                );
853
854                // Update statistics
855                {
856                    let mut stats = stats.write();
857                    stats.orders_placed += batch_result.summary.successful_orders as u64;
858                    stats.orders_rejected += batch_result.summary.failed_orders as u64;
859                    stats.last_order_timestamp = clock.raw();
860                }
861
862                // Retry failed orders individually if they're retryable
863                if batch_result.has_retryable_orders() {
864                    Self::retry_orders_individually(
865                        batch_result.get_retryable_orders(),
866                        exchange,
867                        report_tx,
868                    )
869                    .await;
870                }
871            }
872            crate::error::batch_errors::BatchStatus::AllFailed => {
873                error!(
874                    "All {} orders in batch failed",
875                    batch_result.summary.total_orders
876                );
877
878                // Update statistics
879                {
880                    let mut stats = stats.write();
881                    stats.orders_rejected += batch_result.summary.failed_orders as u64;
882                }
883
884                // Retry orders individually if they're retryable
885                if batch_result.has_retryable_orders() {
886                    Self::retry_orders_individually(
887                        batch_result.get_retryable_orders(),
888                        exchange,
889                        report_tx,
890                    )
891                    .await;
892                }
893            }
894            crate::error::batch_errors::BatchStatus::TransportFailure => {
895                error!(
896                    "Transport failure affected entire batch of {} orders",
897                    batch_result.summary.total_orders
898                );
899
900                // Update statistics
901                {
902                    let mut stats = stats.write();
903                    stats.orders_rejected += batch_result.summary.failed_orders as u64;
904                }
905
906                // For transport failures, retry the entire batch after a delay
907                if batch_result.has_retryable_orders() {
908                    warn!("Transport error is retryable, falling back to individual orders");
909
910                    // Fall back to placing individual orders
911                    for order in orders {
912                        if let Err(e) = exchange.place_order(order.clone(), report_tx.clone()).await
913                        {
914                            error!("Failed to place order in fallback: {e}");
915                        }
916                    }
917                }
918            }
919        }
920    }
921
922    /// Handle batch result with retry logic for order cancellation
923    async fn handle_batch_result_cancel_orders(
924        batch_result: BatchResult<()>,
925        exchange: Arc<dyn Exchange>,
926        report_tx: Sender<ExecutionReport>,
927        stats: Arc<RwLock<ExecutionStats>>,
928    ) {
929        match batch_result.status {
930            crate::error::batch_errors::BatchStatus::AllSucceeded => {
931                info!(
932                    "All {} order cancellations in batch succeeded",
933                    batch_result.summary.total_orders
934                );
935                // Update statistics
936                let mut stats = stats.write();
937                stats.orders_cancelled += batch_result.summary.successful_orders as u64;
938            }
939            crate::error::batch_errors::BatchStatus::PartialSuccess => {
940                warn!(
941                    "Partial batch cancellation success: {}/{} orders cancelled",
942                    batch_result.summary.successful_orders, batch_result.summary.total_orders
943                );
944
945                // Update statistics
946                {
947                    let mut stats = stats.write();
948                    stats.orders_cancelled += batch_result.summary.successful_orders as u64;
949                }
950            }
951            crate::error::batch_errors::BatchStatus::AllFailed => {
952                error!(
953                    "All {} order cancellations in batch failed",
954                    batch_result.summary.total_orders
955                );
956
957                // Update statistics (no cancellations succeeded)
958                // Note: For failed cancellations, we don't update orders_cancelled
959            }
960            crate::error::batch_errors::BatchStatus::TransportFailure => {
961                error!(
962                    "Transport failure affected entire batch of {} order cancellations",
963                    batch_result.summary.total_orders
964                );
965
966                // Update statistics (no cancellations succeeded)
967                // Note: For transport failures, we don't update orders_cancelled
968            }
969        }
970    }
971
972    /// Retry orders individually
973    async fn retry_orders_individually(
974        orders: SmallVec<[Order; 8]>,
975        exchange: Arc<dyn Exchange>,
976        report_tx: Sender<ExecutionReport>,
977    ) {
978        info!("Retrying {} retryable orders individually", orders.len());
979
980        for order in orders {
981            if let Err(e) = exchange.place_order(order.clone(), report_tx.clone()).await {
982                error!("Failed to place retried order: {e}");
983            }
984        }
985    }
986}
987
/// Type alias kept for backward compatibility.
///
/// The optimized execution engine is now the primary implementation.
pub type ExecutionEngine = OptimizedExecutionEngine;
991
#[cfg(test)]
mod tests {
    use rusty_common::SmartString;
    use smallvec::SmallVec;

    /// Copying a list of `SmartString` ids via `.as_str()` + conversion and via
    /// `.clone()` must yield the same contents.
    ///
    /// The test only compares results; it does not measure allocations. The
    /// motivation is that `.clone()` is the more direct way to copy an id.
    #[test]
    fn test_smartstring_clone_vs_as_str_into() {
        type IdList = SmallVec<[SmartString; 8]>;

        let original_ids: IdList = smallvec::smallvec![
            SmartString::from("order_1"),
            SmartString::from("order_2"),
            SmartString::from("order_3"),
            SmartString::from("order_4"),
        ];

        // Round-trip through `&str`: the form regarded as inefficient.
        let inefficient_ids: IdList = original_ids
            .iter()
            .map(|id| SmartString::from(id.as_str()))
            .collect();

        // Direct clone of every element: the preferred form.
        let efficient_ids: IdList = original_ids.iter().cloned().collect();

        // Both approaches must agree, and neither may lose or add elements.
        assert_eq!(inefficient_ids, efficient_ids);
        assert_eq!(inefficient_ids.len(), 4);
        assert_eq!(efficient_ids.len(), 4);

        // Each cloned id equals the id it was cloned from.
        original_ids
            .iter()
            .zip(&efficient_ids)
            .for_each(|(orig, cloned)| assert_eq!(orig, cloned));
    }
}