rusty_ems/exchanges/
binance.rs

1use std::sync::Arc;
2
3use anyhow::{Result, anyhow};
4use async_trait::async_trait;
5use flume::Sender;
6
7use crate::utils::time::timestamp_millis;
8use parking_lot::RwLock;
9use quanta::Clock;
10use rust_decimal::Decimal;
11use rusty_common::SmartString;
12use rusty_common::utils::id_generation;
13use rusty_model::{
14    enums::{OrderSide, OrderStatus, OrderType, TimeInForce},
15    instruments::InstrumentId,
16    trading_order::Order,
17    types::ClientId,
18    venues::Venue,
19};
20use smallvec::SmallVec;
21use std::time::{Duration, SystemTime};
22
23use crate::error::EMSError;
24use crate::error::batch_errors::{
25    BatchResult, BatchStatus, BatchSummary, OrderResult, OrderResultMap,
26};
27use crate::exchanges::binance_rest::{
28    BinanceRestClient, OcoOrderParams, PlaceOrderParams, SorOrderParams,
29};
30use crate::exchanges::binance_websocket::BinanceWebSocketClient;
31use crate::execution_engine::{Exchange, ExecutionReport};
32
33/// Standalone function to map Binance order status to internal `OrderStatus`
34fn map_order_status(status: &str) -> OrderStatus {
35    match status {
36        "NEW" => OrderStatus::New,
37        "PARTIALLY_FILLED" => OrderStatus::PartiallyFilled,
38        "FILLED" => OrderStatus::Filled,
39        "CANCELED" | "CANCELLED" => OrderStatus::Cancelled,
40        "REJECTED" => OrderStatus::Rejected,
41        "EXPIRED" => OrderStatus::Expired,
42        "PENDING_CANCEL" => OrderStatus::Pending,
43        "PENDING_NEW" => OrderStatus::Pending,
44        "REPLACED" => OrderStatus::Unknown, // Binance-specific status for replaced orders
45        _ => {
46            log::error!("Unknown Binance order status: {status}");
47            OrderStatus::Unknown
48        }
49    }
50}
51
52/// Cache entry for order symbol lookups
53#[derive(Clone, Debug)]
54struct OrderSymbolCacheEntry {
55    /// The symbol if found, None for negative cache
56    symbol: Option<SmartString>,
57    /// Timestamp when this entry was cached
58    cached_at: SystemTime,
59}
60
61impl OrderSymbolCacheEntry {
62    /// Create a positive cache entry (order found)
63    fn found(symbol: SmartString) -> Self {
64        Self {
65            symbol: Some(symbol),
66            cached_at: SystemTime::now(),
67        }
68    }
69
70    /// Create a negative cache entry (order not found)
71    fn not_found() -> Self {
72        Self {
73            symbol: None,
74            cached_at: SystemTime::now(),
75        }
76    }
77
78    /// Check if this cache entry is still valid
79    fn is_valid(&self, max_age: Duration) -> bool {
80        self.cached_at.elapsed().unwrap_or(max_age) < max_age
81    }
82}
83
84/// Binance exchange adapter using composition of REST and WebSocket clients
85///
86/// # Usage Patterns
87///
88/// ## High-Performance Separate Clients
89/// ```rust,no_run
90/// use BinanceExchange;
91///
92/// // WebSocket-only for market data (optimal memory usage)
93/// let ws_client = BinanceExchange::create_websocket_client(api_key, secret);
94///
95/// // REST-only for order management (optimal for batch operations)
96/// let rest_client = BinanceExchange::create_rest_client(api_key, secret);
97/// ```
98///
99/// ## Convenient Combined Interface
100/// ```rust,no_run
101/// use BinanceExchange;
102///
103/// // Full exchange adapter (includes both REST + WebSocket)
104/// let exchange = BinanceExchange::new(api_key, secret);
105/// exchange.place_order(order, tx).await?;
106///
107/// // Access underlying clients for performance-critical operations
108/// let rest = exchange.rest_client();
109/// let ws = exchange.websocket_client();
110/// ```
111///
112/// ## Performance Recommendations
113/// - **Market Data Feeds**: Use `BinanceWebSocketClient` directly
114/// - **Order Management**: Use `BinanceRestClient` directly
115/// - **Full Applications**: Use `BinanceExchange` facade
116/// - **Microservices**: Use separate clients per service
117///
118/// # Configuration for Binance exchange operations
119#[derive(Debug, Clone)]
120pub struct BinanceConfig {
121    /// Timeout for individual order lookup requests (milliseconds)
122    pub order_lookup_timeout_ms: u64,
123    /// Total timeout for concurrent order lookups (milliseconds)
124    pub order_lookup_total_timeout_ms: u64,
125    /// Major symbols for order lookup fallback
126    pub major_symbols: SmallVec<[SmartString; 8]>,
127}
128
129impl Default for BinanceConfig {
130    fn default() -> Self {
131        Self {
132            order_lookup_timeout_ms: 2000,
133            order_lookup_total_timeout_ms: 5000,
134            major_symbols: Self::default_major_symbols(),
135        }
136    }
137}
138
139impl BinanceConfig {
140    /// Default major symbols for order lookup fallback
141    #[must_use]
142    fn default_major_symbols() -> SmallVec<[SmartString; 8]> {
143        let symbols = ["BTCUSDT", "ETHUSDT", "BNBUSDT", "ADAUSDT", "SOLUSDT"];
144        symbols.iter().map(|&s| s.into()).collect()
145    }
146
147    /// Create config optimized for high-frequency trading
148    #[must_use]
149    pub fn high_frequency() -> Self {
150        Self {
151            order_lookup_timeout_ms: 1000,       // Faster timeout for HFT
152            order_lookup_total_timeout_ms: 2000, // Faster total timeout
153            major_symbols: Self::default_major_symbols(),
154        }
155    }
156}
157
158/// Binance exchange implementation providing order execution and market data access
159pub struct BinanceExchange {
160    /// REST client for API operations
161    rest_client: Arc<BinanceRestClient>,
162
163    /// WebSocket client for user data stream
164    ws_client: BinanceWebSocketClient,
165
166    /// High-precision clock
167    clock: Clock,
168
169    /// Cache of instruments
170    instruments_cache: Arc<RwLock<SmallVec<[InstrumentId; 32]>>>,
171
172    /// Cache mapping `order_id` to symbol for fast lookup
173    /// Also includes negative cache entries to avoid repeated failed lookups
174    order_symbol_cache:
175        Arc<RwLock<rusty_common::collections::FxHashMap<SmartString, OrderSymbolCacheEntry>>>,
176
177    /// Exchange configuration
178    config: BinanceConfig,
179}
180
181impl Clone for BinanceExchange {
182    fn clone(&self) -> Self {
183        Self {
184            rest_client: self.rest_client.clone(),
185            ws_client: self.ws_client.clone(),
186            clock: self.clock.clone(),
187            instruments_cache: self.instruments_cache.clone(),
188            order_symbol_cache: self.order_symbol_cache.clone(),
189            config: self.config.clone(),
190        }
191    }
192}
193
194impl BinanceExchange {
195    /// Get direct access to REST client for performance-critical operations
196    #[must_use]
197    pub fn rest_client(&self) -> &BinanceRestClient {
198        &self.rest_client
199    }
200
201    /// Get direct access to WebSocket client for performance-critical operations
202    #[must_use]
203    pub const fn websocket_client(&self) -> &BinanceWebSocketClient {
204        &self.ws_client
205    }
206
207    /// Create standalone REST client with shared auth
208    #[must_use]
209    pub fn create_rest_client(api_key: String, secret_key: String) -> BinanceRestClient {
210        use rusty_common::auth::exchanges::binance::BinanceAuth;
211        let auth = BinanceAuth::new_hmac(api_key.into(), secret_key.into());
212        BinanceRestClient::new_with_auth(auth)
213    }
214
215    /// Create standalone WebSocket client with shared auth
216    #[must_use]
217    pub fn create_websocket_client(api_key: String, secret_key: String) -> BinanceWebSocketClient {
218        use rusty_common::auth::exchanges::binance::BinanceAuth;
219        let auth = Arc::new(BinanceAuth::new_hmac(api_key.into(), secret_key.into()));
220        BinanceWebSocketClient::new(auth)
221    }
222    /// Create a new Binance exchange adapter
223    #[must_use]
224    pub fn new(api_key: String, secret_key: String) -> Self {
225        Self::new_with_config(api_key, secret_key, BinanceConfig::default())
226    }
227
228    /// Create a new Binance exchange adapter with custom major symbols
229    #[must_use]
230    pub fn new_with_config(api_key: String, secret_key: String, config: BinanceConfig) -> Self {
231        use rusty_common::auth::exchanges::binance::BinanceAuth;
232
233        let auth = Arc::new(BinanceAuth::new_hmac(api_key.into(), secret_key.into()));
234        let rest_client = Arc::new(BinanceRestClient::new_with_auth((*auth).clone()));
235        let ws_client = BinanceWebSocketClient::new(auth);
236
237        Self {
238            rest_client,
239            ws_client,
240            clock: Clock::new(),
241            instruments_cache: Arc::new(RwLock::new(SmallVec::new())),
242            order_symbol_cache: Arc::new(RwLock::new(
243                rusty_common::collections::FxHashMap::default(),
244            )),
245            config,
246        }
247    }
248
249    /// Create a new Binance exchange adapter with Ed25519 authentication
250    pub fn new_ed25519(api_key: String, private_key: String) -> Result<Self> {
251        Self::new_ed25519_with_config(api_key, private_key, BinanceConfig::default())
252    }
253
254    /// Create a new Binance exchange adapter with Ed25519 authentication and custom configuration
255    pub fn new_ed25519_with_config(
256        api_key: String,
257        private_key: String,
258        config: BinanceConfig,
259    ) -> Result<Self> {
260        use rusty_common::auth::exchanges::binance::BinanceAuth;
261
262        let auth = Arc::new(
263            BinanceAuth::new_ed25519(api_key.into(), private_key.into())
264                .map_err(|e| anyhow!("Failed to create Ed25519 auth: {}", e))?,
265        );
266        let rest_client = Arc::new(BinanceRestClient::new_with_auth((*auth).clone()));
267        let ws_client = BinanceWebSocketClient::new(auth);
268
269        Ok(Self {
270            rest_client,
271            ws_client,
272            clock: Clock::new(),
273            instruments_cache: Arc::new(RwLock::new(SmallVec::new())),
274            order_symbol_cache: Arc::new(RwLock::new(
275                rusty_common::collections::FxHashMap::default(),
276            )),
277            config,
278        })
279    }
280
281    /// Cache the `order_id` to symbol mapping for efficient lookups
282    fn cache_order_symbol(&self, order_id: &str, symbol: &str) {
283        let mut cache = self.order_symbol_cache.write();
284
285        // Implement bounded cache - remove oldest entries if too large
286        const MAX_CACHE_SIZE: usize = 10000;
287        if cache.len() >= MAX_CACHE_SIZE {
288            // Find and remove the oldest entry
289            if let Some(oldest_key) = cache
290                .iter()
291                .min_by_key(|(_, entry)| entry.cached_at)
292                .map(|(k, _)| k.clone())
293            {
294                cache.remove(&oldest_key);
295            }
296        }
297
298        cache.insert(order_id.into(), OrderSymbolCacheEntry::found(symbol.into()));
299    }
300
301    /// Cache a negative result (order not found)
302    fn cache_order_not_found(&self, order_id: &str) {
303        let mut cache = self.order_symbol_cache.write();
304
305        // Same bounded cache logic
306        const MAX_CACHE_SIZE: usize = 10000;
307        if cache.len() >= MAX_CACHE_SIZE
308            && let Some(oldest_key) = cache
309                .iter()
310                .min_by_key(|(_, entry)| entry.cached_at)
311                .map(|(k, _)| k.clone())
312        {
313            cache.remove(&oldest_key);
314        }
315
316        cache.insert(order_id.into(), OrderSymbolCacheEntry::not_found());
317    }
318
319    /// Lookup symbol by `order_id`, returns None if not cached or expired
320    fn get_cached_symbol(&self, order_id: &str) -> Option<SmartString> {
321        let cache = self.order_symbol_cache.read();
322
323        // Cache entries expire after 1 hour
324        const CACHE_TTL: Duration = Duration::from_secs(3600);
325
326        cache.get(order_id).and_then(|entry| {
327            if entry.is_valid(CACHE_TTL) {
328                entry.symbol.clone()
329            } else {
330                None
331            }
332        })
333    }
334
335    /// Check if we have a negative cache entry for this order
336    fn is_order_not_found_cached(&self, order_id: &str) -> bool {
337        let cache = self.order_symbol_cache.read();
338
339        // Negative cache entries expire after 5 minutes
340        const NEGATIVE_CACHE_TTL: Duration = Duration::from_secs(300);
341
342        cache
343            .get(order_id)
344            .is_some_and(|entry| entry.symbol.is_none() && entry.is_valid(NEGATIVE_CACHE_TTL))
345    }
346
347    /// Remove order from cache (when order is complete or cancelled)
348    fn remove_from_cache(&self, order_id: &str) {
349        let mut cache = self.order_symbol_cache.write();
350        cache.remove(order_id);
351    }
352
353    /// Optimized fallback method to find order symbol with concurrent requests
354    /// Uses parallel requests to major symbols for minimal latency impact
355    /// Only used when order is not in cache (legacy orders)
356    async fn find_order_symbol(&self, order_id: &str) -> Result<SmartString> {
357        // First check cache
358        if let Some(symbol) = self.get_cached_symbol(order_id) {
359            return Ok(symbol);
360        }
361
362        // Check if we have a negative cache entry (order previously not found)
363        if self.is_order_not_found_cached(order_id) {
364            return Err(anyhow!(
365                "Order {} was previously not found (cached negative result). \
366                 This avoids repeated API calls for non-existent orders.",
367                order_id
368            ));
369        }
370
371        // Optimization: Use concurrent requests instead of sequential to minimize latency
372        // This reduces total lookup time from O(n) network requests to O(1) with parallelism
373        use futures::future::join_all;
374        use std::time::Duration;
375
376        // Extract timeout values before creating futures to avoid borrowing issues
377        let per_request_timeout_ms = self.config.order_lookup_timeout_ms;
378        let total_timeout_ms = self.config.order_lookup_total_timeout_ms;
379
380        // Create concurrent futures for all major symbols
381        let lookup_futures = self.config.major_symbols.iter().map(|symbol| {
382            let symbol = symbol.clone();
383            let order_id = order_id.to_string();
384            let rest_client = self.rest_client.clone();
385
386            async move {
387                // Add timeout to prevent hanging requests
388                let result = tokio::time::timeout(
389                    Duration::from_millis(per_request_timeout_ms),
390                    rest_client.get_order_status(&symbol, &order_id),
391                )
392                .await;
393
394                match result {
395                    Ok(Ok(_)) => Some(symbol), // Found the order in this symbol
396                    _ => None,                 // Either timeout or order not found
397                }
398            }
399        });
400
401        // Execute all requests concurrently with overall timeout
402        let results = tokio::time::timeout(
403            Duration::from_millis(total_timeout_ms),
404            join_all(lookup_futures),
405        )
406        .await
407        .map_err(|_| {
408            anyhow!(
409                "Timeout while searching for order {} in major symbols. \
410                 Consider caching order symbols at placement time for HFT performance.",
411                order_id
412            )
413        })?;
414
415        // Find the first successful result
416        if let Some(symbol) = results.into_iter().flatten().next() {
417            // Cache the found symbol for future lookups
418            self.cache_order_symbol(order_id, &symbol);
419            return Ok(symbol);
420        }
421
422        // Order not found in any major trading pairs
423        // Cache the negative result to avoid repeated lookups
424        self.cache_order_not_found(order_id);
425
426        // Provide actionable guidance for HFT systems
427        Err(anyhow!(
428            "Order {} not found in {} configured major trading pairs. \
429             For HFT systems, consider: 1) Adding the symbol to major_symbols configuration, \
430             2) Ensuring order_id -> symbol mapping is cached when placing orders, \
431             3) Using persistent cache across application restarts, \
432             4) Avoiding lookups for orders placed outside current session.",
433            order_id,
434            self.config.major_symbols.len()
435        ))
436    }
437
438    /// Convert internal `OrderType` to Binance order type
439    const fn map_order_type(order_type: OrderType) -> &'static str {
440        match order_type {
441            OrderType::Market => "MARKET",
442            OrderType::Limit => "LIMIT",
443            OrderType::Stop => "STOP_LOSS", // Maps to stop loss market
444            OrderType::StopLimit => "STOP_LOSS_LIMIT",
445            OrderType::FillOrKill => "LIMIT", // Binance doesn't directly support FOK
446            OrderType::ImmediateOrCancel => "LIMIT", // IOC behavior is set via time_in_force
447            OrderType::PostOnly => "LIMIT",   // Post only behavior is set via additional flag
448        }
449    }
450
451    /// Convert internal `OrderSide` to Binance side
452    const fn map_order_side(side: OrderSide) -> &'static str {
453        match side {
454            OrderSide::Buy => "BUY",
455            OrderSide::Sell => "SELL",
456        }
457    }
458
459    /// Convert internal `TimeInForce` to Binance TIF
460    const fn map_time_in_force(tif: TimeInForce) -> &'static str {
461        match tif {
462            TimeInForce::GTC => "GTC",
463            TimeInForce::IOC => "IOC",
464            TimeInForce::FOK => "FOK",
465            _ => "GTC", // Default to GTC for other types
466        }
467    }
468
469    /// Function to extract `time_in_force` from `order_type`
470    const fn get_time_in_force(&self, order_type: OrderType) -> TimeInForce {
471        match order_type {
472            OrderType::Market => TimeInForce::IOC,
473            OrderType::Limit => TimeInForce::GTC,
474            OrderType::Stop => TimeInForce::GTC,
475            OrderType::StopLimit => TimeInForce::GTC,
476            OrderType::FillOrKill => TimeInForce::FOK,
477            OrderType::ImmediateOrCancel => TimeInForce::IOC,
478            OrderType::PostOnly => TimeInForce::GTC,
479        }
480    }
481
482    /// Place an OCO (One-Cancels-Other) order
483    ///
484    /// # Arguments
485    /// * `symbol` - Trading symbol (e.g., "BTCUSDT")
486    /// * `side` - Order side (Buy/Sell)
487    /// * `quantity` - Order quantity
488    /// * `above_type` - Order type for above price (`STOP_LOSS_LIMIT`, `LIMIT_MAKER`, etc.)
489    /// * `below_type` - Order type for below price (`STOP_LOSS`, `STOP_LOSS_LIMIT`, etc.)
490    /// * `above_price` - Limit price for above order
491    /// * `below_price` - Limit price for below order
492    /// * `above_stop_price` - Stop price for above order
493    /// * `below_stop_price` - Stop price for below order
494    /// * `report_tx` - Channel to send execution reports
495    #[allow(clippy::too_many_arguments)]
496    pub async fn place_oco_order(
497        &self,
498        symbol: &str,
499        side: OrderSide,
500        quantity: Decimal,
501        above_type: &str,
502        below_type: &str,
503        above_price: Option<Decimal>,
504        below_price: Option<Decimal>,
505        above_stop_price: Option<Decimal>,
506        below_stop_price: Option<Decimal>,
507        report_tx: Sender<ExecutionReport>,
508    ) -> Result<()> {
509        let above_price_str = above_price.map(|p| p.to_string());
510        let below_price_str = below_price.map(|p| p.to_string());
511        let above_stop_price_str = above_stop_price.map(|p| p.to_string());
512        let below_stop_price_str = below_stop_price.map(|p| p.to_string());
513
514        let params = OcoOrderParams {
515            symbol,
516            side: Self::map_order_side(side),
517            quantity: &quantity.to_string(),
518            above_type,
519            below_type,
520            above_price: above_price_str.as_deref(),
521            above_stop_price: above_stop_price_str.as_deref(),
522            below_price: below_price_str.as_deref(),
523            below_stop_price: below_stop_price_str.as_deref(),
524            list_client_order_id: None,
525        };
526
527        match self.rest_client.place_oco_order(params).await {
528            Ok(oco_response) => {
529                // Send reports for both orders in the OCO
530                for order_report in oco_response.order_reports {
531                    let report = ExecutionReport {
532                        id: id_generation::generate_report_id("oco", &order_report.client_order_id),
533                        order_id: order_report.client_order_id.clone(),
534                        exchange_timestamp: oco_response.transaction_time * 1_000_000,
535                        system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
536                        instrument_id: InstrumentId::new(symbol, Venue::Binance),
537                        status: map_order_status(&order_report.status),
538                        filled_quantity: Decimal::from_str_exact(&order_report.executed_qty)
539                            .unwrap_or(Decimal::ZERO),
540                        remaining_quantity: {
541                            let original = Decimal::from_str_exact(&order_report.original_qty)
542                                .unwrap_or(Decimal::ZERO);
543                            let executed = Decimal::from_str_exact(&order_report.executed_qty)
544                                .unwrap_or(Decimal::ZERO);
545                            original - executed
546                        },
547                        execution_price: Decimal::from_str_exact(&order_report.price).ok(),
548                        reject_reason: None,
549                        exchange_execution_id: Some(order_report.order_id.to_string().into()),
550                        is_final: false,
551                    };
552
553                    report_tx.send_async(report).await?;
554                }
555                Ok(())
556            }
557            Err(e) => {
558                let error_report = ExecutionReport {
559                    id: id_generation::generate_report_id_with_uuid("oco_rej"),
560                    order_id: id_generation::generate_report_id_with_uuid("oco"),
561                    exchange_timestamp: 0,
562                    system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
563                    instrument_id: InstrumentId::new(symbol, Venue::Binance),
564                    status: OrderStatus::Rejected,
565                    filled_quantity: Decimal::ZERO,
566                    remaining_quantity: quantity,
567                    execution_price: None,
568                    reject_reason: Some(e.to_string().into()),
569                    exchange_execution_id: None,
570                    is_final: true,
571                };
572
573                report_tx.send_async(error_report).await?;
574                Err(anyhow!("OCO order placement failed: {}", e))
575            }
576        }
577    }
578
579    /// Place a SOR (Smart Order Routing) order
580    ///
581    /// # Arguments
582    /// * `symbol` - Trading symbol (e.g., "BTCUSDT")
583    /// * `side` - Order side (Buy/Sell)
584    /// * `order_type` - Order type (Limit/Market)
585    /// * `quantity` - Order quantity
586    /// * `price` - Order price (required for LIMIT orders)
587    /// * `time_in_force` - Time in force (GTC/IOC/FOK)
588    /// * `report_tx` - Channel to send execution reports
589    #[allow(clippy::too_many_arguments)]
590    pub async fn place_sor_order(
591        &self,
592        symbol: &str,
593        side: OrderSide,
594        order_type: OrderType,
595        quantity: Decimal,
596        price: Option<Decimal>,
597        time_in_force: Option<TimeInForce>,
598        report_tx: Sender<ExecutionReport>,
599    ) -> Result<()> {
600        let price_str = price.map(|p| p.to_string());
601        let tif_str = time_in_force.map(Self::map_time_in_force);
602
603        let params = SorOrderParams {
604            symbol,
605            side: Self::map_order_side(side),
606            order_type: Self::map_order_type(order_type),
607            quantity: &quantity.to_string(),
608            price: price_str.as_deref(),
609            time_in_force: tif_str,
610            client_order_id: None,
611            strategy_id: None,
612            strategy_type: None,
613        };
614
615        match self.rest_client.place_sor_order(params).await {
616            Ok(sor_response) => {
617                let report = ExecutionReport {
618                    id: id_generation::generate_report_id("sor", &sor_response.client_order_id),
619                    order_id: sor_response.client_order_id.clone(),
620                    exchange_timestamp: sor_response.transaction_time * 1_000_000,
621                    system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
622                    instrument_id: InstrumentId::new(symbol, Venue::Binance),
623                    status: map_order_status(&sor_response.status),
624                    filled_quantity: Decimal::from_str_exact(&sor_response.executed_qty)
625                        .unwrap_or(Decimal::ZERO),
626                    remaining_quantity: {
627                        let original = Decimal::from_str_exact(&sor_response.original_qty)
628                            .unwrap_or(Decimal::ZERO);
629                        let executed = Decimal::from_str_exact(&sor_response.executed_qty)
630                            .unwrap_or(Decimal::ZERO);
631                        original - executed
632                    },
633                    execution_price: Decimal::from_str_exact(&sor_response.price).ok(),
634                    reject_reason: None,
635                    exchange_execution_id: Some(sor_response.order_id.to_string().into()),
636                    is_final: false,
637                };
638
639                report_tx.send_async(report).await?;
640                Ok(())
641            }
642            Err(e) => {
643                let error_report = ExecutionReport {
644                    id: id_generation::generate_report_id_with_uuid("sor_rej"),
645                    order_id: id_generation::generate_report_id_with_uuid("sor"),
646                    exchange_timestamp: 0,
647                    system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
648                    instrument_id: InstrumentId::new(symbol, Venue::Binance),
649                    status: OrderStatus::Rejected,
650                    filled_quantity: Decimal::ZERO,
651                    remaining_quantity: quantity,
652                    execution_price: None,
653                    reject_reason: Some(e.to_string().into()),
654                    exchange_execution_id: None,
655                    is_final: true,
656                };
657
658                report_tx.send_async(error_report).await?;
659                Err(anyhow!("SOR order placement failed: {}", e))
660            }
661        }
662    }
663
664    /// Place multiple orders concurrently
665    ///
666    /// # Arguments
667    /// * `orders` - Vector of orders to place
668    /// * `report_tx` - Channel to send execution reports
669    ///
670    /// # Returns
671    /// * `Result<BatchResult<()>>` - Detailed result with per-order status and errors
672    pub async fn place_batch_orders(
673        &self,
674        orders: Vec<Order>,
675        report_tx: Sender<ExecutionReport>,
676    ) -> Result<BatchResult<()>> {
677        let start_time = self.clock.raw();
678        let total_orders = orders.len();
679
680        // Place orders concurrently using individual place_order calls
681        use futures::future::join_all;
682
683        let order_futures: Vec<_> = orders
684            .into_iter()
685            .map(|order| {
686                let report_tx = report_tx.clone();
687                let order_id: SmartString = order.id.to_string().into();
688                async move {
689                    let result = self.place_order(order.clone(), report_tx).await;
690                    (order_id, order, result)
691                }
692            })
693            .collect();
694
695        let results = join_all(order_futures).await;
696
697        let mut order_results: OrderResultMap<()> = OrderResultMap::default();
698        let mut successful_orders = 0;
699        let mut failed_orders = 0;
700        let mut retryable_orders = 0;
701
702        for (order_id, order, result) in results {
703            match result {
704                Ok(()) => {
705                    successful_orders += 1;
706                    log::debug!("Batch order {order_id} placed successfully");
707                    order_results.insert(order_id.clone(), OrderResult::Success(()));
708                }
709                Err(e) => {
710                    failed_orders += 1;
711                    log::error!("Batch order {order_id} failed: {e}");
712
713                    // Convert anyhow::Error to EMSError, preserving the original error type if possible
714                    let ems_error = if let Some(specific_error) = e.downcast_ref::<EMSError>() {
715                        specific_error.clone()
716                    } else {
717                        EMSError::OrderSubmissionError(
718                            format!("Order {order_id} failed: {e}").into(),
719                        )
720                    };
721
722                    // Use the EMSError's built-in is_recoverable method
723                    let is_retryable = ems_error.is_recoverable();
724
725                    if is_retryable {
726                        retryable_orders += 1;
727                    }
728
729                    order_results.insert(
730                        order_id.clone(),
731                        OrderResult::Failed {
732                            error: ems_error,
733                            order: Box::new(order),
734                            is_retryable,
735                        },
736                    );
737                }
738            }
739        }
740
741        let processing_time_ns = self.clock.raw() - start_time;
742
743        // Determine overall status
744        let status = if failed_orders == 0 {
745            BatchStatus::AllSucceeded
746        } else if successful_orders == 0 {
747            BatchStatus::AllFailed
748        } else {
749            BatchStatus::PartialSuccess
750        };
751
752        let batch_result = BatchResult {
753            status,
754            order_results,
755            summary: BatchSummary {
756                total_orders,
757                successful_orders,
758                failed_orders,
759                retryable_orders,
760                processing_time_ns,
761            },
762            transport_error: None,
763        };
764
765        Ok(batch_result)
766    }
767
768    /// Get SOR configuration for available symbols
769    pub async fn get_sor_configuration(&self) -> Result<Vec<simd_json::OwnedValue>> {
770        let configs = self.rest_client.get_sor_config().await?;
771
772        // Convert BinanceSorConfig to OwnedValue directly
773        let mut json_values = Vec::with_capacity(configs.len());
774        for config in configs {
775            // Direct conversion without serialize-then-parse cycle
776            let value = simd_json::serde::to_owned_value(&config)?;
777            json_values.push(value);
778        }
779
780        Ok(json_values)
781    }
782}
783
784#[async_trait]
785impl Exchange for BinanceExchange {
    /// Identifies the venue served by this adapter; always [`Venue::Binance`].
    fn venue(&self) -> Venue {
        Venue::Binance
    }
789
790    async fn place_order(&self, order: Order, report_tx: Sender<ExecutionReport>) -> Result<()> {
791        // Start WebSocket user data stream if not already connected
792        if !self.ws_client.is_connected() {
793            self.ws_client
794                .start_user_data_stream(report_tx.clone(), None)
795                .await?;
796        }
797
798        let symbol = order.symbol.clone();
799        let side = Self::map_order_side(order.side);
800        let order_type_clone = order.order_type;
801        let tif = self.get_time_in_force(order_type_clone);
802        let order_type = Self::map_order_type(order.order_type);
803        let time_in_force = Self::map_time_in_force(tif);
804
805        // Use the REST client to place the order
806        let params = PlaceOrderParams {
807            symbol: symbol.clone(),
808            side: side.into(),
809            order_type: order_type.into(),
810            time_in_force: time_in_force.into(),
811            quantity: order.quantity.to_string().into(),
812            price: order
813                .price
814                .map_or_else(|| "0".to_string(), |p| p.to_string())
815                .into(),
816            client_order_id: order.id.to_string().into(),
817        };
818
819        let result = self.rest_client.place_order(params).await;
820
821        match result {
822            Ok(order_response) => {
823                // Send acknowledgment report
824                let report = ExecutionReport {
825                    id: id_generation::generate_ack_id(&order.id.to_string()),
826                    order_id: order.id.to_string().into(),
827                    exchange_timestamp: order_response.transaction_time * 1_000_000, // Convert ms to ns
828                    system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
829                    instrument_id: InstrumentId::new(order.symbol.clone(), order.venue),
830                    status: map_order_status(&order_response.status),
831                    filled_quantity: Decimal::from_str_exact(&order_response.executed_qty)
832                        .unwrap_or(Decimal::ZERO),
833                    remaining_quantity: {
834                        let original = Decimal::from_str_exact(&order_response.original_qty)
835                            .unwrap_or(Decimal::ZERO);
836                        let executed = Decimal::from_str_exact(&order_response.executed_qty)
837                            .unwrap_or(Decimal::ZERO);
838                        original - executed
839                    },
840                    execution_price: Decimal::from_str_exact(&order_response.price).ok(),
841                    reject_reason: None,
842                    exchange_execution_id: Some(order_response.order_id.to_string().into()),
843                    is_final: false,
844                };
845
846                report_tx.send_async(report).await?;
847
848                // Cache the order_id to symbol mapping for efficient future lookups
849                self.cache_order_symbol(&order_response.order_id.to_string(), &symbol);
850            }
851            Err(e) => {
852                // Send rejection report
853                let report = ExecutionReport {
854                    id: id_generation::generate_rejection_id(&order.id.to_string()),
855                    order_id: order.id.to_string().into(),
856                    exchange_timestamp: 0,
857                    system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
858                    instrument_id: InstrumentId::new(order.symbol.clone(), order.venue),
859                    status: OrderStatus::Rejected,
860                    filled_quantity: Decimal::ZERO,
861                    remaining_quantity: order.quantity,
862                    execution_price: None,
863                    reject_reason: Some(e.to_string().into()),
864                    exchange_execution_id: None,
865                    is_final: true,
866                };
867
868                report_tx.send_async(report).await?;
869                return Err(anyhow!("Order placement failed: {}", e));
870            }
871        }
872
873        Ok(())
874    }
875
876    async fn cancel_order(
877        &self,
878        order_id: SmartString,
879        report_tx: Sender<ExecutionReport>,
880    ) -> Result<()> {
881        // Use cached symbol or fallback to symbol discovery
882        let symbol = match self.find_order_symbol(&order_id).await {
883            Ok(symbol) => symbol,
884            Err(e) => {
885                return Err(anyhow!(
886                    "Failed to find order symbol for order {}: {}",
887                    order_id,
888                    e
889                ));
890            }
891        };
892
893        // Clone order_id for cache removal after successful cancellation
894        let order_id_str = order_id.to_string();
895
896        // Use the REST client to cancel the order
897        let result = self.rest_client.cancel_order(&symbol, &order_id).await;
898
899        match result {
900            Ok(cancel_response) => {
901                // Create instrument ID
902                let instrument = InstrumentId {
903                    symbol: cancel_response.symbol,
904                    venue: Venue::Binance,
905                };
906
907                // Send cancellation report
908                let report = ExecutionReport {
909                    id: id_generation::generate_cancel_id(&order_id),
910                    order_id,
911                    exchange_timestamp: cancel_response.transaction_time * 1_000_000, // Convert ms to ns
912                    system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
913                    instrument_id: instrument,
914                    status: OrderStatus::Cancelled,
915                    filled_quantity: Decimal::from_str_exact(&cancel_response.executed_qty)
916                        .unwrap_or(Decimal::ZERO),
917                    remaining_quantity: {
918                        let original = Decimal::from_str_exact(&cancel_response.original_qty)
919                            .unwrap_or(Decimal::ZERO);
920                        let executed = Decimal::from_str_exact(&cancel_response.executed_qty)
921                            .unwrap_or(Decimal::ZERO);
922                        original - executed
923                    },
924                    execution_price: Decimal::from_str_exact(&cancel_response.price).ok(),
925                    reject_reason: None,
926                    exchange_execution_id: Some(cancel_response.order_id.to_string().into()),
927                    is_final: true,
928                };
929
930                report_tx.send_async(report).await?;
931
932                // Remove from cache since order is cancelled
933                self.remove_from_cache(&order_id_str);
934            }
935            Err(e) => {
936                return Err(anyhow!("Order cancellation failed: {}", e));
937            }
938        }
939
940        Ok(())
941    }
942
943    async fn modify_order(
944        &self,
945        order_id: SmartString,
946        new_price: Option<Decimal>,
947        new_quantity: Option<Decimal>,
948        report_tx: Sender<ExecutionReport>,
949    ) -> Result<()> {
950        // Try to use the new order amend functionality first, fall back to cancel-and-replace
951
952        // Use cached symbol or fallback to symbol discovery
953        let symbol = match self.find_order_symbol(&order_id).await {
954            Ok(symbol) => symbol,
955            Err(e) => {
956                return Err(anyhow!(
957                    "Failed to find order symbol for order {}: {}",
958                    order_id,
959                    e
960                ));
961            }
962        };
963
964        let order_response = {
965            match self.rest_client.get_order_status(&symbol, &order_id).await {
966                Ok(order) => order,
967                Err(e) => {
968                    return Err(anyhow!(
969                        "Failed to get order status for order {}: {}",
970                        order_id,
971                        e
972                    ));
973                }
974            }
975        };
976
977        // Try to amend the order if only quantity is being modified and it's a reduction
978        if let Some(new_qty) = new_quantity
979            && new_price.is_none()
980        {
981            let original_qty =
982                Decimal::from_str_exact(&order_response.original_qty).unwrap_or(Decimal::ZERO);
983            if new_qty < original_qty {
984                // Try to use the new amend functionality
985                if let Ok(amended_order) = self
986                    .rest_client
987                    .amend_order_keep_priority(&order_response.symbol, &order_id, new_qty)
988                    .await
989                {
990                    let report = ExecutionReport {
991                        id: id_generation::generate_report_id("amend", &order_id),
992                        order_id: order_id.clone(),
993                        exchange_timestamp: amended_order.transaction_time * 1_000_000,
994                        system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
995                        instrument_id: InstrumentId::new(
996                            amended_order.symbol.clone(),
997                            Venue::Binance,
998                        ),
999                        status: map_order_status(&amended_order.status),
1000                        filled_quantity: Decimal::from_str_exact(&amended_order.executed_qty)
1001                            .unwrap_or(Decimal::ZERO),
1002                        remaining_quantity: {
1003                            let original = Decimal::from_str_exact(&amended_order.original_qty)
1004                                .unwrap_or(Decimal::ZERO);
1005                            let executed = Decimal::from_str_exact(&amended_order.executed_qty)
1006                                .unwrap_or(Decimal::ZERO);
1007                            original - executed
1008                        },
1009                        execution_price: Decimal::from_str_exact(&amended_order.price).ok(),
1010                        reject_reason: None,
1011                        exchange_execution_id: Some(amended_order.order_id.to_string().into()),
1012                        is_final: false,
1013                    };
1014
1015                    report_tx.send_async(report).await?;
1016                    return Ok(());
1017                }
1018                // Fall back to cancel-and-replace if amend fails
1019            }
1020        }
1021
1022        // Fall back to cancel-and-replace for other modifications
1023
1024        // Cancel the existing order
1025        self.cancel_order(order_id.clone(), report_tx.clone())
1026            .await?;
1027
1028        // Create a new order with the updated parameters
1029        let instrument = InstrumentId {
1030            symbol: order_response.symbol.clone(),
1031            venue: Venue::Binance,
1032        };
1033
1034        let price = new_price.unwrap_or_else(|| {
1035            Decimal::from_str_exact(&order_response.price).unwrap_or(Decimal::ZERO)
1036        });
1037
1038        let quantity = new_quantity.unwrap_or_else(|| {
1039            Decimal::from_str_exact(&order_response.original_qty).unwrap_or(Decimal::ZERO)
1040        });
1041
1042        let side = match order_response.side.as_str() {
1043            "BUY" => OrderSide::Buy,
1044            _ => OrderSide::Sell,
1045        };
1046
1047        let order_type = match order_response.order_type.as_str() {
1048            "MARKET" => OrderType::Market,
1049            _ => OrderType::Limit,
1050        };
1051
1052        // Create a new order with modified parameters
1053        let new_order = Order::new(
1054            instrument.venue,
1055            instrument.symbol.clone(),
1056            side,
1057            order_type,
1058            quantity,
1059            Some(price),
1060            ClientId::new(format!("{}_{}", order_id, timestamp_millis())),
1061        );
1062
1063        // Place the new order
1064        self.place_order(new_order, report_tx).await?;
1065
1066        Ok(())
1067    }
1068
1069    async fn cancel_all_orders(
1070        &self,
1071        instrument_id: Option<InstrumentId>,
1072        _report_tx: Sender<ExecutionReport>,
1073    ) -> Result<()> {
1074        if let Some(instrument) = instrument_id {
1075            // Cancel all orders for a specific symbol
1076            self.rest_client
1077                .cancel_all_orders(&instrument.symbol)
1078                .await?;
1079        } else {
1080            // To cancel all orders across all symbols, we'd need to get a list of all symbols
1081            // and call cancel_all_orders for each one. However, this is not very efficient.
1082            // For now, we'll just return an error.
1083            return Err(anyhow!(
1084                "Cancelling all orders globally is not supported by Binance API directly. Please specify an instrument."
1085            ));
1086        }
1087
1088        // We don't send individual cancellation reports as Binance doesn't return them
1089        // The user data stream will provide the cancellation notices
1090
1091        Ok(())
1092    }
1093
1094    async fn get_order_status(&self, order_id: &str) -> Result<OrderStatus> {
1095        // Use cached symbol or fallback to symbol discovery
1096        let symbol = match self.find_order_symbol(order_id).await {
1097            Ok(symbol) => symbol,
1098            Err(e) => {
1099                return Err(anyhow!(
1100                    "Failed to find order symbol for order {}: {}",
1101                    order_id,
1102                    e
1103                ));
1104            }
1105        };
1106
1107        match self.rest_client.get_order_status(&symbol, order_id).await {
1108            Ok(order_response) => Ok(map_order_status(&order_response.status)),
1109            Err(e) => Err(anyhow!("Failed to get order status: {}", e)),
1110        }
1111    }
1112
1113    async fn connect(&self, _report_sender: Sender<ExecutionReport>) -> Result<()> {
1114        // Test connection by calling an API on the REST client
1115        // We'll use get_exchange_info as a ping to verify connectivity
1116        match self.rest_client.get_exchange_info().await {
1117            Ok(_) => {
1118                // Connection is good - the WebSocket will be connected when needed
1119                Ok(())
1120            }
1121            Err(e) => Err(anyhow!("Failed to connect to Binance API: {}", e)),
1122        }
1123    }
1124
    /// Disconnects the WebSocket client.
    ///
    /// Only the WebSocket client is touched; no REST client call is made here.
    ///
    /// # Errors
    /// Propagates any error returned by the WebSocket client's `disconnect`.
    async fn disconnect(&self) -> Result<()> {
        // Disconnect the WebSocket client
        self.ws_client.disconnect().await?;
        Ok(())
    }
1130
    /// Reports whether the WebSocket client is currently connected.
    ///
    /// Only the WebSocket state is consulted; REST reachability is not checked here.
    async fn is_connected(&self) -> bool {
        // Check if WebSocket is connected
        self.ws_client.is_connected()
    }
1135
1136    async fn get_instruments(&self) -> Result<SmallVec<[InstrumentId; 32]>> {
1137        // Check if we have cached instruments
1138        {
1139            let cached = self.instruments_cache.read();
1140            if !cached.is_empty() {
1141                return Ok(cached.clone());
1142            }
1143        }
1144
1145        // Fetch exchange info using the REST client
1146        let exchange_info = self.rest_client.get_exchange_info().await?;
1147
1148        // Process symbols
1149        let mut instruments = SmallVec::new();
1150
1151        for symbol in exchange_info.symbols {
1152            if symbol.status == "TRADING" {
1153                instruments.push(InstrumentId {
1154                    symbol: symbol.symbol,
1155                    venue: Venue::Binance,
1156                });
1157            }
1158        }
1159
1160        // Cache the instruments
1161        *self.instruments_cache.write() = instruments.clone();
1162
1163        Ok(instruments)
1164    }
1165
    /// FIX sending is not implemented for Binance.
    ///
    /// Logs a warning that includes the `Debug` rendering of `message`, then fails.
    ///
    /// # Errors
    /// Always returns `"FIX message sending not implemented for Binance"`.
    async fn send_fix_message(&self, message: Vec<u8>) -> Result<()> {
        log::warn!(
            "BinanceExchange: send_fix_message not implemented. Message: {:?}",
            message
        );
        Err(anyhow!("FIX message sending not implemented for Binance"))
    }
1173
    /// FIX receiving is not implemented for Binance.
    ///
    /// Logs a warning, then fails.
    ///
    /// # Errors
    /// Always returns `"FIX message receiving not implemented for Binance"`.
    async fn receive_fix_message(&self) -> Result<Vec<u8>> {
        log::warn!("BinanceExchange: receive_fix_message not implemented.");
        Err(anyhow!("FIX message receiving not implemented for Binance"))
    }
1178}
1179
/// Tests for the Binance adapter: a live connectivity check (skipped when no
/// credentials are configured) and the unimplemented FIX entry points.
#[cfg(test)]
mod tests {
    use super::*;
    use dotenv::dotenv;
    use std::env;

    /// Connects, lists instruments and disconnects against the real API.
    /// Skipped unless both credential variables are set to non-empty values.
    #[tokio::test]
    async fn test_connect_and_get_instruments() {
        dotenv().ok();

        // An unset variable and an empty one are treated alike.
        let credential = |name: &str| env::var(name).ok().filter(|value| !value.is_empty());
        let (Some(api_key), Some(secret_key)) = (
            credential("BINANCE_API_KEY"),
            credential("BINANCE_SECRET_KEY"),
        ) else {
            println!("Skipping test: BINANCE_API_KEY and BINANCE_SECRET_KEY must be set");
            return;
        };

        let exchange = BinanceExchange::new(api_key, secret_key);

        // Test connection
        let (report_tx, _report_rx) = flume::bounded::<ExecutionReport>(10);
        let result = exchange.connect(report_tx).await;
        assert!(result.is_ok(), "Failed to connect: {result:?}");

        // Test getting instruments
        let result = exchange.get_instruments().await;
        assert!(result.is_ok(), "Failed to get instruments: {result:?}");

        let instruments = result.unwrap();
        assert!(!instruments.is_empty(), "No instruments returned");

        // Print a short, 1-based sample of what came back.
        println!("Found {} instruments", instruments.len());
        for (position, instrument) in (1..).zip(instruments.iter().take(5)) {
            println!("  {}: {}", position, instrument.symbol);
        }

        // Test disconnection
        let result = exchange.disconnect().await;
        assert!(result.is_ok(), "Failed to disconnect: {result:?}");
    }

    /// Sending a FIX message must fail with the "not implemented" error.
    #[tokio::test]
    async fn test_send_fix_message_not_implemented() {
        let exchange =
            BinanceExchange::new("test_api_key".to_string(), "test_secret_key".to_string());

        let result = exchange.send_fix_message(b"TEST".to_vec()).await;

        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            err.to_string()
                .contains("FIX message sending not implemented for Binance")
        );
    }

    /// Receiving a FIX message must fail with the "not implemented" error.
    #[tokio::test]
    async fn test_receive_fix_message_not_implemented() {
        let exchange =
            BinanceExchange::new("test_api_key".to_string(), "test_secret_key".to_string());

        let result = exchange.receive_fix_message().await;

        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            err.to_string()
                .contains("FIX message receiving not implemented for Binance")
        );
    }
}