rusty_ems/exchanges/
binance_websocket_trading.rs

1//! Binance WebSocket Trading Implementation
2//!
3//! This module provides a high-performance WebSocket trading client for Binance exchange
4//! with robust connection management, automatic reconnection, and comprehensive health monitoring.
5//!
6//! # Features
7//!
8//! - **Dual-level ping/pong mechanism**: Both WebSocket protocol-level and JSON API-level
9//! - **Automatic reconnection**: Configurable backoff strategy with exponential delays
10//! - **Connection health monitoring**: Real-time metrics and health status
11//! - **Ed25519 authentication**: Support for both raw and DER-encoded private keys
12//! - **Zero-copy message processing**: Optimized for low-latency trading
13//!
14//! # Connection Management
15//!
16//! The client maintains connection health through:
17//! - WebSocket ping frames sent every 30 seconds
18//! - JSON ping requests for API-level health checks
19//! - Automatic reconnection on connection loss
20//! - Configurable pong timeout (default: 10 seconds)
21//!
22//! # Reconnection Behavior
23//!
24//! When connection is lost, the client will:
25//! 1. Detect timeout through missing pong responses
26//! 2. Kill the existing socket connection
27//! 3. Wait for backoff period (starts at 1s, doubles each attempt)
28//! 4. Attempt to reconnect and re-authenticate
29//! 5. Reset backoff on successful connection
30//!
31//! Configuration:
32//! - Initial backoff: 1 second
33//! - Max backoff: 60 seconds
34//! - Max attempts: 10 (configurable via `MAX_RECONNECTION_ATTEMPTS`)
35//!
36//! # Usage Example
37//!
38//! ```rust,no_run
39//! use rusty_common::auth::exchanges::binance::BinanceAuth;
40//! use rusty_ems::exchanges::binance_websocket_trading::BinanceWebSocketTrader;
41//! use std::sync::Arc;
42//!
43//! #[tokio::main]
44//! async fn main() -> Result<()> {
45//!     let auth = Arc::new(BinanceAuth::new_ed25519(
46//!         "api_key".into(),
47//!         "private_key_base64".into()
48//!     )?);
49//!
50//!     let trader = BinanceWebSocketTrader::new(auth);
51//!     let (report_tx, report_rx) = flume::bounded(100);
52//!
53//!     // Connect and authenticate
54//!     trader.connect(report_tx).await?;
55//!
56//!     // Check connection health
57//!     let health = trader.get_connection_health();
58//!     println!("Connection healthy: {}", health.is_healthy);
59//!
60//!     // Send JSON ping
61//!     trader.send_ping().await?;
62//!
63//!     Ok(())
64//! }
65//! ```
66
67use rusty_common::collections::FxHashMap;
68use std::sync::Arc;
69use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU64, Ordering};
70
71use anyhow::{Result, anyhow, bail};
72use async_trait::async_trait;
73use flume::Sender;
74use futures::{SinkExt, StreamExt};
75use log::{debug, error, info, warn};
76use parking_lot::RwLock;
77use quanta::Clock;
78use rand::Rng;
79use rust_decimal::Decimal;
80use rusty_common::SmartString;
81use rusty_common::auth::exchanges::binance::BinanceAuth;
82use rusty_common::id_generation;
83use rusty_common::websocket::connector::{WebSocketSink, WebSocketStream};
84use rusty_common::websocket::{Message, WebSocketConfig, WebSocketConnector};
85use rusty_model::{
86    enums::{OrderSide, OrderStatus, OrderType, TimeInForce},
87    instruments::InstrumentId,
88    position::{MarginType, PositionSide, PositionUpdate},
89    trading_order::Order,
90    venues::Venue,
91};
92use simd_json::prelude::*;
93use simd_json::value::owned::{Object, Value as JsonValue};
94use smallvec::SmallVec;
95use std::time::Duration;
96use tokio::sync::RwLock as AsyncRwLock;
97use tokio::task::JoinHandle;
98use tokio::time::interval;
99use uuid::Uuid;
100use yawc::frame::OpCode;
101
102use crate::error::batch_errors::{BatchResult, BatchStatus, OrderResult, OrderResultMap};
103use crate::execution_engine::ExecutionReport;
104use crate::position_manager::PositionManager;
105
106/// WebSocket API URLs for different environments
107const BINANCE_WS_API_URL: &str = "wss://ws-api.binance.com/ws-api/v3";
108const BINANCE_WS_API_TESTNET_URL: &str = "wss://ws-api.testnet.binance.vision/ws-api/v3";
109
110/// Ping interval for keeping WebSocket connection alive (30 seconds)
111const PING_INTERVAL_SECONDS: u64 = 30;
112
113/// Pong timeout (10 seconds - if no pong received within this time, consider connection dead)
114const PONG_TIMEOUT_SECONDS: u64 = 10;
115
116/// Maximum message size allowed (10MB)
117const MAX_MESSAGE_SIZE: usize = 10 * 1024 * 1024;
118
119/// Maximum pending requests before cleanup
120const MAX_PENDING_REQUESTS: usize = 10000;
121
122/// Rate limiting constants for batch orders
123const MAX_ORDERS_PER_10_SECONDS: u32 = 300;
124const RATE_LIMIT_WINDOW_MS: u64 = 10_000; // 10 seconds in milliseconds
125const MAX_BATCH_SIZE: usize = 50; // Maximum orders in a single batch (Binance WebSocket rate limit is 300/10s)
126
127/// Order rate limiter for batch orders
128struct OrderRateLimiter {
129    order_times: std::collections::VecDeque<u64>,
130    clock: Clock,
131}
132
133impl OrderRateLimiter {
134    const fn new(clock: Clock) -> Self {
135        Self {
136            order_times: std::collections::VecDeque::new(),
137            clock,
138        }
139    }
140
141    /// Remove expired timestamps from the queue
142    fn cleanup_expired(&mut self) {
143        let now = self.clock.raw() / 1_000_000; // Convert to milliseconds
144        let window_start = now.saturating_sub(RATE_LIMIT_WINDOW_MS);
145
146        while let Some(&front) = self.order_times.front() {
147            if front < window_start {
148                self.order_times.pop_front();
149            } else {
150                break;
151            }
152        }
153    }
154
155    fn can_place_orders(&mut self, count: usize) -> bool {
156        self.cleanup_expired();
157        // Check if we can place the requested number of orders
158        self.order_times.len() + count <= MAX_ORDERS_PER_10_SECONDS as usize
159    }
160
161    fn record_orders(&mut self, count: usize) {
162        let now = self.clock.raw() / 1_000_000; // Convert to milliseconds
163        for _ in 0..count {
164            self.order_times.push_back(now);
165        }
166    }
167
168    fn get_current_usage(&mut self) -> (usize, usize) {
169        self.cleanup_expired();
170        (self.order_times.len(), MAX_ORDERS_PER_10_SECONDS as usize)
171    }
172
173    fn current_order_count(&mut self) -> usize {
174        self.cleanup_expired();
175        self.order_times.len()
176    }
177}
178
179/// Connection state for proper state machine tracking
180#[derive(Debug, Clone, Copy, PartialEq, Eq)]
181#[repr(u8)]
182pub enum ConnectionState {
183    /// Not connected to the WebSocket
184    Disconnected = 0,
185    /// Establishing WebSocket connection
186    Connecting = 1,
187    /// WebSocket connection established but not authenticated
188    Connected = 2,
189    /// Authentication process in progress
190    Authenticating = 3,
191    /// WebSocket connected and authenticated for trading
192    Authenticated = 4,
193    /// Disconnection process in progress
194    Disconnecting = 5,
195}
196
197impl From<u8> for ConnectionState {
198    fn from(value: u8) -> Self {
199        match value {
200            0 => Self::Disconnected,
201            1 => Self::Connecting,
202            2 => Self::Connected,
203            3 => Self::Authenticating,
204            4 => Self::Authenticated,
205            5 => Self::Disconnecting,
206            _ => Self::Disconnected,
207        }
208    }
209}
210
211/// Request ID generator with overflow protection
212#[derive(Debug, Clone)]
213enum RequestId {
214    Sequential(u64),
215    Uuid(SmartString),
216}
217
218impl RequestId {
219    fn as_json_value(&self) -> JsonValue {
220        match self {
221            Self::Sequential(id) => JsonValue::from(*id),
222            Self::Uuid(id) => JsonValue::from(id.as_str()),
223        }
224    }
225
226    /// Create a lookup key without string allocation for sequential IDs
227    fn to_lookup_key(&self) -> RequestKey {
228        match self {
229            Self::Sequential(id) => RequestKey::Sequential(*id),
230            Self::Uuid(id) => RequestKey::Uuid(id.clone()),
231        }
232    }
233}
234
235/// Optimized key type for pending requests lookup
236/// Avoids string allocations for numeric IDs in hot paths
237#[derive(Debug, Clone, PartialEq, Eq, Hash)]
238enum RequestKey {
239    Sequential(u64),
240    Uuid(SmartString),
241}
242
243impl From<&RequestId> for RequestKey {
244    fn from(req_id: &RequestId) -> Self {
245        req_id.to_lookup_key()
246    }
247}
248
249/// Request ID generator for WebSocket API requests
250#[derive(Debug)]
251struct RequestIdGenerator {
252    counter: AtomicU64,
253    overflow_detected: AtomicBool,
254}
255
256impl RequestIdGenerator {
257    const fn new() -> Self {
258        Self {
259            counter: AtomicU64::new(1),
260            overflow_detected: AtomicBool::new(false),
261        }
262    }
263
264    fn next_id(&self) -> RequestId {
265        if self.overflow_detected.load(Ordering::Acquire) {
266            // Already in UUID mode
267            return RequestId::Uuid(SmartString::from(Uuid::new_v4().to_string()));
268        }
269
270        let id = self.counter.fetch_add(1, Ordering::SeqCst);
271        if id == u64::MAX {
272            // Overflow detected, switch to UUID mode
273            self.overflow_detected.store(true, Ordering::Release);
274            RequestId::Uuid(SmartString::from(Uuid::new_v4().to_string()))
275        } else {
276            RequestId::Sequential(id)
277        }
278    }
279}
280
281/// Parameters for modifying an order
282#[derive(Debug, Clone)]
283pub struct ModifyOrderParams {
284    /// Unique identifier of the order to modify
285    pub order_id: SmartString,
286    /// Trading pair symbol (e.g., "BTCUSDT")
287    pub symbol: SmartString,
288    /// Order side (buy or sell)
289    pub side: OrderSide,
290    /// Type of order (limit, market, etc.)
291    pub order_type: OrderType,
292    /// New quantity for the order
293    pub new_quantity: Decimal,
294    /// New price for the order (None for market orders)
295    pub new_price: Option<Decimal>,
296}
297
298/// Pending request tracking with proper ID type
299#[derive(Debug, Clone)]
300struct PendingRequest {
301    pub method: SmartString,
302    pub timestamp: u64,
303    pub report_tx: Option<Sender<ExecutionReport>>,
304    pub request_id: RequestId,
305}
306
307/// Pending requests storage with overflow protection
308/// Optimized to use `RequestKey` instead of string allocations
309type PendingRequestsMap = FxHashMap<RequestKey, PendingRequest>;
310
311/// High-performance WebSocket-only trading client for Binance
312///
313/// This client provides ultra-low latency trading by using WebSocket API exclusively.
314/// No REST API dependencies for maximum performance in HFT scenarios.
315///
316/// # Usage
317///
318/// ```rust,no_run
319/// use std::sync::Arc;
320/// use rusty_ems::exchanges::BinanceWebSocketTrader;
321/// use rusty_common::auth::exchanges::binance::BinanceAuth;
322/// use flume;
323///
324/// # async fn example() -> anyhow::Result<()> {
325/// let auth = Arc::new(BinanceAuth::new_hmac("APKEY".into(), "SECRET".into()));
326/// let trader = Arc::new(BinanceWebSocketTrader::new(auth));
327/// let (report_tx, _report_rx) = flume::bounded(100);
328///
329/// // Connect and use trading methods
330/// trader.connect(report_tx.clone()).await?;
331/// // trader.place_order(order, report_tx.clone()).await?;
332/// // trader.cancel_order("order123".into(), report_tx.clone()).await?;
333/// # Ok(())
334/// # }
335/// ```
336/// Task handles for proper lifecycle management
337#[derive(Default)]
338struct TaskHandles {
339    response_handler: Option<JoinHandle<()>>,
340    ping_handler: Option<JoinHandle<()>>,
341    reconnection_monitor: Option<JoinHandle<()>>,
342}
343
344/// Connection metrics for observability
345/// Cache-line aligned to prevent false sharing in multi-threaded scenarios
346#[repr(align(64))] // Align to cache line boundary (64 bytes on x86-64)
347#[derive(Debug, Default)]
348pub struct ConnectionMetrics {
349    /// Number of WebSocket protocol ping frames sent
350    pub websocket_pings_sent: AtomicU64,
351    /// Number of WebSocket protocol pong frames received
352    pub websocket_pongs_received: AtomicU64,
353    /// Number of JSON ping messages sent for application-level keepalive
354    pub json_pings_sent: AtomicU64,
355    /// Number of JSON pong responses received from server
356    pub json_pongs_received: AtomicU64,
357    /// Total number of messages sent to the server
358    pub messages_sent: AtomicU64,
359    /// Total number of messages received from the server
360    pub messages_received: AtomicU64,
361    /// Number of authentication attempts made
362    pub authentication_attempts: AtomicU64,
363    /// Number of reconnection attempts initiated
364    pub reconnection_attempts: AtomicU64,
365    /// Number of successful reconnection attempts
366    pub successful_reconnections: AtomicU64,
367    /// Number of failed reconnection attempts
368    pub failed_reconnections: AtomicU64,
369}
370
371/// Detailed connection health information
372#[derive(Debug, Clone)]
373pub struct ConnectionHealth {
374    /// Current connection state
375    pub state: ConnectionState,
376    /// Whether the WebSocket connection is established
377    pub is_connected: bool,
378    /// Whether the connection is authenticated and ready for trading
379    pub is_authenticated: bool,
380    /// Time elapsed since last ping was sent
381    pub time_since_last_ping: Option<Duration>,
382    /// Time elapsed since last pong was received
383    pub time_since_last_pong: Option<Duration>,
384    /// Time elapsed since authentication was completed
385    pub time_since_auth: Option<Duration>,
386    /// Total WebSocket protocol pings sent
387    pub websocket_pings_sent: u64,
388    /// Total WebSocket protocol pongs received
389    pub websocket_pongs_received: u64,
390    /// Total JSON ping messages sent
391    pub json_pings_sent: u64,
392    /// Total JSON pong responses received
393    pub json_pongs_received: u64,
394    /// Total messages sent to server
395    pub messages_sent: u64,
396    /// Total messages received from server
397    pub messages_received: u64,
398    /// Total reconnection attempts initiated
399    pub reconnection_attempts: u64,
400    /// Total successful reconnections completed
401    pub successful_reconnections: u64,
402    /// Total failed reconnection attempts
403    pub failed_reconnections: u64,
404}
405
406/// High-performance WebSocket-only trading client for Binance
407///
408/// Provides ultra-low latency order execution using WebSocket API exclusively.
409/// Includes connection management, authentication, rate limiting, and comprehensive metrics.
410pub struct BinanceWebSocketTrader {
411    /// Authentication handler
412    auth: Arc<BinanceAuth>,
413
414    /// WebSocket connection for trading API
415    ws_sink: Arc<AsyncRwLock<Option<WebSocketSink>>>,
416    ws_stream: Arc<AsyncRwLock<Option<WebSocketStream>>>,
417
418    /// Connection state machine
419    state: Arc<AtomicU8>,
420
421    /// High-precision clock (using monotonic time)
422    clock: Clock,
423
424    /// Request ID generator with overflow protection
425    request_id_gen: Arc<RequestIdGenerator>,
426
427    /// Pending requests tracking with string keys for UUID support
428    pending_requests: Arc<RwLock<PendingRequestsMap>>,
429
430    /// WebSocket API URL (production or testnet)
431    ws_url: &'static str,
432
433    /// Last ping sent timestamp (nanoseconds, monotonic)
434    last_ping_time: Arc<AtomicU64>,
435
436    /// Last pong received timestamp (nanoseconds, monotonic)
437    last_pong_time: Arc<AtomicU64>,
438
439    /// Authentication completed timestamp (nanoseconds, monotonic)
440    auth_completed_time: Arc<AtomicU64>,
441
442    /// Task handles for lifecycle management
443    task_handles: Arc<AsyncRwLock<TaskHandles>>,
444
445    /// Connection metrics
446    metrics: Arc<ConnectionMetrics>,
447
448    /// Reconnection backoff state
449    reconnection_backoff_ms: Arc<AtomicU64>,
450
451    /// Control flag for reconnection monitor
452    reconnection_control: Arc<ReconnectionMonitorControl>,
453
454    /// Order rate limiter for batch orders
455    order_rate_limiter: Arc<RwLock<OrderRateLimiter>>,
456
457    /// Position manager for tracking futures positions
458    position_manager: Arc<dyn PositionManager>,
459}
460/// Atomic flag to signal reconnection monitor to stop
461/// This prevents race conditions during socket cleanup
462#[derive(Clone)]
463struct ReconnectionMonitorControl {
464    should_stop: Arc<AtomicBool>,
465}
466
467impl Clone for BinanceWebSocketTrader {
468    fn clone(&self) -> Self {
469        Self {
470            auth: self.auth.clone(),
471            ws_sink: self.ws_sink.clone(),
472            ws_stream: self.ws_stream.clone(),
473            state: self.state.clone(),
474            clock: self.clock.clone(),
475            request_id_gen: self.request_id_gen.clone(), // Shared ID generator
476            pending_requests: self.pending_requests.clone(),
477            ws_url: self.ws_url,
478            last_ping_time: self.last_ping_time.clone(),
479            last_pong_time: self.last_pong_time.clone(),
480            auth_completed_time: self.auth_completed_time.clone(),
481            task_handles: self.task_handles.clone(),
482            metrics: self.metrics.clone(),
483            reconnection_backoff_ms: self.reconnection_backoff_ms.clone(),
484            reconnection_control: self.reconnection_control.clone(),
485            order_rate_limiter: self.order_rate_limiter.clone(),
486            position_manager: self.position_manager.clone(),
487        }
488    }
489}
490
491impl BinanceWebSocketTrader {
492    /// Get current connection state
493    fn get_state(&self) -> ConnectionState {
494        ConnectionState::from(self.state.load(Ordering::Acquire))
495    }
496
497    /// Set connection state
498    fn set_state(&self, new_state: ConnectionState) {
499        let old_state = self.get_state();
500        self.state.store(new_state as u8, Ordering::Release);
501
502        if old_state != new_state {
503            debug!("Connection state transition: {old_state:?} -> {new_state:?}");
504        }
505    }
506
507    /// Check if authenticated and ready for operations
508    #[must_use]
509    pub fn is_authenticated(&self) -> bool {
510        self.get_state() == ConnectionState::Authenticated
511    }
512
513    /// Get connection metrics
514    #[must_use]
515    pub const fn metrics(&self) -> &Arc<ConnectionMetrics> {
516        &self.metrics
517    }
518
519    /// Get detailed connection health information
520    /// Returns a struct with current connection status, timing information, and metrics
521    #[must_use]
522    pub fn get_connection_health(&self) -> ConnectionHealth {
523        let now = self.clock.raw();
524        let last_ping = self.last_ping_time.load(Ordering::Acquire);
525        let last_pong = self.last_pong_time.load(Ordering::Acquire);
526        let auth_time = self.auth_completed_time.load(Ordering::Acquire);
527
528        let time_since_last_ping = if last_ping > 0 {
529            Some(Duration::from_nanos(now.saturating_sub(last_ping)))
530        } else {
531            None
532        };
533
534        let time_since_last_pong = if last_pong > 0 {
535            Some(Duration::from_nanos(now.saturating_sub(last_pong)))
536        } else {
537            None
538        };
539
540        let time_since_auth = if auth_time > 0 {
541            Some(Duration::from_nanos(now.saturating_sub(auth_time)))
542        } else {
543            None
544        };
545
546        ConnectionHealth {
547            state: self.get_state(),
548            is_connected: self.is_connected(),
549            is_authenticated: self.is_authenticated(),
550            time_since_last_ping,
551            time_since_last_pong,
552            time_since_auth,
553            websocket_pings_sent: self.metrics.websocket_pings_sent.load(Ordering::Relaxed),
554            websocket_pongs_received: self
555                .metrics
556                .websocket_pongs_received
557                .load(Ordering::Relaxed),
558            json_pings_sent: self.metrics.json_pings_sent.load(Ordering::Relaxed),
559            json_pongs_received: self.metrics.json_pongs_received.load(Ordering::Relaxed),
560            messages_sent: self.metrics.messages_sent.load(Ordering::Relaxed),
561            messages_received: self.metrics.messages_received.load(Ordering::Relaxed),
562            reconnection_attempts: self.metrics.reconnection_attempts.load(Ordering::Relaxed),
563            successful_reconnections: self
564                .metrics
565                .successful_reconnections
566                .load(Ordering::Relaxed),
567            failed_reconnections: self.metrics.failed_reconnections.load(Ordering::Relaxed),
568        }
569    }
570
571    /// Calculate exponential backoff with jitter
572    fn calculate_backoff_ms(&self) -> u64 {
573        let current = self.reconnection_backoff_ms.load(Ordering::Acquire);
574        let max_backoff = 60_000; // 60 seconds max
575
576        // Add jitter (±25%)
577        let jitter = (current / 4) as i64;
578        let mut rng = rand::rng();
579        let random_jitter = rng.random_range(-jitter..=jitter);
580
581        let next_backoff = ((current * 2).min(max_backoff) as i64 + random_jitter).max(1000) as u64;
582        self.reconnection_backoff_ms
583            .store(next_backoff, Ordering::Release);
584
585        current
586    }
587
588    /// Reset backoff on successful connection
589    fn reset_backoff(&self) {
590        self.reconnection_backoff_ms.store(1000, Ordering::Release);
591    }
592
593    /// Create a new WebSocket-only trading client
594    #[must_use]
595    pub fn new(auth: Arc<BinanceAuth>, position_manager: Arc<dyn PositionManager>) -> Self {
596        let clock = Clock::new();
597        Self {
598            auth,
599            ws_sink: Arc::new(AsyncRwLock::new(None)),
600            ws_stream: Arc::new(AsyncRwLock::new(None)),
601            state: Arc::new(AtomicU8::new(ConnectionState::Disconnected as u8)),
602            clock: clock.clone(),
603            request_id_gen: Arc::new(RequestIdGenerator::new()),
604            pending_requests: Arc::new(RwLock::new(FxHashMap::default())),
605            ws_url: BINANCE_WS_API_URL,
606            last_ping_time: Arc::new(AtomicU64::new(0)),
607            last_pong_time: Arc::new(AtomicU64::new(0)),
608            auth_completed_time: Arc::new(AtomicU64::new(0)),
609            task_handles: Arc::new(AsyncRwLock::new(TaskHandles::default())),
610            metrics: Arc::new(ConnectionMetrics::default()),
611            reconnection_backoff_ms: Arc::new(AtomicU64::new(1000)), // Start with 1 second
612            reconnection_control: Arc::new(ReconnectionMonitorControl {
613                should_stop: Arc::new(AtomicBool::new(false)),
614            }),
615            order_rate_limiter: Arc::new(RwLock::new(OrderRateLimiter::new(clock))),
616            position_manager,
617        }
618    }
619
620    /// Create a new WebSocket-only trading client for testnet
621    #[must_use]
622    pub fn new_testnet(auth: Arc<BinanceAuth>, position_manager: Arc<dyn PositionManager>) -> Self {
623        let clock = Clock::new();
624        Self {
625            auth,
626            ws_sink: Arc::new(AsyncRwLock::new(None)),
627            ws_stream: Arc::new(AsyncRwLock::new(None)),
628            state: Arc::new(AtomicU8::new(ConnectionState::Disconnected as u8)),
629            clock: clock.clone(),
630            request_id_gen: Arc::new(RequestIdGenerator::new()),
631            pending_requests: Arc::new(RwLock::new(FxHashMap::default())),
632            ws_url: BINANCE_WS_API_TESTNET_URL,
633            last_ping_time: Arc::new(AtomicU64::new(0)),
634            last_pong_time: Arc::new(AtomicU64::new(0)),
635            auth_completed_time: Arc::new(AtomicU64::new(0)),
636            task_handles: Arc::new(AsyncRwLock::new(TaskHandles::default())),
637            metrics: Arc::new(ConnectionMetrics::default()),
638            reconnection_backoff_ms: Arc::new(AtomicU64::new(1000)), // Start with 1 second
639            reconnection_control: Arc::new(ReconnectionMonitorControl {
640                should_stop: Arc::new(AtomicBool::new(false)),
641            }),
642            order_rate_limiter: Arc::new(RwLock::new(OrderRateLimiter::new(clock))),
643            position_manager,
644        }
645    }
646
647    /// Connect to Binance WebSocket API
648    pub async fn connect(&self, report_tx: Sender<ExecutionReport>) -> Result<()> {
649        // Check if already connected or connecting
650        match self.get_state() {
651            ConnectionState::Authenticated => {
652                debug!("Already authenticated, skipping connection");
653                return Ok(());
654            }
655            ConnectionState::Connecting
656            | ConnectionState::Connected
657            | ConnectionState::Authenticating => {
658                warn!("Connection already in progress");
659                return Err(anyhow!("Connection already in progress"));
660            }
661            _ => {}
662        }
663
664        // Transition to connecting state
665        self.set_state(ConnectionState::Connecting);
666
667        let ws_url = self.ws_url;
668        debug!("Connecting to Binance WebSocket API: {ws_url}");
669
670        // Create WebSocket configuration
671        let ws_config =
672            WebSocketConfig::new(rusty_common::types::Exchange::Binance, ws_url.to_string());
673
674        let mut connector = WebSocketConnector::new(
675            ws_config,
676            Arc::new(parking_lot::RwLock::new(
677                rusty_common::websocket::ConnectionStats::default(),
678            )),
679            Arc::new(parking_lot::RwLock::new(
680                rusty_common::websocket::ConnectionState::Disconnected,
681            )),
682        );
683
684        let (ws_sink, ws_stream) = match connector.connect_with_retry(ws_url).await {
685            Ok(result) => result,
686            Err(e) => {
687                self.set_state(ConnectionState::Disconnected);
688                return Err(anyhow!("WebSocket connection failed: {}", e));
689            }
690        };
691
692        *self.ws_sink.write().await = Some(ws_sink);
693        *self.ws_stream.write().await = Some(ws_stream);
694
695        // Transition to connected state
696        self.set_state(ConnectionState::Connected);
697
698        // Start response handler and ping handler with proper task management
699        let response_handle = self.start_response_handler(report_tx.clone()).await?;
700        let ping_handle = self.start_ping_handler().await?;
701        let reconnect_handle = self.start_reconnection_monitor(report_tx.clone()).await?;
702
703        // Store task handles
704        {
705            let mut handles = self.task_handles.write().await;
706            handles.response_handler = Some(response_handle);
707            handles.ping_handler = Some(ping_handle);
708            handles.reconnection_monitor = Some(reconnect_handle);
709        }
710
711        // Authenticate the session
712        self.set_state(ConnectionState::Authenticating);
713        match self.authenticate_session().await {
714            Ok(()) => {
715                self.set_state(ConnectionState::Authenticated);
716                self.auth_completed_time
717                    .store(self.clock.raw(), Ordering::Release);
718                self.reset_backoff();
719                debug!("Successfully authenticated to Binance WebSocket API");
720                Ok(())
721            }
722            Err(e) => {
723                error!("Authentication failed: {e}");
724                self.set_state(ConnectionState::Connected); // Revert to connected state
725                Err(e)
726            }
727        }
728    }
729
730    /// Authenticate the WebSocket session using logon request
731    async fn authenticate_session(&self) -> Result<()> {
732        debug!("🔐 Generating WebSocket authentication message...");
733
734        // Generate WebSocket authentication message
735        let auth_message = self.auth.generate_ws_auth().map_err(|e| {
736            error!("🔐 Failed to generate WebSocket auth: {e}");
737            anyhow!("Failed to generate WebSocket auth: {}", e)
738        })?;
739
740        debug!("🔐 Authentication message generated, sending to server...");
741
742        // Send authentication message with timeout
743        let send_result = tokio::time::timeout(
744            Duration::from_secs(10), // 10 second timeout for auth send
745            async {
746                if let Some(ref mut ws_sink) = self.ws_sink.write().await.as_mut() {
747                    ws_sink
748                        .send(Message::Text(auth_message).to_frame_view())
749                        .await
750                        .map_err(|e| anyhow!("Failed to send auth message: {}", e))
751                } else {
752                    Err(anyhow!("WebSocket sink not available"))
753                }
754            },
755        )
756        .await;
757
758        match send_result {
759            Ok(Ok(())) => {
760                debug!("🔐 WebSocket authentication sent successfully");
761                Ok(())
762            }
763            Ok(Err(e)) => {
764                error!("🔐 Failed to send authentication message: {e}");
765                Err(e)
766            }
767            Err(_) => {
768                error!("🔐 Authentication send timed out after 10 seconds");
769                Err(anyhow!("Authentication send timed out"))
770            }
771        }
772    }
773
774    /// Start the response handler for incoming WebSocket messages
775    async fn start_response_handler(
776        &self,
777        report_tx: Sender<ExecutionReport>,
778    ) -> Result<JoinHandle<()>> {
779        let ws_stream = self.ws_stream.clone();
780        let pending_requests = self.pending_requests.clone();
781        let clock = self.clock.clone();
782        let last_pong_time = self.last_pong_time.clone();
783        let state = self.state.clone();
784        let metrics = self.metrics.clone();
785        let position_manager = self.position_manager.clone();
786        let self_clone = Self {
787            auth: self.auth.clone(),
788            ws_sink: self.ws_sink.clone(),
789            ws_stream: self.ws_stream.clone(),
790            state: self.state.clone(),
791            clock: self.clock.clone(),
792            request_id_gen: self.request_id_gen.clone(),
793            pending_requests: self.pending_requests.clone(),
794            ws_url: self.ws_url,
795            last_ping_time: self.last_ping_time.clone(),
796            last_pong_time: self.last_pong_time.clone(),
797            auth_completed_time: self.auth_completed_time.clone(),
798            task_handles: self.task_handles.clone(),
799            metrics: self.metrics.clone(),
800            reconnection_backoff_ms: self.reconnection_backoff_ms.clone(),
801            reconnection_control: self.reconnection_control.clone(),
802            order_rate_limiter: self.order_rate_limiter.clone(),
803            position_manager: self.position_manager.clone(),
804        };
805
806        let handle = tokio::spawn(async move {
807            let mut ws_guard = ws_stream.write().await;
808            if let Some(mut ws_stream) = ws_guard.take() {
809                loop {
810                    // Check if we should still be running
811                    let current_state = ConnectionState::from(state.load(Ordering::Acquire));
812                    if current_state == ConnectionState::Disconnected
813                        || current_state == ConnectionState::Disconnecting
814                    {
815                        debug!("Response handler stopping - state: {current_state:?}");
816                        break;
817                    }
818
819                    if let Some(frame) = ws_stream.next().await {
820                        metrics.messages_received.fetch_add(1, Ordering::Relaxed);
821
822                        // Validate frame size
823                        if frame.payload.len() > MAX_MESSAGE_SIZE {
824                            error!(
825                                "Frame too large: {} bytes (max: {})",
826                                frame.payload.len(),
827                                MAX_MESSAGE_SIZE
828                            );
829                            self_clone.kill_socket("Frame size exceeded maximum").await;
830                            break;
831                        }
832
833                        // Process frames manually according to YAWC guidelines
834                        match frame.opcode {
835                            OpCode::Text => {
836                                // Handle text data frames (JSON messages)
837                                let text = match std::str::from_utf8(&frame.payload) {
838                                    Ok(text) => text,
839                                    Err(e) => {
840                                        error!("Invalid UTF-8 in text frame: {e}");
841                                        self_clone.kill_socket("Invalid UTF-8 in text frame").await;
842                                        break;
843                                    }
844                                };
845
846                                let mut text_copy = text.to_string();
847                                match unsafe { simd_json::from_str::<JsonValue>(&mut text_copy) } {
848                                    Ok(json) => {
849                                        debug!("response is {:?}", json.get("id"));
850                                        Self::handle_websocket_message(
851                                            &json,
852                                            &pending_requests,
853                                            &report_tx,
854                                            &clock,
855                                            &position_manager,
856                                        )
857                                        .await;
858                                    }
859                                    Err(e) => {
860                                        error!("Failed to parse WebSocket message: {e}");
861                                        self_clone.kill_socket("JSON parsing error").await;
862                                        break;
863                                    }
864                                }
865                            }
866                            OpCode::Ping => {
867                                // YAWC handles automatic pong responses
868                                debug!(
869                                    "Received ping - automatic pong response will be sent by YAWC"
870                                );
871                            }
872                            OpCode::Pong => {
873                                // Handle pong responses to our pings
874                                let now = clock.raw();
875                                last_pong_time.store(now, Ordering::SeqCst);
876                                debug!("Received pong from server at {now}");
877                            }
878                            OpCode::Close => {
879                                // Handle close frames with proper code extraction
880                                let close_info = if frame.payload.len() >= 2 {
881                                    let code =
882                                        u16::from_be_bytes([frame.payload[0], frame.payload[1]]);
883                                    let reason = if frame.payload.len() > 2 {
884                                        std::str::from_utf8(&frame.payload[2..])
885                                            .unwrap_or("Invalid UTF-8 in close reason")
886                                    } else {
887                                        "No reason provided"
888                                    };
889                                    format!("code: {code}, reason: {reason}")
890                                } else {
891                                    "No close code provided".to_string()
892                                };
893
894                                warn!("WebSocket connection closing: {close_info}");
895                                self_clone
896                                    .kill_socket(&format!(
897                                        "Connection closed by server: {close_info}"
898                                    ))
899                                    .await;
900                                break;
901                            }
902                            OpCode::Binary => {
903                                // Handle binary frames if needed (currently not expected for Binance)
904                                debug!("Received binary frame with {} bytes", frame.payload.len());
905                            }
906                            _ => {
907                                // Handle unknown opcodes
908                                debug!("Received frame with unknown opcode: {:?}", frame.opcode);
909                            }
910                        }
911                    } else {
912                        // Stream ended - connection lost
913                        error!("WebSocket stream ended unexpectedly");
914                        self_clone.kill_socket("Stream ended unexpectedly").await;
915                        break;
916                    }
917                }
918            } else {
919                error!("No WebSocket stream available in response handler");
920                self_clone
921                    .kill_socket("No WebSocket stream available")
922                    .await;
923            }
924
925            debug!("Response handler task exiting");
926        });
927
928        Ok(handle)
929    }
930
931    /// Start the ping handler for keeping WebSocket connection alive
932    async fn start_ping_handler(&self) -> Result<JoinHandle<()>> {
933        let ws_sink = self.ws_sink.clone();
934        let clock = self.clock.clone();
935        let last_ping_time = self.last_ping_time.clone();
936        let last_pong_time = self.last_pong_time.clone();
937        let state = self.state.clone();
938        let auth_completed_time = self.auth_completed_time.clone();
939        let metrics = self.metrics.clone();
940        let self_clone = Self {
941            auth: self.auth.clone(),
942            ws_sink: self.ws_sink.clone(),
943            ws_stream: self.ws_stream.clone(),
944            state: self.state.clone(),
945            clock: self.clock.clone(),
946            request_id_gen: self.request_id_gen.clone(),
947            pending_requests: self.pending_requests.clone(),
948            ws_url: self.ws_url,
949            last_ping_time: self.last_ping_time.clone(),
950            last_pong_time: self.last_pong_time.clone(),
951            auth_completed_time: self.auth_completed_time.clone(),
952            task_handles: self.task_handles.clone(),
953            metrics: self.metrics.clone(),
954            reconnection_backoff_ms: self.reconnection_backoff_ms.clone(),
955            reconnection_control: self.reconnection_control.clone(),
956            order_rate_limiter: self.order_rate_limiter.clone(),
957            position_manager: self.position_manager.clone(),
958        };
959
960        let handle = tokio::spawn(async move {
961            let mut ping_interval = interval(Duration::from_secs(PING_INTERVAL_SECONDS));
962            ping_interval.tick().await; // Skip first immediate tick
963
964            loop {
965                ping_interval.tick().await;
966
967                // Check if we should still be running
968                let current_state = ConnectionState::from(state.load(Ordering::Acquire));
969                if current_state == ConnectionState::Disconnected
970                    || current_state == ConnectionState::Disconnecting
971                {
972                    debug!("Ping handler stopping - state: {current_state:?}");
973                    break;
974                }
975
976                let now = clock.raw();
977                let last_pong = last_pong_time.load(Ordering::Acquire);
978                let auth_time = auth_completed_time.load(Ordering::Acquire);
979
980                // Only check for pong timeout if we're authenticated
981                if current_state == ConnectionState::Authenticated && auth_time > 0 {
982                    // Check if we've been waiting too long for any pong after authentication
983                    let waiting_for_first_pong = last_pong == 0
984                        && (now - auth_time) > (PONG_TIMEOUT_SECONDS * 1_000_000_000);
985
986                    let pong_timeout =
987                        last_pong > 0 && (now - last_pong) > (PONG_TIMEOUT_SECONDS * 1_000_000_000);
988
989                    if waiting_for_first_pong || pong_timeout {
990                        warn!("Pong timeout detected, connection is dead");
991                        self_clone.kill_socket("Pong timeout").await;
992                        break;
993                    }
994                }
995
996                // Send ping with exception handling
997                if let Some(ref mut sink) = ws_sink.write().await.as_mut() {
998                    let ping_data = b"ping".to_vec();
999                    match sink.send(Message::Ping(ping_data).to_frame_view()).await {
1000                        Ok(()) => {
1001                            last_ping_time.store(now, Ordering::SeqCst);
1002                            metrics.websocket_pings_sent.fetch_add(1, Ordering::Relaxed);
1003                            debug!("Sent ping at {now}");
1004                        }
1005                        Err(e) => {
1006                            error!("Failed to send ping: {e}");
1007                            self_clone
1008                                .kill_socket(&format!("Failed to send ping: {e}"))
1009                                .await;
1010                            break;
1011                        }
1012                    }
1013                } else {
1014                    warn!("WebSocket sink not available for ping");
1015                    self_clone.kill_socket("WebSocket sink not available").await;
1016                    break;
1017                }
1018            }
1019
1020            debug!("Ping handler task exiting");
1021        });
1022
1023        Ok(handle)
1024    }
1025
1026    /// Start the auto-reconnection monitor for automatic recovery
1027    ///
1028    /// This monitor continuously watches the connection state and performs automatic
1029    /// reconnection when the connection is lost. The reconnection behavior includes:
1030    ///
1031    /// - Exponential backoff starting at 1 second, doubling up to 60 seconds max
1032    /// - Jitter of ±25% added to prevent thundering herd
1033    /// - Automatic authentication after successful reconnection
1034    /// - Preservation of pending requests across reconnections
1035    /// - Health metrics tracking for monitoring and alerting
1036    ///
1037    /// The monitor will stop when:
1038    /// - The connection state becomes Disconnected or Disconnecting
1039    /// - The connection has been stable for the configured check interval
1040    /// - A manual disconnect is initiated
1041    async fn start_reconnection_monitor(
1042        &self,
1043        report_tx: Sender<ExecutionReport>,
1044    ) -> Result<JoinHandle<()>> {
1045        let state = self.state.clone();
1046        let metrics = self.metrics.clone();
1047        let control = self.reconnection_control.clone();
1048
1049        // Reset the stop flag when starting a new monitor
1050        control.should_stop.store(false, Ordering::Release);
1051
1052        let self_clone = Self {
1053            auth: self.auth.clone(),
1054            ws_sink: self.ws_sink.clone(),
1055            ws_stream: self.ws_stream.clone(),
1056            state: self.state.clone(),
1057            clock: self.clock.clone(),
1058            request_id_gen: self.request_id_gen.clone(),
1059            pending_requests: self.pending_requests.clone(),
1060            ws_url: self.ws_url,
1061            last_ping_time: self.last_ping_time.clone(),
1062            last_pong_time: self.last_pong_time.clone(),
1063            auth_completed_time: self.auth_completed_time.clone(),
1064            task_handles: self.task_handles.clone(),
1065            metrics: self.metrics.clone(),
1066            reconnection_backoff_ms: self.reconnection_backoff_ms.clone(),
1067            reconnection_control: self.reconnection_control.clone(),
1068            order_rate_limiter: self.order_rate_limiter.clone(),
1069            position_manager: self.position_manager.clone(),
1070        };
1071
1072        let handle = tokio::spawn(async move {
1073            let mut check_interval = interval(Duration::from_secs(3)); // Check every 3 seconds
1074            check_interval.tick().await; // Skip first immediate tick
1075
1076            let mut consecutive_disconnections = 0;
1077            const MAX_RECONNECTION_ATTEMPTS: u32 = 10; // Allow more attempts with backoff
1078
1079            info!(
1080                "🤖 Auto-reconnection monitor started (check every 3s, max {MAX_RECONNECTION_ATTEMPTS} attempts)"
1081            );
1082
1083            loop {
1084                check_interval.tick().await;
1085
1086                // Check if we should stop the monitor
1087                if control.should_stop.load(Ordering::Acquire) {
1088                    info!("🛑 Reconnection monitor received stop signal - exiting gracefully");
1089                    break;
1090                }
1091
1092                // Check if connection is healthy based on state
1093                let current_state = ConnectionState::from(state.load(Ordering::Acquire));
1094                let is_healthy = current_state == ConnectionState::Authenticated;
1095
1096                if !is_healthy
1097                    && current_state != ConnectionState::Connecting
1098                    && current_state != ConnectionState::Authenticating
1099                {
1100                    consecutive_disconnections += 1;
1101                    metrics
1102                        .reconnection_attempts
1103                        .fetch_add(1, Ordering::Relaxed);
1104
1105                    if consecutive_disconnections > MAX_RECONNECTION_ATTEMPTS {
1106                        error!(
1107                            "🔴 Maximum reconnection attempts ({MAX_RECONNECTION_ATTEMPTS}) exceeded. Stopping auto-reconnection."
1108                        );
1109                        break;
1110                    }
1111
1112                    warn!(
1113                        "🔄 Connection unhealthy detected (state: {current_state:?})! Attempting auto-reconnection #{consecutive_disconnections}/{MAX_RECONNECTION_ATTEMPTS}"
1114                    );
1115
1116                    // Wait with exponential backoff
1117                    let backoff_ms = self_clone.calculate_backoff_ms();
1118                    info!("⏳ Waiting {backoff_ms}ms before reconnection attempt");
1119                    tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
1120
1121                    // Attempt reconnection with timeout
1122                    info!("🔄 Starting internal reconnection attempt...");
1123                    let reconnect_start = std::time::Instant::now();
1124
1125                    match tokio::time::timeout(
1126                        Duration::from_secs(45), // Extended timeout for full reconnection
1127                        self_clone.reconnect_internal(report_tx.clone()),
1128                    )
1129                    .await
1130                    {
1131                        Ok(Ok(())) => {
1132                            info!(
1133                                "✅ Auto-reconnection successful in {:.2}s!",
1134                                reconnect_start.elapsed().as_secs_f32()
1135                            );
1136                            consecutive_disconnections = 0; // Reset counter on success
1137                            metrics
1138                                .successful_reconnections
1139                                .fetch_add(1, Ordering::Relaxed);
1140                            self_clone.reset_backoff(); // Reset exponential backoff
1141                        }
1142                        Ok(Err(e)) => {
1143                            error!(
1144                                "❌ Auto-reconnection failed after {:.2}s: {}",
1145                                reconnect_start.elapsed().as_secs_f32(),
1146                                e
1147                            );
1148                            // Will try again in next iteration
1149                        }
1150                        Err(_) => {
1151                            error!(
1152                                "❌ Auto-reconnection timed out after {:.2}s (45s limit) - will retry",
1153                                reconnect_start.elapsed().as_secs_f32()
1154                            );
1155                            // Force kill any hanging connections
1156                            self_clone
1157                                .kill_socket("Reconnection timeout - forcing cleanup")
1158                                .await;
1159                        }
1160                    }
1161                } else {
1162                    // Connection is healthy, reset counter
1163                    if consecutive_disconnections > 0 {
1164                        consecutive_disconnections = 0;
1165                        debug!("Connection restored and stable");
1166                    }
1167                }
1168            }
1169
1170            warn!("Auto-reconnection monitor exiting");
1171        });
1172
1173        Ok(handle)
1174    }
1175
1176    /// Internal reconnection method that doesn't cause infinite loops
1177    async fn reconnect_internal(&self, report_tx: Sender<ExecutionReport>) -> Result<()> {
1178        info!("🔄 RECONNECT-STEP 1/3: Internal reconnection starting...");
1179
1180        // Kill existing connection first (but don't start new monitor)
1181        info!("🔄 RECONNECT-STEP 2/3: Killing existing connection...");
1182
1183        // Add timeout to kill_socket in case it hangs
1184        match tokio::time::timeout(
1185            Duration::from_secs(5), // 5 second timeout for killing socket
1186            self.kill_socket("Internal reconnection requested"),
1187        )
1188        .await
1189        {
1190            Ok(()) => {
1191                info!("🔄 RECONNECT-STEP 2/3: Socket killed successfully");
1192            }
1193            Err(_) => {
1194                error!("🔄 RECONNECT-STEP 2/3: Socket kill timed out after 5s - continuing anyway");
1195                // Force reset connection state
1196            }
1197        }
1198
1199        // Small delay to allow cleanup
1200        info!("🔄 RECONNECT-STEP 2/3: Waiting 100ms for cleanup...");
1201        tokio::time::sleep(Duration::from_millis(100)).await;
1202        info!("🔄 RECONNECT-STEP 2/3: Cleanup delay completed");
1203
1204        // Attempt new connection (this will NOT start a new monitor since we're already in one)
1205        info!("🔄 RECONNECT-STEP 3/3: Attempting new connection...");
1206        match self.connect_without_monitor(report_tx).await {
1207            Ok(()) => {
1208                info!("🎉 Internal reconnection completed successfully");
1209                Ok(())
1210            }
1211            Err(e) => {
1212                error!("💥 Internal reconnection failed: {e}");
1213                Err(e)
1214            }
1215        }
1216    }
1217
1218    /// Connect without starting a new reconnection monitor (used internally)
1219    async fn connect_without_monitor(&self, report_tx: Sender<ExecutionReport>) -> Result<()> {
1220        let ws_url = self.ws_url;
1221
1222        info!("🔌 STEP 1/6: Connecting to Binance WebSocket API (internal): {ws_url}");
1223
1224        // Create WebSocket configuration
1225        let ws_config =
1226            WebSocketConfig::new(rusty_common::types::Exchange::Binance, ws_url.to_string());
1227
1228        let mut connector = WebSocketConnector::new(
1229            ws_config,
1230            Arc::new(parking_lot::RwLock::new(
1231                rusty_common::websocket::ConnectionStats::default(),
1232            )),
1233            Arc::new(parking_lot::RwLock::new(
1234                rusty_common::websocket::ConnectionState::Disconnected,
1235            )),
1236        );
1237
1238        info!("📡 STEP 2/6: Attempting WebSocket connection with timeout...");
1239        let connection_start = std::time::Instant::now();
1240
1241        let (ws_sink, ws_stream) = match tokio::time::timeout(
1242            Duration::from_secs(15), // 15 second timeout for connection
1243            connector.connect_with_retry(ws_url),
1244        )
1245        .await
1246        {
1247            Ok(Ok((sink, stream))) => {
1248                info!(
1249                    "🔗 STEP 2/6: WebSocket connection established in {:.2}s",
1250                    connection_start.elapsed().as_secs_f32()
1251                );
1252                (sink, stream)
1253            }
1254            Ok(Err(e)) => {
1255                error!(
1256                    "📡 STEP 2/6: WebSocket connection failed after {:.2}s: {}",
1257                    connection_start.elapsed().as_secs_f32(),
1258                    e
1259                );
1260                return Err(anyhow!("WebSocket connection failed: {}", e));
1261            }
1262            Err(_) => {
1263                error!("📡 STEP 2/6: WebSocket connection timed out after 15 seconds");
1264                return Err(anyhow!("WebSocket connection timed out"));
1265            }
1266        };
1267
1268        info!("🔗 STEP 3/6: Setting up WebSocket streams...");
1269        *self.ws_sink.write().await = Some(ws_sink);
1270        *self.ws_stream.write().await = Some(ws_stream);
1271
1272        info!("🚀 STEP 4/6: Starting response handler...");
1273        // Start response handler and ping handler (but NOT reconnection monitor)
1274        self.start_response_handler(report_tx).await.map_err(|e| {
1275            error!("🚀 STEP 4/6: Failed to start response handler: {e}");
1276            anyhow!("Failed to start response handler: {}", e)
1277        })?;
1278
1279        debug!("🏓 STEP 5/6: Starting ping handler...");
1280        self.start_ping_handler().await.map_err(|e| {
1281            error!("🏓 STEP 5/6: Failed to start ping handler: {e}");
1282            anyhow!("Failed to start ping handler: {}", e)
1283        })?;
1284
1285        info!("🔐 STEP 6/6: Authenticating session...");
1286        // Authenticate the session
1287        self.authenticate_session().await.map_err(|e| {
1288            error!("🔐 STEP 6/6: Authentication failed: {e}");
1289            anyhow!("Authentication failed: {}", e)
1290        })?;
1291
1292        info!("✅ Successfully reconnected to Binance WebSocket API");
1293        Ok(())
1294    }
1295
1296    /// Handle incoming WebSocket messages
1297    async fn handle_websocket_message(
1298        json: &JsonValue,
1299        pending_requests: &Arc<RwLock<PendingRequestsMap>>,
1300        report_tx: &Sender<ExecutionReport>,
1301        clock: &Clock,
1302        position_manager: &Arc<dyn PositionManager>,
1303    ) {
1304        // Check if this is a response to our request
1305        let pending_request = if let Some(request_id) = json.get("id") {
1306            let mut pending = pending_requests.write();
1307
1308            // Try to extract ID as either number or string - optimized to avoid string allocation
1309            if let Some(id_num) = request_id.as_u64() {
1310                let key = RequestKey::Sequential(id_num);
1311                pending.remove(&key)
1312            } else if let Some(id_str) = request_id.as_str() {
1313                let key = RequestKey::Uuid(SmartString::from(id_str));
1314                pending.remove(&key)
1315            } else {
1316                None
1317            }
1318        } else {
1319            None
1320        };
1321
1322        if let Some(pending_req) = pending_request {
1323            Self::handle_response(json, &pending_req, report_tx, clock).await;
1324        }
1325        // Check if this is an execution report from user data stream
1326        else if let Some(event_type) = json.get("e").and_then(|v| v.as_str()) {
1327            if event_type == "executionReport" {
1328                Self::handle_execution_report(json, report_tx, clock).await;
1329            } else if event_type == "ACCOUNT_UPDATE" {
1330                Self::handle_account_update(json, position_manager).await;
1331            }
1332        }
1333    }
1334
1335    /// Handle response to our trading requests
1336    async fn handle_response(
1337        json: &JsonValue,
1338        pending_req: &PendingRequest,
1339        report_tx: &Sender<ExecutionReport>,
1340        clock: &Clock,
1341    ) {
1342        // Check for errors
1343        if let Some(error) = json.get("error") {
1344            let error_code = error
1345                .get("code")
1346                .and_then(simd_json::prelude::ValueAsScalar::as_i64)
1347                .unwrap_or(0);
1348            let error_msg = error
1349                .get("msg")
1350                .and_then(|v| v.as_str())
1351                .unwrap_or("Unknown error");
1352
1353            error!("WebSocket API error {error_code}: {error_msg}");
1354
1355            // Send rejection report if we have a report channel
1356            if let Some(ref tx) = pending_req.report_tx {
1357                let report = ExecutionReport {
1358                    id: id_generation::generate_ws_timestamp_id("error", clock.raw()),
1359                    order_id: "unknown".into(),
1360                    exchange_timestamp: 0,
1361                    system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
1362                    instrument_id: InstrumentId {
1363                        symbol: "UNKNOWN".into(),
1364                        venue: Venue::Binance,
1365                    },
1366                    status: OrderStatus::Rejected,
1367                    filled_quantity: Decimal::ZERO,
1368                    remaining_quantity: Decimal::ZERO,
1369                    execution_price: None,
1370                    reject_reason: Some(format!("{error_msg} ({error_code})").into()),
1371                    exchange_execution_id: None,
1372                    is_final: true,
1373                };
1374
1375                if let Err(e) = tx.send_async(report).await {
1376                    error!("Failed to send error report: {e}");
1377                }
1378            }
1379            return;
1380        }
1381
1382        // Handle successful responses based on method
1383        match pending_req.method.as_str() {
1384            "order.place" => {
1385                Self::handle_order_place_response(json, pending_req, report_tx, clock).await;
1386            }
1387            "order.cancel" => {
1388                Self::handle_order_cancel_response(json, pending_req, report_tx, clock).await;
1389            }
1390            "order.cancelReplace" => {
1391                Self::handle_order_modify_response(json, pending_req, report_tx, clock).await;
1392            }
1393            "ping" => {
1394                Self::handle_ping_response(json, pending_req).await;
1395            }
1396            "openOrders.cancelAll" => {
1397                Self::handle_cancel_all_orders_response(json, pending_req, report_tx, clock).await;
1398            }
1399            _ => {
1400                info!("Received response for method: {}", pending_req.method);
1401            }
1402        }
1403    }
1404
1405    /// Handle order placement response
1406    async fn handle_order_place_response(
1407        json: &JsonValue,
1408        pending_req: &PendingRequest,
1409        report_tx: &Sender<ExecutionReport>,
1410        clock: &Clock,
1411    ) {
1412        if let Some(ref tx) = pending_req.report_tx
1413            && let Some(result) = json.get("result")
1414        {
1415            let symbol: SmartString = result
1416                .get("symbol")
1417                .and_then(|v| v.as_str())
1418                .unwrap_or("")
1419                .into();
1420
1421            let order_id: SmartString = result
1422                .get("orderId")
1423                .and_then(simd_json::prelude::ValueAsScalar::as_u64)
1424                .unwrap_or(0)
1425                .to_string()
1426                .into();
1427
1428            let client_order_id: SmartString = result
1429                .get("clientOrderId")
1430                .and_then(|v| v.as_str())
1431                .unwrap_or("")
1432                .into();
1433
1434            let status = result.get("status").and_then(|v| v.as_str()).unwrap_or("");
1435
1436            let original_qty: SmartString = result
1437                .get("origQty")
1438                .and_then(|v| v.as_str())
1439                .unwrap_or("0")
1440                .into();
1441
1442            let executed_qty: SmartString = result
1443                .get("executedQty")
1444                .and_then(|v| v.as_str())
1445                .unwrap_or("0")
1446                .into();
1447
1448            let price: SmartString = result
1449                .get("price")
1450                .and_then(|v| v.as_str())
1451                .unwrap_or("0")
1452                .into();
1453
1454            let transaction_time = result
1455                .get("transactTime")
1456                .and_then(simd_json::prelude::ValueAsScalar::as_u64)
1457                .unwrap_or(0);
1458
1459            let report = ExecutionReport {
1460                id: id_generation::generate_ws_report_id("place", &client_order_id),
1461                order_id: client_order_id,
1462                exchange_timestamp: transaction_time * 1_000_000, // Convert ms to ns
1463                system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
1464                instrument_id: InstrumentId {
1465                    symbol,
1466                    venue: Venue::Binance,
1467                },
1468                status: Self::map_order_status(status),
1469                filled_quantity: Decimal::from_str_exact(&executed_qty).unwrap_or(Decimal::ZERO),
1470                remaining_quantity: {
1471                    let original = Decimal::from_str_exact(&original_qty).unwrap_or(Decimal::ZERO);
1472                    let executed = Decimal::from_str_exact(&executed_qty).unwrap_or(Decimal::ZERO);
1473                    original - executed
1474                },
1475                execution_price: Decimal::from_str_exact(&price).ok(),
1476                reject_reason: None,
1477                exchange_execution_id: Some(order_id),
1478                is_final: false,
1479            };
1480
1481            // Handle backpressure by bounding send_async with a timeout
1482            match tokio::time::timeout(
1483                Duration::from_millis(100), // 100ms timeout
1484                tx.send_async(report),
1485            )
1486            .await
1487            {
1488                Ok(Ok(())) => debug!("Order place report sent successfully"),
1489                Ok(Err(e)) => {
1490                    error!("Failed to send order place report: {e}");
1491                    // Channel closed, nothing we can do
1492                }
1493                Err(_) => {
1494                    warn!("Channel backpressure detected - order place report send timed out");
1495                    // Could implement a fallback strategy here if needed
1496                }
1497            }
1498        }
1499    }
1500
1501    /// Handle order cancellation response
1502    async fn handle_order_cancel_response(
1503        json: &JsonValue,
1504        pending_req: &PendingRequest,
1505        report_tx: &Sender<ExecutionReport>,
1506        clock: &Clock,
1507    ) {
1508        // Similar to place response but for cancellation
1509        if let Some(ref tx) = pending_req.report_tx
1510            && let Some(result) = json.get("result")
1511        {
1512            let symbol: SmartString = result
1513                .get("symbol")
1514                .and_then(|v| v.as_str())
1515                .unwrap_or("")
1516                .into();
1517
1518            let client_order_id: SmartString = result
1519                .get("clientOrderId")
1520                .and_then(|v| v.as_str())
1521                .unwrap_or("")
1522                .into();
1523
1524            let report = ExecutionReport {
1525                id: id_generation::generate_ws_report_id("cancel", &client_order_id),
1526                order_id: client_order_id,
1527                exchange_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
1528                system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
1529                instrument_id: InstrumentId {
1530                    symbol,
1531                    venue: Venue::Binance,
1532                },
1533                status: OrderStatus::Cancelled,
1534                filled_quantity: Decimal::ZERO,
1535                remaining_quantity: Decimal::ZERO,
1536                execution_price: None,
1537                reject_reason: None,
1538                exchange_execution_id: None,
1539                is_final: true,
1540            };
1541
1542            // Handle backpressure by bounding send_async with a timeout
1543            match tokio::time::timeout(
1544                Duration::from_millis(100), // 100ms timeout
1545                tx.send_async(report),
1546            )
1547            .await
1548            {
1549                Ok(Ok(())) => debug!("Cancel report sent successfully"),
1550                Ok(Err(e)) => {
1551                    error!("Failed to send cancel report: {e}");
1552                    // Channel closed, nothing we can do
1553                }
1554                Err(_) => {
1555                    warn!("Channel backpressure detected - cancel report send timed out");
1556                    // Could implement a fallback strategy here if needed
1557                }
1558            }
1559        }
1560    }
1561
1562    /// Handle order modification response
1563    async fn handle_order_modify_response(
1564        json: &JsonValue,
1565        pending_req: &PendingRequest,
1566        report_tx: &Sender<ExecutionReport>,
1567        clock: &Clock,
1568    ) {
1569        // Similar handling for order modification
1570        Self::handle_order_place_response(json, pending_req, report_tx, clock).await;
1571    }
1572
1573    /// Handle cancel all orders response
1574    async fn handle_cancel_all_orders_response(
1575        json: &JsonValue,
1576        pending_req: &PendingRequest,
1577        report_tx: &Sender<ExecutionReport>,
1578        clock: &Clock,
1579    ) {
1580        if let Some(ref tx) = pending_req.report_tx {
1581            if let Some(result) = json.get("result") {
1582                // Cancel all orders response contains an array of cancelled orders
1583                if let Some(orders_array) = result.as_array() {
1584                    for order_data in orders_array {
1585                        let symbol: SmartString = order_data
1586                            .get("symbol")
1587                            .and_then(|v| v.as_str())
1588                            .unwrap_or("")
1589                            .into();
1590
1591                        let client_order_id: SmartString = order_data
1592                            .get("clientOrderId")
1593                            .and_then(|v| v.as_str())
1594                            .unwrap_or("")
1595                            .into();
1596
1597                        let report = ExecutionReport {
1598                            id: id_generation::generate_ws_report_id(
1599                                "cancel_all",
1600                                &client_order_id,
1601                            ),
1602                            order_id: client_order_id,
1603                            exchange_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
1604                            system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
1605                            instrument_id: InstrumentId {
1606                                symbol,
1607                                venue: Venue::Binance,
1608                            },
1609                            status: OrderStatus::Cancelled,
1610                            filled_quantity: Decimal::ZERO,
1611                            remaining_quantity: Decimal::ZERO,
1612                            execution_price: None,
1613                            reject_reason: None,
1614                            exchange_execution_id: None,
1615                            is_final: true,
1616                        };
1617
1618                        // Send execution report for each cancelled order
1619                        match tokio::time::timeout(
1620                            Duration::from_millis(100), // 100ms timeout
1621                            tx.send_async(report),
1622                        )
1623                        .await
1624                        {
1625                            Ok(Ok(())) => debug!(
1626                                "Cancel all order report sent successfully for order: {}",
1627                                order_data
1628                                    .get("clientOrderId")
1629                                    .and_then(|v| v.as_str())
1630                                    .unwrap_or("unknown")
1631                            ),
1632                            Ok(Err(e)) => {
1633                                error!("Failed to send cancel all order report: {e}");
1634                                // Channel closed, nothing we can do
1635                            }
1636                            Err(_) => {
1637                                warn!(
1638                                    "Channel backpressure detected - cancel all order report send timed out"
1639                                );
1640                                // Could implement a fallback strategy here if needed
1641                            }
1642                        }
1643                    }
1644                } else {
1645                    // Handle case where result is not an array (error or empty result)
1646                    debug!("Cancel all orders result is not an array: {result:?}");
1647                }
1648            } else {
1649                debug!("No result field in cancel all orders response");
1650            }
1651        }
1652    }
1653
1654    /// Handle ping response from JSON API
1655    async fn handle_ping_response(json: &JsonValue, pending_req: &PendingRequest) {
1656        if let Some(result) = json.get("result") {
1657            debug!("✅ JSON ping successful - Server responded: {result:?}");
1658            debug!("   Request timestamp: {}", pending_req.timestamp);
1659            debug!("   Method: {}", pending_req.method);
1660        } else {
1661            debug!("✅ JSON ping successful - Empty result");
1662        }
1663    }
1664
1665    /// Handle execution reports from user data stream
1666    async fn handle_execution_report(
1667        json: &JsonValue,
1668        report_tx: &Sender<ExecutionReport>,
1669        clock: &Clock,
1670    ) {
1671        // Handle execution reports similar to the existing WebSocket client
1672        let symbol: SmartString = json.get("s").and_then(|v| v.as_str()).unwrap_or("").into();
1673
1674        let order_id: SmartString = json
1675            .get("i")
1676            .and_then(simd_json::prelude::ValueAsScalar::as_u64)
1677            .unwrap_or(0)
1678            .to_string()
1679            .into();
1680
1681        let client_order_id: SmartString =
1682            json.get("c").and_then(|v| v.as_str()).unwrap_or("").into();
1683
1684        let status = json.get("X").and_then(|v| v.as_str()).unwrap_or("");
1685
1686        let executed_qty: SmartString =
1687            json.get("z").and_then(|v| v.as_str()).unwrap_or("0").into();
1688
1689        let original_qty: SmartString =
1690            json.get("q").and_then(|v| v.as_str()).unwrap_or("0").into();
1691
1692        let price: SmartString = json.get("p").and_then(|v| v.as_str()).unwrap_or("0").into();
1693
1694        let transaction_time = json
1695            .get("T")
1696            .and_then(simd_json::prelude::ValueAsScalar::as_u64)
1697            .unwrap_or(0);
1698
1699        let report = ExecutionReport {
1700            id: id_generation::generate_ws_report_id("exec", &client_order_id),
1701            order_id: client_order_id,
1702            exchange_timestamp: transaction_time * 1_000_000, // Convert ms to ns
1703            system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
1704            instrument_id: InstrumentId {
1705                symbol,
1706                venue: Venue::Binance,
1707            },
1708            status: Self::map_order_status(status),
1709            filled_quantity: Decimal::from_str_exact(&executed_qty).unwrap_or(Decimal::ZERO),
1710            remaining_quantity: {
1711                let original = Decimal::from_str_exact(&original_qty).unwrap_or(Decimal::ZERO);
1712                let executed = Decimal::from_str_exact(&executed_qty).unwrap_or(Decimal::ZERO);
1713                original - executed
1714            },
1715            execution_price: Decimal::from_str_exact(&price).ok(),
1716            reject_reason: None,
1717            exchange_execution_id: Some(order_id),
1718            is_final: matches!(
1719                Self::map_order_status(status),
1720                OrderStatus::Filled
1721                    | OrderStatus::Cancelled
1722                    | OrderStatus::Rejected
1723                    | OrderStatus::Expired
1724            ),
1725        };
1726
1727        // Handle backpressure by bounding send_async with a timeout
1728        match tokio::time::timeout(
1729            Duration::from_millis(100), // 100ms timeout
1730            report_tx.send_async(report),
1731        )
1732        .await
1733        {
1734            Ok(Ok(())) => debug!("Execution report sent successfully"),
1735            Ok(Err(e)) => {
1736                error!("Failed to send execution report: {e}");
1737                // Channel closed, nothing we can do
1738            }
1739            Err(_) => {
1740                warn!("Channel backpressure detected - execution report send timed out");
1741                // Could implement a fallback strategy here if needed
1742            }
1743        }
1744    }
1745
1746    /// Handle account update events from user data stream
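    ///
    /// The futures `ACCOUNT_UPDATE` payload nests its data under `"a"`: position updates in the
    /// `"P"` array (forwarded to the position manager) and balance updates in the `"B"` array
    /// (currently only logged).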
1747    pub async fn handle_account_update(
1748        json: &JsonValue,
1749        position_manager: &Arc<dyn PositionManager>,
1750    ) {
1751        // Extract account update information from ACCOUNT_UPDATE event
1752        let event_time = json
1753            .get("E")
1754            .and_then(simd_json::prelude::ValueAsScalar::as_u64)
1755            .unwrap_or(0);
1756        let transaction_time = json
1757            .get("T")
1758            .and_then(simd_json::prelude::ValueAsScalar::as_u64)
1759            .unwrap_or(0);
1760
1761        // Handle position updates (futures)
1762        if let Some(positions) = json
1763            .get("a")
1764            .and_then(|a| a.get("P"))
1765            .and_then(|p| p.as_array())
1766        {
1767            for position_json in positions {
1768                if let Some(position_update) =
1769                    Self::parse_position_update(position_json, event_time, transaction_time)
1770                {
1771                    // Log position update
1772                    info!(
1773                        "Position update: {} {} {} @ {}",
1774                        position_update.symbol,
1775                        position_update.side,
1776                        position_update.amount,
1777                        position_update.entry_price
1778                    );
1779
1780                    // Send position update to position manager
1781                    if let Err(e) = position_manager
1782                        .update_position(position_update.clone())
1783                        .await
1784                    {
1785                        error!("Failed to update position in position manager: {e}");
1786                    }
1787                }
1788            }
1789        }
1790
1791        // Handle balance updates
1792        if let Some(balances) = json
1793            .get("a")
1794            .and_then(|a| a.get("B"))
1795            .and_then(|b| b.as_array())
1796        {
1797            for balance_json in balances {
1798                if let Some(asset) = balance_json.get("a").and_then(|v| v.as_str())
1799                    && let Some(wallet_balance) = balance_json.get("wb").and_then(|v| v.as_str())
1800                    && let Some(cross_wallet_balance) =
1801                        balance_json.get("cw").and_then(|v| v.as_str())
1802                {
1803                    debug!(
1804                        "Balance update: {asset} wallet={wallet_balance} cross_wallet={cross_wallet_balance}"
1805                    );
1806                }
1807            }
1808        }
1809    }
1810
1811    /// Generate stable position ID based on position characteristics
1812    /// Uses a single hash for optimal performance in HFT scenarios
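    ///
    /// Determinism sketch (illustrative only, not compiled as a doctest; `side` stands for any
    /// `PositionSide` value): identical inputs always hash to the same `PositionId`, so repeated
    /// `ACCOUNT_UPDATE` events for one position resolve to one identity.
    ///
    /// ```rust,ignore
    /// let a = Self::generate_stable_position_id("BTCUSDT", side, Venue::Binance);
    /// let b = Self::generate_stable_position_id("BTCUSDT", side, Venue::Binance);
    /// assert_eq!(a, b); // stable across calls and process restarts
    /// ```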
1813    fn generate_stable_position_id(
1814        symbol: &str,
1815        side: PositionSide,
1816        venue: Venue,
1817    ) -> rusty_model::types::PositionId {
1818        // Create a deterministic UUID based on position characteristics
1819        // This ensures the same position always gets the same ID
1820
1821        use std::collections::hash_map::DefaultHasher;
1822        use std::hash::{Hash, Hasher};
1823
1824        // Create a hasher and hash all components in order
1825        let mut hasher = DefaultHasher::new();
1826
1827        // Hash a version prefix for future compatibility
1828        "v1-position".hash(&mut hasher);
1829        symbol.hash(&mut hasher);
1830        (side as u8).hash(&mut hasher);
1831        (venue as u8).hash(&mut hasher);
1832
1833        let hash = hasher.finish();
1834
1835        // Create deterministic UUID bytes from the hash
1836        // Use a simple mixing function to expand 64-bit hash to 128-bit UUID
1837        let mut uuid_bytes = [0u8; 16];
1838
1839        // Lower 64 bits: original hash
1840        uuid_bytes[0..8].copy_from_slice(&hash.to_le_bytes());
1841
1842        // Upper 64 bits: rotated and XORed version for better distribution
1843        let mixed = hash.rotate_left(32) ^ hash;
1844        uuid_bytes[8..16].copy_from_slice(&mixed.to_le_bytes());
1845
1846        // Set version (4) and variant bits for valid UUID format
1847        uuid_bytes[6] = (uuid_bytes[6] & 0x0f) | 0x40; // Version 4
1848        uuid_bytes[8] = (uuid_bytes[8] & 0x3f) | 0x80; // Variant 10
1849
1850        let stable_uuid = Uuid::from_bytes(uuid_bytes);
1851        rusty_model::types::PositionId::from_uuid(stable_uuid)
1852    }
1853
1854    /// Parse position update from `ACCOUNT_UPDATE` event
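    ///
    /// Illustrative decoding sketch (not a doctest; the field values below are made up, the field
    /// names mirror exactly what this function reads from each entry of the event's `a.P` array,
    /// and the `"LONG"`/`"cross"` string forms are assumed to be accepted by the `PositionSide`
    /// and `MarginType` `FromStr` impls):
    ///
    /// ```rust,ignore
    /// let mut raw = br#"{"s":"BTCUSDT","pa":"0.5","ep":"42000.0","bep":"42010.0",
    ///                    "up":"12.3","cr":"0.0","mt":"cross","iw":"0","ps":"LONG"}"#.to_vec();
    /// let position_json = simd_json::to_owned_value(&mut raw)?;
    /// let update = BinanceWebSocketTrader::parse_position_update(&position_json, 1_700_000_000_000, 0);
    /// assert!(update.is_some()); // zero-amount (closed) positions would instead return None
    /// ```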
1855    #[must_use]
1856    pub fn parse_position_update(
1857        position_json: &JsonValue,
1858        event_time: u64,
1859        _transaction_time: u64,
1860    ) -> Option<PositionUpdate> {
1861        // Extract position fields from JSON
1862        let symbol = position_json.get("s").and_then(|v| v.as_str())?;
1863        let position_amount_str = position_json.get("pa").and_then(|v| v.as_str())?;
1864        let entry_price_str = position_json.get("ep").and_then(|v| v.as_str())?;
1865        let breakeven_price_str = position_json.get("bep").and_then(|v| v.as_str())?;
1866        let unrealized_pnl_str = position_json.get("up").and_then(|v| v.as_str())?;
1867        let realized_pnl_str = position_json.get("cr").and_then(|v| v.as_str())?;
1868        let margin_type_str = position_json.get("mt").and_then(|v| v.as_str())?;
1869        let isolated_wallet_str = position_json.get("iw").and_then(|v| v.as_str())?;
1870        let position_side_str = position_json.get("ps").and_then(|v| v.as_str())?;
1871
1872        // Parse numeric values
1873        let amount = Decimal::from_str_exact(position_amount_str).ok()?;
1874        let entry_price = Decimal::from_str_exact(entry_price_str).ok()?;
1875        let breakeven_price = Decimal::from_str_exact(breakeven_price_str).ok()?;
1876        let unrealized_pnl = Decimal::from_str_exact(unrealized_pnl_str).ok()?;
1877        let realized_pnl = Decimal::from_str_exact(realized_pnl_str).ok()?;
1878        let isolated_wallet = Decimal::from_str_exact(isolated_wallet_str).ok()?;
1879
1880        // Parse enums
1881        let position_side = position_side_str.parse::<PositionSide>().ok()?;
1882        let margin_type = margin_type_str.parse::<MarginType>().ok()?;
1883
1884        // Skip positions with zero amount (closed positions)
1885        if amount.is_zero() {
1886            debug!("Skipping closed position for {symbol}");
1887            return None;
1888        }
1889
1890        // Generate stable position ID based on position characteristics
1891        let position_id = Self::generate_stable_position_id(symbol, position_side, Venue::Binance);
1892
1893        Some(PositionUpdate {
1894            position_id,
1895            venue: Venue::Binance,
1896            symbol: SmartString::from(symbol),
1897            side: position_side,
1898            amount,
1899            entry_price,
1900            breakeven_price,
1901            unrealized_pnl,
1902            realized_pnl,
1903            margin_type,
1904            isolated_wallet,
1905            timestamp_ns: event_time * 1_000_000, // Convert ms to ns
1906        })
1907    }
1908
1909    /// Map Binance order status to internal status
1910    fn map_order_status(status: &str) -> OrderStatus {
1911        match status {
1912            "NEW" => OrderStatus::New,
1913            "PARTIALLY_FILLED" => OrderStatus::PartiallyFilled,
1914            "FILLED" => OrderStatus::Filled,
1915            "CANCELED" | "CANCELLED" => OrderStatus::Cancelled,
1916            "REJECTED" => OrderStatus::Rejected,
1917            "EXPIRED" => OrderStatus::Expired,
1918            "PENDING_CANCEL" => OrderStatus::Pending,
1919            "PENDING_NEW" => OrderStatus::Pending,
1920            _ => OrderStatus::Unknown,
1921        }
1922    }
1923
1924    /// Place batch orders via WebSocket API (up to 50 orders)
1925    ///
1926    /// This method ensures rate limiting compliance (300 orders/10 seconds)
1927    /// and uses concurrent execution for optimal performance.
1928    ///
1929    /// Returns Ok(()) if at least one order succeeds; an error is returned only when every order in the batch fails
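    ///
    /// Minimal usage sketch (not compiled as a doctest; assumes `trader` is a connected
    /// `BinanceWebSocketTrader` and `orders` is a pre-built `Vec<Order>` within the batch limit):
    ///
    /// ```rust,ignore
    /// let (report_tx, report_rx) = flume::bounded(100);
    /// trader.place_batch_orders(orders, report_tx).await?;
    /// while let Ok(report) = report_rx.recv_async().await {
    ///     println!("order {} -> {:?}", report.order_id, report.status);
    /// }
    /// ```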
1930    pub async fn place_batch_orders(
1931        &self,
1932        orders: Vec<Order>,
1933        report_tx: Sender<ExecutionReport>,
1934    ) -> Result<()> {
1935        if orders.is_empty() {
1936            return Ok(());
1937        }
1938
1939        if orders.len() > MAX_BATCH_SIZE {
1940            bail!("Batch size exceeds maximum of {} orders", MAX_BATCH_SIZE);
1941        }
1942
1943        // Check rate limit
1944        self.check_and_record_rate_limit(orders.len())?;
1945
1946        // Place orders concurrently with proper error handling
1947        let batch_result = self
1948            .place_batch_orders_concurrent(orders, report_tx)
1949            .await?;
1950
1951        // Check if any orders failed
1952        if batch_result.has_failures() {
1953            // Log details about failures
1954            let failures_by_type = batch_result.get_failures_by_error_type();
1955            for (error_type, failed_orders) in failures_by_type {
1956                warn!(
1957                    "{} orders failed due to {}: {:?}",
1958                    failed_orders.len(),
1959                    error_type,
1960                    failed_orders.iter().map(|o| &o.id).collect::<Vec<_>>()
1961                );
1962            }
1963
1964            // Return error if all orders failed
1965            if batch_result.status == BatchStatus::AllFailed {
1966                bail!(
1967                    "All {} orders in batch failed (success rate: {:.1}%)",
1968                    batch_result.summary.total_orders,
1969                    batch_result.success_rate()
1970                );
1971            }
1972        }
1973
1974        info!(
1975            "Batch order placement completed: {} succeeded, {} failed (success rate: {:.1}%)",
1976            batch_result.summary.successful_orders,
1977            batch_result.summary.failed_orders,
1978            batch_result.success_rate()
1979        );
1980
1981        Ok(())
1982    }
1983
1984    /// Internal concurrent batch order placement with detailed error reporting
1985    async fn place_batch_orders_concurrent(
1986        &self,
1987        orders: Vec<Order>,
1988        report_tx: Sender<ExecutionReport>,
1989    ) -> Result<BatchResult<()>> {
1990        use crate::error::EMSError;
1991        use futures::future::join_all;
1992
1993        let start_time = self.clock.raw();
1994        let total_orders = orders.len();
1995
1996        // Create concurrent tasks for each order
1997        let order_tasks: Vec<_> = orders
1998            .into_iter()
1999            .map(|order| {
2000                let report_tx = report_tx.clone();
2001                let client_order_id = order.id;
2002                let order_clone = order.clone();
2003                async move {
2004                    let result = self.place_order_internal(order_clone, report_tx).await;
2005                    (client_order_id, order, result)
2006                }
2007            })
2008            .collect();
2009
2010        // Execute all orders concurrently
2011        let results = join_all(order_tasks).await;
2012
2013        // Process results and build BatchResult
2014        let mut order_results: OrderResultMap<()> = FxHashMap::default();
2015        let mut success_count = 0;
2016        let mut failed_count = 0;
2017
2018        for (client_order_id, order, result) in results {
2019            match result {
2020                Ok(()) => {
2021                    order_results
2022                        .insert(client_order_id.to_string().into(), OrderResult::success(()));
2023                    success_count += 1;
2024                    debug!("Order {client_order_id} placed successfully");
2025                }
2026                Err(e) => {
2027                    // Convert anyhow::Error to EMSError
2028                    let ems_error = if let Some(ems_err) = e.downcast_ref::<EMSError>() {
2029                        ems_err.clone()
2030                    } else {
2031                        EMSError::order_submission(e.to_string())
2032                    };
2033
2034                    order_results.insert(
2035                        client_order_id.to_string().into(),
2036                        OrderResult::failed(ems_error, order),
2037                    );
2038                    failed_count += 1;
2039                    warn!("Order {client_order_id} failed: {e}");
2040                }
2041            }
2042        }
2043
2044        let processing_time_ns = self.clock.raw() - start_time;
2045
2046        // Build appropriate BatchResult based on results
2047        let batch_result = if failed_count == 0 {
2048            // All succeeded
2049            BatchResult::success(order_results, processing_time_ns)
2050        } else if success_count > 0 {
2051            // Partial success
2052            BatchResult::partial_success(order_results, processing_time_ns)
2053        } else {
2054            // All failed
2055            BatchResult::all_failed(order_results, processing_time_ns)
2056        };
2057
2058        // Log summary
2059        match batch_result.status {
2060            BatchStatus::AllSucceeded => {
2061                info!("All {total_orders} orders placed successfully");
2062            }
2063            BatchStatus::PartialSuccess => {
2064                warn!("Batch partially succeeded: {success_count}/{total_orders} orders placed");
2065            }
2066            BatchStatus::AllFailed => {
2067                error!("All {total_orders} orders in batch failed");
2068            }
2069            BatchStatus::TransportFailure => {
2070                // This shouldn't happen in this implementation
2071                error!("Unexpected transport failure status");
2072            }
2073        }
2074
2075        Ok(batch_result)
2076    }
2077
2078    /// Check and record rate limit for order placement
2079    fn check_and_record_rate_limit(&self, num_orders: usize) -> Result<()> {
2080        let mut rate_limiter = self.order_rate_limiter.write();
2081        if !rate_limiter.can_place_orders(num_orders) {
2082            let current_count = rate_limiter.current_order_count();
2083            if num_orders == 1 {
2084                bail!(
2085                    "Rate limit would be exceeded: {} orders in window, limit is {}",
2086                    current_count,
2087                    MAX_ORDERS_PER_10_SECONDS
2088                );
2089            }
2090            bail!(
2091                "Rate limit would be exceeded: {} orders in window, {} requested, limit is {}",
2092                current_count,
2093                num_orders,
2094                MAX_ORDERS_PER_10_SECONDS
2095            );
2096        }
2097        rate_limiter.record_orders(num_orders);
2098        Ok(())
2099    }
2100
2101    /// Place order via WebSocket API
2102    pub async fn place_order(
2103        &self,
2104        order: Order,
2105        report_tx: Sender<ExecutionReport>,
2106    ) -> Result<()> {
2107        // Check rate limit for single order
2108        self.check_and_record_rate_limit(1)?;
2109
2110        self.place_order_internal(order, report_tx).await
2111    }
2112
2113    /// Internal order placement logic without rate limiting
2114    async fn place_order_internal(
2115        &self,
2116        order: Order,
2117        report_tx: Sender<ExecutionReport>,
2118    ) -> Result<()> {
2119        let request_id = self.request_id_gen.next_id();
2120        let timestamp = self.clock.raw();
2121
2122        // Build order request params directly as an Object for clarity and performance.
2123        let mut params = Object::with_capacity(8);
2124        params.insert("symbol".into(), order.symbol.as_str().into());
2125        params.insert("side".into(), Self::map_order_side(order.side).into());
2126        params.insert("type".into(), Self::map_order_type(order.order_type).into());
2127        params.insert("quantity".into(), order.quantity.to_string().into());
2128        params.insert("newClientOrderId".into(), order.id.to_string().into());
2129        params.insert("timestamp".into(), (timestamp / 1_000_000).into());
2130
2131        // Add price if it's a limit order
2132        if let Some(price) = order.price {
2133            params.insert("price".into(), price.to_string().into());
2134        }
2135
2136        // Add time in force
2137        let tif = Self::get_time_in_force(order.order_type);
2138        params.insert("timeInForce".into(), Self::map_time_in_force(tif).into());
2139
2140        // Build complete request
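        // Illustrative wire format of the assembled request (values are examples only):
        //   {"id":42,"method":"order.place","params":{"symbol":"BTCUSDT","side":"BUY","type":"LIMIT",
        //    "quantity":"0.5","newClientOrderId":"...","timestamp":1700000000000,"price":"42000","timeInForce":"GTC"}}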
2141        let request = simd_json::json!({
2142            "id": request_id.as_json_value(),
2143            "method": "order.place",
2144            "params": JsonValue::from(params)
2145        });
2146
2147        // Track the pending request with string key
2148        {
2149            let mut pending = self.pending_requests.write();
2150            let key = request_id.to_lookup_key();
2151            pending.insert(
2152                key,
2153                PendingRequest {
2154                    method: "order.place".into(),
2155                    timestamp,
2156                    report_tx: Some(report_tx),
2157                    request_id: request_id.clone(),
2158                },
2159            );
2160        }
2161
2162        // Send the request
2163        self.send_request(request).await?;
2164
2165        Ok(())
2166    }
2167
2168    /// Get current rate limit status
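    ///
    /// Returns `(orders_in_current_window, MAX_ORDERS_PER_10_SECONDS)`. Illustrative use
    /// (not a doctest; `pending_orders` is a hypothetical caller-side collection):
    ///
    /// ```rust,ignore
    /// let (used, limit) = trader.get_rate_limit_status();
    /// if used + pending_orders.len() > limit {
    ///     // back off before submitting, otherwise batch placement will hit the rate-limit check
    /// }
    /// ```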
2169    #[must_use]
2170    pub fn get_rate_limit_status(&self) -> (usize, usize) {
2171        let mut rate_limiter = self.order_rate_limiter.write();
2172        let current = rate_limiter.current_order_count();
2173        (current, MAX_ORDERS_PER_10_SECONDS as usize)
2174    }
2175
2176    /// Cancel order via WebSocket API
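    ///
    /// Illustrative call (not a doctest): cancellation is keyed by the client order id that was
    /// originally supplied as `newClientOrderId` when the order was placed.
    ///
    /// ```rust,ignore
    /// trader.cancel_order("my-client-order-id".into(), report_tx.clone()).await?;
    /// ```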
2177    pub async fn cancel_order(
2178        &self,
2179        order_id: SmartString,
2180        report_tx: Sender<ExecutionReport>,
2181    ) -> Result<()> {
2182        let request_id = self.request_id_gen.next_id();
2183        let timestamp = self.clock.raw();
2184
2185        let request = simd_json::json!({
2186            "id": request_id.as_json_value(),
2187            "method": "order.cancel",
2188            "params": {
2189                "origClientOrderId": order_id,
2190                "timestamp": timestamp / 1_000_000 // Convert ns to ms
2191            }
2192        });
2193
2194        // Track the pending request with string key
2195        {
2196            let mut pending = self.pending_requests.write();
2197            let key = request_id.to_lookup_key();
2198            pending.insert(
2199                key,
2200                PendingRequest {
2201                    method: "order.cancel".into(),
2202                    timestamp,
2203                    report_tx: Some(report_tx),
2204                    request_id: request_id.clone(),
2205                },
2206            );
2207        }
2208
2209        self.send_request(request).await?;
2210
2211        Ok(())
2212    }
2213
2214    /// Modify order via WebSocket API (cancel and replace)
2215    pub async fn modify_order(
2216        &self,
2217        params: ModifyOrderParams,
2218        report_tx: Sender<ExecutionReport>,
2219    ) -> Result<()> {
2220        let request_id = self.request_id_gen.next_id();
2221        let timestamp = self.clock.raw();
2222
2223        let mut request = simd_json::json!({
2224            "id": request_id.as_json_value(),
2225            "method": "order.cancelReplace",
2226            "params": {
2227                "cancelOrigClientOrderId": params.order_id,
2228                "symbol": params.symbol,
2229                "side": Self::map_order_side(params.side),
2230                "type": Self::map_order_type(params.order_type),
2231                "quantity": params.new_quantity.to_string(),
2232                "timeInForce": Self::map_time_in_force(Self::get_time_in_force(params.order_type)),
2233                "timestamp": timestamp / 1_000_000 // Convert ns to ms
2234            }
2235        });
2236
2237        if let Some(price) = params.new_price {
2238            request["params"]["price"] = price.to_string().into();
2239        }
2240
2241        // Track the pending request with string key
2242        {
2243            let mut pending = self.pending_requests.write();
2244            let key = request_id.to_lookup_key();
2245            pending.insert(
2246                key,
2247                PendingRequest {
2248                    method: "order.cancelReplace".into(),
2249                    timestamp,
2250                    report_tx: Some(report_tx),
2251                    request_id: request_id.clone(),
2252                },
2253            );
2254        }
2255
2256        self.send_request(request).await?;
2257
2258        Ok(())
2259    }
2260
2261    /// Send request via WebSocket
2262    async fn send_request(&self, request: JsonValue) -> Result<()> {
2263        let request_str = simd_json::to_string(&request)
2264            .map_err(|e| anyhow!("Failed to serialize JSON request: {}", e))?;
2265        debug!("Sending request: {request_str}");
2266        let request_object = Message::Text(request_str.into()).to_frame_view();
2267
2268        if let Some(ref mut ws_sink) = self.ws_sink.write().await.as_mut() {
2269            match ws_sink.send(request_object).await {
2270                Ok(()) => Ok(()),
2271                Err(e) => {
2272                    // Kill socket on send error
2273                    let error_msg = format!("Failed to send WebSocket request: {e}");
2274                    self.kill_socket(&error_msg).await;
2275                    Err(anyhow!(error_msg))
2276                }
2277            }
2278        } else {
2279            self.kill_socket("WebSocket not connected").await;
2280            Err(anyhow!("WebSocket not connected"))
2281        }
2282    }
2283
2284    /// Convert internal `OrderType` to Binance order type
2285    const fn map_order_type(order_type: OrderType) -> &'static str {
2286        match order_type {
2287            OrderType::Market => "MARKET",
2288            OrderType::Limit => "LIMIT",
2289            OrderType::Stop => "STOP_LOSS",
2290            OrderType::StopLimit => "STOP_LOSS_LIMIT",
2291            OrderType::FillOrKill => "LIMIT",
2292            OrderType::ImmediateOrCancel => "LIMIT",
2293            OrderType::PostOnly => "LIMIT",
2294        }
2295    }
2296
2297    /// Convert internal `OrderSide` to Binance side
2298    const fn map_order_side(side: OrderSide) -> &'static str {
2299        match side {
2300            OrderSide::Buy => "BUY",
2301            OrderSide::Sell => "SELL",
2302        }
2303    }
2304
2305    /// Convert internal `TimeInForce` to Binance TIF
2306    const fn map_time_in_force(tif: TimeInForce) -> &'static str {
2307        match tif {
2308            TimeInForce::GTC => "GTC",
2309            TimeInForce::IOC => "IOC",
2310            TimeInForce::FOK => "FOK",
2311            _ => "GTC",
2312        }
2313    }
2314
2315    /// Get appropriate time in force for order type
2316    const fn get_time_in_force(order_type: OrderType) -> TimeInForce {
2317        match order_type {
2318            OrderType::Market => TimeInForce::IOC,
2319            OrderType::Limit => TimeInForce::GTC,
2320            OrderType::Stop => TimeInForce::GTC,
2321            OrderType::StopLimit => TimeInForce::GTC,
2322            OrderType::FillOrKill => TimeInForce::FOK,
2323            OrderType::ImmediateOrCancel => TimeInForce::IOC,
2324            OrderType::PostOnly => TimeInForce::GTC,
2325        }
2326    }
2327
2328    /// Send JSON ping to check connection
2329    pub async fn send_ping(&self) -> Result<()> {
2330        // Check if connected first
2331        if !self.is_connected() {
2332            return Err(anyhow!("WebSocket not connected - cannot send JSON ping"));
2333        }
2334
2335        let request_id = self.request_id_gen.next_id();
2336        let timestamp = self.clock.raw();
2337
2338        let request = simd_json::json!({
2339            "id": request_id.as_json_value(),
2340            "method": "ping",
2341            "params": {}
2342        });
2343
2344        // Track the pending request with optimized key (no string allocation for sequential IDs)
2345        let key = request_id.to_lookup_key();
2346        {
2347            let mut pending = self.pending_requests.write();
2348            pending.insert(
2349                key.clone(),
2350                PendingRequest {
2351                    method: "ping".into(),
2352                    timestamp,
2353                    report_tx: None, // Ping doesn't need execution report
2354                    request_id: request_id.clone(),
2355                },
2356            );
2357        }
2358
2359        let display_id = match &request_id {
2360            RequestId::Sequential(id) => id.to_string(),
2361            RequestId::Uuid(id) => id.to_string(),
2362        };
2363        debug!("📤 Sending JSON ping request (id: {display_id})");
2364
2365        // Send with additional error handling
2366        match self.send_request(request).await {
2367            Ok(()) => {
2368                debug!("✅ JSON ping #{display_id} sent successfully");
2369                Ok(())
2370            }
2371            Err(e) => {
2372                // Remove from pending requests if send failed
2373                {
2374                    let mut pending = self.pending_requests.write();
2375                    pending.remove(&key);
2376                }
2377
2378                // send_request kills the socket on transport failures; here we only surface a clearer warning
2379                if e.to_string().contains("Broken pipe") {
2380                    warn!("🔌 WebSocket broken pipe detected while sending JSON ping");
2381                }
2382
2383                Err(e)
2384            }
2385        }
2386    }
2387
2388    /// Check if connected
2389    #[must_use]
2390    pub fn is_connected(&self) -> bool {
2391        let state = self.get_state();
2392        matches!(state, ConnectionState::Authenticated)
2393    }
2394
2395    /// Kill the socket connection and clean up resources
2396    /// This allows background tasks to detect the disconnection and spawn a new socket
2397    async fn kill_socket(&self, reason: &str) {
2398        warn!("🗡️  KILL-STEP 1/9: Killing WebSocket connection: {reason}");
2399
2400        // Transition to disconnecting state
2401        self.set_state(ConnectionState::Disconnecting);
2402        info!("🗡️  KILL-STEP 2/9: State set to Disconnecting");
2403
2404        // Step 1: Signal reconnection monitor to stop gracefully
2405        info!("🗡️  KILL-STEP 3/9: Signaling reconnection monitor to stop...");
2406        self.reconnection_control
2407            .should_stop
2408            .store(true, Ordering::Release);
2409
2410        // Step 2: Cancel other background tasks
2411        info!("🗡️  KILL-STEP 4/9: Cancelling background tasks...");
2412        let handles = {
2413            let mut handles = self.task_handles.write().await;
2414            TaskHandles {
2415                response_handler: handles.response_handler.take(),
2416                ping_handler: handles.ping_handler.take(),
2417                reconnection_monitor: handles.reconnection_monitor.take(),
2418            }
2419        };
2420
2421        // Cancel tasks by aborting them
2422        if let Some(handle) = handles.response_handler {
2423            handle.abort();
2424            debug!("Response handler task aborted");
2425        }
2426        if let Some(handle) = handles.ping_handler {
2427            handle.abort();
2428            debug!("Ping handler task aborted");
2429        }
2430
2431        // Wait a bit for reconnection monitor to exit gracefully
2432        if let Some(handle) = handles.reconnection_monitor {
2433            match tokio::time::timeout(Duration::from_millis(100), handle).await {
2434                Ok(_) => debug!("Reconnection monitor exited gracefully"),
2435                Err(_) => {
2436                    debug!("Reconnection monitor didn't exit in time, continuing anyway");
2437                    // The monitor will exit on its next check interval
2438                }
2439            }
2440        }
2441
2442        info!("🗡️  KILL-STEP 5/9: Tasks cancelled");
2443
2444        // Step 3: Close WebSocket sink first (sends close frame)
2445        info!("🗡️  KILL-STEP 6/9: Closing WebSocket sink...");
2446        if let Some(mut ws_sink) = self.ws_sink.write().await.take() {
2447            // Add timeout to sink close to prevent hanging
2448            match tokio::time::timeout(Duration::from_secs(2), ws_sink.close()).await {
2449                Ok(Ok(())) => {
2450                    info!("🗡️  KILL-STEP 6/9: WebSocket sink closed successfully");
2451                }
2452                Ok(Err(e)) => {
2453                    warn!("🗡️  KILL-STEP 6/9: WebSocket sink close failed: {e}");
2454                }
2455                Err(_) => {
2456                    warn!("🗡️  KILL-STEP 6/9: WebSocket sink close timed out - continuing anyway");
2457                }
2458            }
2459        } else {
2460            info!("🗡️  KILL-STEP 6/9: No WebSocket sink to close");
2461        }
2462
2463        // Step 4: Clean up WebSocket stream
2464        info!("🗡️  KILL-STEP 7/9: Cleaning up WebSocket stream...");
2465        match tokio::time::timeout(Duration::from_secs(1), async {
2466            *self.ws_stream.write().await = None;
2467        })
2468        .await
2469        {
2470            Ok(()) => {
2471                info!("🗡️  KILL-STEP 7/9: WebSocket stream cleaned up successfully");
2472            }
2473            Err(_) => {
2474                warn!("🗡️  KILL-STEP 7/9: WebSocket stream cleanup timed out - continuing anyway");
2475            }
2476        }
2477
2478        // Step 5: Clear any pending requests to prevent memory leaks
2479        info!("🗡️  KILL-STEP 8/9: Clearing pending requests...");
2480        match tokio::time::timeout(Duration::from_secs(1), async {
2481            let mut pending = self.pending_requests.write();
2482            // Check if we have too many pending requests (possible memory leak)
2483            if pending.len() > MAX_PENDING_REQUESTS {
2484                warn!(
2485                    "Detected {} pending requests - possible memory leak",
2486                    pending.len()
2487                );
2488            }
2489            pending.clear();
2490        })
2491        .await
2492        {
2493            Ok(()) => {
2494                info!("🗡️  KILL-STEP 8/9: Pending requests cleared successfully");
2495            }
2496            Err(_) => {
2497                warn!("🗡️  KILL-STEP 8/9: Pending requests cleanup timed out - continuing anyway");
2498            }
2499        }
2500
2501        // Step 6: Reset state
2502        info!("🗡️  KILL-STEP 9/9: Resetting state...");
2503        self.last_ping_time.store(0, Ordering::Release);
2504        self.last_pong_time.store(0, Ordering::Release);
2505        self.auth_completed_time.store(0, Ordering::Release);
2506        self.set_state(ConnectionState::Disconnected);
2507
2508        info!(
2509            "🗡️  ✅ WebSocket connection killed and resources cleaned up - ready for reconnection"
2510        );
2511    }
2512
2513    /// Attempt to reconnect WebSocket (can be called by external monitoring tasks)
2514    /// Note: This will start a new auto-reconnection monitor, so use sparingly
2515    pub async fn reconnect(&self, report_tx: Sender<ExecutionReport>) -> Result<()> {
2516        info!("External reconnection requested...");
2517
2518        // Kill existing connection first
2519        self.kill_socket("External reconnection requested").await;
2520
2521        // Small delay to allow cleanup
2522        tokio::time::sleep(Duration::from_millis(100)).await;
2523
2524        // Attempt new connection (this will start a new auto-reconnection monitor)
2525        self.connect(report_tx).await
2526    }
2527
2528    /// Gracefully disconnect from WebSocket
2529    pub async fn disconnect(&self) -> Result<()> {
2530        info!("Gracefully disconnecting from Binance WebSocket...");
2531
2532        // Use kill_socket for proper cleanup
2533        self.kill_socket("Graceful disconnect requested").await;
2534
2535        // Cancel the reconnection monitor to prevent auto-reconnection
2536        if let Some(handle) = self.task_handles.write().await.reconnection_monitor.take() {
2537            handle.abort();
2538            info!("Reconnection monitor cancelled");
2539        }
2540
2541        info!("Disconnected from Binance WebSocket");
2542        Ok(())
2543    }
2544}
2545
2546#[async_trait]
2547impl crate::execution_engine::Exchange for BinanceWebSocketTrader {
2548    fn venue(&self) -> Venue {
2549        Venue::Binance
2550    }
2551
2552    async fn place_order(&self, order: Order, report_tx: Sender<ExecutionReport>) -> Result<()> {
2553        // Check rate limit for single order
2554        self.check_and_record_rate_limit(1)?;
2555
2556        self.place_order_internal(order, report_tx).await
2557    }
2558
2559    async fn cancel_order(
2560        &self,
2561        order_id: SmartString,
2562        report_tx: Sender<ExecutionReport>,
2563    ) -> Result<()> {
2564        let request_id = self.request_id_gen.next_id();
2565        let timestamp = self.clock.raw();
2566
2567        let mut params = Object::with_capacity(2);
2568        params.insert("origClientOrderId".into(), order_id.as_str().into());
2569        params.insert("timestamp".into(), (timestamp / 1_000_000).into());
2570
2571        // Build complete request
2572        let request = simd_json::json!({
2573            "id": request_id.as_json_value(),
2574            "method": "order.cancel",
2575            "params": JsonValue::from(params)
2576        });
2577
2578        // Track the pending request
2579        {
2580            let mut pending = self.pending_requests.write();
2581            let key = request_id.to_lookup_key();
2582            pending.insert(
2583                key,
2584                PendingRequest {
2585                    method: "order.cancel".into(),
2586                    timestamp,
2587                    report_tx: Some(report_tx),
2588                    request_id: request_id.clone(),
2589                },
2590            );
2591        }
2592
2593        self.send_request(request).await
2594    }
2595
2596    async fn modify_order(
2597        &self,
2598        order_id: SmartString,
2599        new_price: Option<Decimal>,
2600        new_quantity: Option<Decimal>,
2601        report_tx: Sender<ExecutionReport>,
2602    ) -> Result<()> {
2603        let request_id = self.request_id_gen.next_id();
2604        let timestamp = self.clock.raw();
2605
2606        let mut params = Object::with_capacity(4);
2607        params.insert("origClientOrderId".into(), order_id.as_str().into());
2608        params.insert("timestamp".into(), (timestamp / 1_000_000).into());
2609
2610        if let Some(price) = new_price {
2611            params.insert("price".into(), price.to_string().into());
2612        }
2613        if let Some(quantity) = new_quantity {
2614            params.insert("quantity".into(), quantity.to_string().into());
2615        }
2616
2617        // Build complete request
2618        let request = simd_json::json!({
2619            "id": request_id.as_json_value(),
2620            "method": "order.cancelReplace",
2621            "params": JsonValue::from(params)
2622        });
2623
2624        // Track the pending request
2625        {
2626            let mut pending = self.pending_requests.write();
2627            let key = request_id.to_lookup_key();
2628            pending.insert(
2629                key,
2630                PendingRequest {
2631                    method: "order.cancelReplace".into(),
2632                    timestamp,
2633                    report_tx: Some(report_tx),
2634                    request_id: request_id.clone(),
2635                },
2636            );
2637        }
2638
2639        self.send_request(request).await
2640    }
2641
2642    async fn cancel_all_orders(
2643        &self,
2644        instrument_id: Option<InstrumentId>,
2645        report_tx: Sender<ExecutionReport>,
2646    ) -> Result<()> {
2647        let request_id = self.request_id_gen.next_id();
2648        let timestamp = self.clock.raw();
2649
2650        // Binance WebSocket API requires symbol parameter for openOrders.cancelAll
2651        let instrument = instrument_id.ok_or_else(|| {
2652            anyhow!("Binance openOrders.cancelAll requires a symbol parameter - instrument_id cannot be None")
2653        })?;
2654
2655        let mut params = Object::with_capacity(2);
2656        params.insert("timestamp".into(), (timestamp / 1_000_000).into());
2657        params.insert("symbol".into(), instrument.symbol.as_str().into());
2658
2659        // Build complete request
2660        let request = simd_json::json!({
2661            "id": request_id.as_json_value(),
2662            "method": "openOrders.cancelAll",
2663            "params": JsonValue::from(params)
2664        });
2665
2666        // Track the pending request
2667        {
2668            let mut pending = self.pending_requests.write();
2669            let key = request_id.to_lookup_key();
2670            pending.insert(
2671                key,
2672                PendingRequest {
2673                    method: "openOrders.cancelAll".into(),
2674                    timestamp,
2675                    report_tx: Some(report_tx),
2676                    request_id: request_id.clone(),
2677                },
2678            );
2679        }
2680
2681        self.send_request(request).await
2682    }
2683
2684    async fn get_order_status(&self, order_id: &str) -> Result<OrderStatus> {
2685        // Order-status queries are typically served via Binance's REST API; a WebSocket-only
2686        // client would need to subscribe to order updates or issue a dedicated status request
2687        // (see the sketch after this impl block). Until then, return an explicit error.
2688        Err(anyhow!(
2689            "get_order_status not supported via WebSocket for Binance (order_id: {order_id})"
2690        ))
2691    }
2692
2693    async fn connect(&self, report_sender: Sender<ExecutionReport>) -> Result<()> {
2694        self.connect_without_monitor(report_sender).await
2695    }
2696
2697    async fn disconnect(&self) -> Result<()> {
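        // Relies on Rust resolving the inherent `disconnect` method ahead of this trait
        // method of the same name; assuming such an inherent method exists, the call
        // below does not recurse.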
2698        self.disconnect().await
2699    }
2700
2701    async fn is_connected(&self) -> bool {
2702        self.get_state() == ConnectionState::Authenticated
2703    }
2704
2705    async fn get_instruments(&self) -> Result<SmallVec<[InstrumentId; 32]>> {
2706        // Instrument metadata is typically fetched via the REST API and is not directly
2707        // available over this WebSocket connection, so return an empty list for now.
2708        Ok(SmallVec::new())
2709    }
2710
2711    async fn send_fix_message(&self, message: Vec<u8>) -> Result<()> {
2712        log::warn!(
2713            "BinanceWebSocketTrader: send_fix_message not implemented. Message: {:?}",
2714            message
2715        );
2716        Err(anyhow!(
2717            "FIX message sending not implemented for Binance WebSocket"
2718        ))
2719    }
2720
2721    async fn receive_fix_message(&self) -> Result<Vec<u8>> {
2722        log::warn!("BinanceWebSocketTrader: receive_fix_message not implemented.");
2723        Err(anyhow!(
2724            "FIX message receiving not implemented for Binance WebSocket"
2725        ))
2726    }
2727}
2728
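// A minimal sketch (not part of the trait impl above) of how an order-status lookup
// could be expressed over the same WebSocket request path, assuming the venue exposes
// an `order.status` method taking `origClientOrderId` and `timestamp`. The parameter
// names are illustrative and should be checked against the venue's current spec.
#[allow(dead_code)]
fn build_order_status_request_sketch(order_id: &str, timestamp_ms: u64) -> JsonValue {
    let mut params = Object::with_capacity(2);
    params.insert("origClientOrderId".into(), order_id.into());
    params.insert("timestamp".into(), timestamp_ms.into());

    // Mirrors the request shape used by the trait methods above; a real implementation
    // would allocate an id via `request_id_gen`, sign the params, and track the pending request.
    simd_json::json!({
        "id": 0u64,
        "method": "order.status",
        "params": JsonValue::from(params)
    })
}
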
2729#[cfg(test)]
2730mod tests {
2731    use super::*;
2732    use crate::position_manager::{DefaultPositionManager, PositionManager};
2733    use base64::{Engine as _, engine::general_purpose::STANDARD as BASE64};
2734
2735    /// Create a mock position manager for testing
2736    fn create_mock_position_manager() -> Arc<dyn PositionManager> {
2737        Arc::new(DefaultPositionManager::new(None))
2738    }
2739
2740    #[tokio::test]
2741    async fn test_websocket_trader_creation() {
2742        let api_key = SmartString::from("test_api_key");
2743        let private_key = BASE64.encode([1u8; 32]); // Test key
2744
2745        let auth = Arc::new(BinanceAuth::new_ed25519(api_key, private_key.into()).unwrap());
2746        let trader = BinanceWebSocketTrader::new(auth, create_mock_position_manager());
2747
2748        assert!(!trader.is_connected());
2749
2750        // Test request ID generation
2751        let first_id = trader.request_id_gen.next_id();
2752        assert!(
2753            matches!(first_id, RequestId::Sequential(1)),
2754            "Expected sequential ID 1, got {first_id:?}"
2755        );
2756
2757        let second_id = trader.request_id_gen.next_id();
2758        assert!(
2759            matches!(second_id, RequestId::Sequential(2)),
2760            "Expected sequential ID 2, got {second_id:?}"
2761        );
2762    }
2763
2764    #[test]
2765    fn test_stable_position_id_generation() {
2766        // Test that the same position characteristics generate the same ID
2767        let symbol = "BTCUSDT";
2768        let side = PositionSide::Long;
2769        let venue = Venue::Binance;
2770
2771        let id1 = BinanceWebSocketTrader::generate_stable_position_id(symbol, side, venue);
2772        let id2 = BinanceWebSocketTrader::generate_stable_position_id(symbol, side, venue);
2773
2774        assert_eq!(
2775            id1, id2,
2776            "Same position characteristics should generate same ID"
2777        );
2778
2779        // Test that different characteristics generate different IDs
2780        let different_symbol_id =
2781            BinanceWebSocketTrader::generate_stable_position_id("ETHUSDT", side, venue);
2782        let different_side_id =
2783            BinanceWebSocketTrader::generate_stable_position_id(symbol, PositionSide::Short, venue);
2784
2785        assert_ne!(
2786            id1, different_symbol_id,
2787            "Different symbols should generate different IDs"
2788        );
2789        assert_ne!(
2790            id1, different_side_id,
2791            "Different sides should generate different IDs"
2792        );
2793
2794        // Re-deriving the ID within the same run should also be deterministic
2795        let id3 = BinanceWebSocketTrader::generate_stable_position_id(symbol, side, venue);
2796        assert_eq!(id1, id3, "ID generation should be deterministic");
2797    }
2798
2799    #[test]
2800    fn test_order_type_mapping() {
2801        assert_eq!(
2802            BinanceWebSocketTrader::map_order_type(OrderType::Market),
2803            "MARKET"
2804        );
2805        assert_eq!(
2806            BinanceWebSocketTrader::map_order_type(OrderType::Limit),
2807            "LIMIT"
2808        );
2809        assert_eq!(
2810            BinanceWebSocketTrader::map_order_side(OrderSide::Buy),
2811            "BUY"
2812        );
2813        assert_eq!(
2814            BinanceWebSocketTrader::map_order_side(OrderSide::Sell),
2815            "SELL"
2816        );
2817    }
2818
2819    #[test]
2820    fn test_order_status_mapping() {
2821        assert_eq!(
2822            BinanceWebSocketTrader::map_order_status("NEW"),
2823            OrderStatus::New
2824        );
2825        assert_eq!(
2826            BinanceWebSocketTrader::map_order_status("FILLED"),
2827            OrderStatus::Filled
2828        );
2829        assert_eq!(
2830            BinanceWebSocketTrader::map_order_status("CANCELLED"),
2831            OrderStatus::Cancelled
2832        );
2833    }
2834
2835    #[tokio::test]
2836    async fn test_ping_handler_lifecycle() {
2837        let auth = Arc::new(BinanceAuth::new_hmac(
2838            "test_key".into(),
2839            "test_secret".into(),
2840        ));
2841        let (_report_tx, _report_rx) = flume::bounded::<ExecutionReport>(100);
2842        let trader = BinanceWebSocketTrader::new_testnet(auth, create_mock_position_manager());
2843
2844        // Check initial state
2845        assert_eq!(trader.get_state(), ConnectionState::Disconnected);
2846        assert_eq!(
2847            trader.metrics.websocket_pings_sent.load(Ordering::Relaxed),
2848            0
2849        );
2850        assert_eq!(
2851            trader
2852                .metrics
2853                .websocket_pongs_received
2854                .load(Ordering::Relaxed),
2855            0
2856        );
2857
2858        // Verify ping handler starts properly (without actual connection)
2859        let result = trader.start_ping_handler().await;
2860        assert!(result.is_ok());
2861        let handle = result.unwrap();
2862
2863        // Let it run briefly
2864        tokio::time::sleep(Duration::from_millis(100)).await;
2865
2866        // Abort the handler
2867        handle.abort();
2868    }
2869
2870    #[tokio::test]
2871    async fn test_json_ping_not_connected() {
2872        let auth = Arc::new(BinanceAuth::new_hmac(
2873            "test_key".into(),
2874            "test_secret".into(),
2875        ));
2876        let (_report_tx, _report_rx) = flume::bounded::<ExecutionReport>(100);
2877        let trader = BinanceWebSocketTrader::new_testnet(auth, create_mock_position_manager());
2878
2879        // Try to send ping when not connected
2880        let result = trader.send_ping().await;
2881        assert!(result.is_err());
2882        assert!(
2883            result
2884                .unwrap_err()
2885                .to_string()
2886                .contains("WebSocket not connected")
2887        );
2888    }
2889
2890    #[test]
2891    fn test_connection_metrics() {
2892        let auth = Arc::new(BinanceAuth::new_hmac(
2893            "test_key".into(),
2894            "test_secret".into(),
2895        ));
2896        let (_report_tx, _report_rx) = flume::bounded::<ExecutionReport>(100);
2897        let trader = BinanceWebSocketTrader::new_testnet(auth, create_mock_position_manager());
2898
2899        // Test initial metrics
2900        let metrics = &trader.metrics;
2901        assert_eq!(metrics.websocket_pings_sent.load(Ordering::Relaxed), 0);
2902        assert_eq!(metrics.websocket_pongs_received.load(Ordering::Relaxed), 0);
2903        assert_eq!(metrics.json_pings_sent.load(Ordering::Relaxed), 0);
2904        assert_eq!(metrics.json_pongs_received.load(Ordering::Relaxed), 0);
2905        assert_eq!(metrics.messages_sent.load(Ordering::Relaxed), 0);
2906        assert_eq!(metrics.messages_received.load(Ordering::Relaxed), 0);
2907        assert_eq!(metrics.reconnection_attempts.load(Ordering::Relaxed), 0);
2908        assert_eq!(metrics.successful_reconnections.load(Ordering::Relaxed), 0);
2909        assert_eq!(metrics.failed_reconnections.load(Ordering::Relaxed), 0);
2910
2911        // Test metric updates
2912        metrics.websocket_pings_sent.fetch_add(1, Ordering::Relaxed);
2913        assert_eq!(metrics.websocket_pings_sent.load(Ordering::Relaxed), 1);
2914    }
2915
2916    #[test]
2917    fn test_ping_pong_timeout_calculation() {
2918        // Verify ping/pong constants
2919        assert_eq!(PING_INTERVAL_SECONDS, 30);
2920        assert_eq!(PONG_TIMEOUT_SECONDS, 10);
2921
2922        // Ensure the pong timeout is shorter than the ping interval; the const
2923        // assertion below enforces this relationship at compile time.
2924        const _: () = assert!(PONG_TIMEOUT_SECONDS < PING_INTERVAL_SECONDS);
2925    }
2926
2927    #[test]
2928    fn test_batch_size_constants() {
2929        // Verify new batch size limits
2930        assert_eq!(MAX_BATCH_SIZE, 50);
2931        assert_eq!(MAX_ORDERS_PER_10_SECONDS, 300);
2932
2933        // Ensure batch size is reasonable compared to rate limit
2934        assert!(MAX_BATCH_SIZE <= MAX_ORDERS_PER_10_SECONDS as usize);
2935
2936        // Ensure we can fit multiple batches within rate limit window
2937        assert!(MAX_BATCH_SIZE * 5 < MAX_ORDERS_PER_10_SECONDS as usize);
2938    }
2939
2940    #[tokio::test]
2941    async fn test_batch_orders_empty_list() {
2942        let auth = Arc::new(BinanceAuth::new_hmac(
2943            "test_key".into(),
2944            "test_secret".into(),
2945        ));
2946        let trader = BinanceWebSocketTrader::new(auth, create_mock_position_manager());
2947        let (report_tx, _report_rx) = flume::bounded(100);
2948
2949        // Empty batch should succeed without error
2950        let result = trader.place_batch_orders(vec![], report_tx).await;
2951        assert!(result.is_ok());
2952    }
2953
2954    #[tokio::test]
2955    async fn test_batch_orders_exceeds_limit() {
2956        let auth = Arc::new(BinanceAuth::new_hmac(
2957            "test_key".into(),
2958            "test_secret".into(),
2959        ));
2960        let trader = BinanceWebSocketTrader::new(auth, create_mock_position_manager());
2961        let (report_tx, _report_rx) = flume::bounded(100);
2962
2963        // Create orders exceeding MAX_BATCH_SIZE
2964        let mut orders = Vec::new();
2965        for i in 0..=MAX_BATCH_SIZE {
2966            let order = Order::new(
2967                Venue::Binance,
2968                "BTCUSDT",
2969                OrderSide::Buy,
2970                rusty_model::enums::OrderType::Limit,
2971                Decimal::ONE,
2972                Some(Decimal::from(50000)),
2973                rusty_model::types::ClientId::new(format!("test_client_{i}")),
2974            );
2975            orders.push(order);
2976        }
2977
2978        // Should fail with batch size exceeded error
2979        let result = trader.place_batch_orders(orders, report_tx).await;
2980        assert!(result.is_err());
2981        assert!(result.unwrap_err().to_string().contains("exceeds maximum"));
2982    }
2983
2984    #[test]
2985    fn test_rate_limiter_functionality() {
2986        let auth = Arc::new(BinanceAuth::new_hmac(
2987            "test_key".into(),
2988            "test_secret".into(),
2989        ));
2990        let trader = BinanceWebSocketTrader::new(auth, create_mock_position_manager());
2991
2992        // Test rate limit status through trader interface
2993        let (current, limit) = trader.get_rate_limit_status();
2994        assert_eq!(current, 0); // Initially empty
2995        assert_eq!(limit, MAX_ORDERS_PER_10_SECONDS as usize);
2996
2997        // Test direct rate limiter functionality
2998        let clock = Clock::new();
2999        let mut rate_limiter = OrderRateLimiter::new(clock);
3000
3001        // Should allow orders within limit
3002        assert!(rate_limiter.can_place_orders(100));
3003        rate_limiter.record_orders(100);
3004
3005        // Should allow more orders up to limit
3006        assert!(rate_limiter.can_place_orders(200));
3007        rate_limiter.record_orders(200);
3008
3009        // Should deny orders exceeding limit
3010        assert!(!rate_limiter.can_place_orders(1));
3011
3012        // Test current count reporting
3013        let current = rate_limiter.current_order_count();
3014        assert_eq!(current, 300);
3015    }
3016
3017    #[test]
3018    fn test_order_rate_limiter_new() {
3019        let clock = Clock::new();
3020        let rate_limiter = OrderRateLimiter::new(clock.clone());
3021
3022        // Should start with empty order times
3023        assert_eq!(rate_limiter.order_times.len(), 0);
3024
3025        // Clock should be set correctly
3026        assert_eq!(
3027            rate_limiter.clock.raw() / 1_000_000,
3028            clock.raw() / 1_000_000
3029        );
3030    }
3031
3032    #[test]
3033    fn test_order_rate_limiter_can_place_orders_basic() {
3034        let clock = Clock::new();
3035        let mut rate_limiter = OrderRateLimiter::new(clock);
3036
3037        // Should be able to place orders when empty
3038        assert!(rate_limiter.can_place_orders(1));
3039        assert!(rate_limiter.can_place_orders(10));
3040        assert!(rate_limiter.can_place_orders(MAX_ORDERS_PER_10_SECONDS as usize));
3041
3042        // Should not be able to place more than max orders
3043        assert!(!rate_limiter.can_place_orders(MAX_ORDERS_PER_10_SECONDS as usize + 1));
3044    }
3045
3046    #[test]
3047    fn test_order_rate_limiter_can_place_orders_with_existing() {
3048        let clock = Clock::new();
3049        let mut rate_limiter = OrderRateLimiter::new(clock.clone());
3050
3051        // Add some existing orders
3052        let now = clock.raw() / 1_000_000;
3053        rate_limiter.order_times.push_back(now);
3054        rate_limiter.order_times.push_back(now);
3055
3056        // Should account for existing orders
3057        assert!(rate_limiter.can_place_orders(1));
3058        assert!(rate_limiter.can_place_orders(MAX_ORDERS_PER_10_SECONDS as usize - 2));
3059        assert!(!rate_limiter.can_place_orders(MAX_ORDERS_PER_10_SECONDS as usize - 1));
3060    }
3061
3062    #[test]
3063    fn test_order_rate_limiter_record_orders() {
3064        let clock = Clock::new();
3065        let mut rate_limiter = OrderRateLimiter::new(clock);
3066
3067        // Record single order
3068        rate_limiter.record_orders(1);
3069        assert_eq!(rate_limiter.order_times.len(), 1);
3070
3071        // Record multiple orders
3072        rate_limiter.record_orders(5);
3073        assert_eq!(rate_limiter.order_times.len(), 6);
3074
3075        // All recorded orders should carry near-identical timestamps
3076        let first_time = rate_limiter.order_times[0];
3077        let last_time = rate_limiter.order_times[5];
3078        assert!(last_time.saturating_sub(first_time) <= 1); // Back-to-back calls; allow 1ms of drift
3079    }
3080
3081    #[test]
3082    fn test_order_rate_limiter_get_current_usage() {
3083        let clock = Clock::new();
3084        let mut rate_limiter = OrderRateLimiter::new(clock);
3085
3086        // Initial usage should be 0
3087        let (current, limit) = rate_limiter.get_current_usage();
3088        assert_eq!(current, 0);
3089        assert_eq!(limit, MAX_ORDERS_PER_10_SECONDS as usize);
3090
3091        // Record some orders
3092        rate_limiter.record_orders(10);
3093        let (current, limit) = rate_limiter.get_current_usage();
3094        assert_eq!(current, 10);
3095        assert_eq!(limit, MAX_ORDERS_PER_10_SECONDS as usize);
3096    }
3097
3098    #[test]
3099    fn test_order_rate_limiter_current_order_count() {
3100        let clock = Clock::new();
3101        let mut rate_limiter = OrderRateLimiter::new(clock);
3102
3103        // Initial count should be 0
3104        assert_eq!(rate_limiter.current_order_count(), 0);
3105
3106        // Record some orders
3107        rate_limiter.record_orders(7);
3108        assert_eq!(rate_limiter.current_order_count(), 7);
3109
3110        // Record more orders
3111        rate_limiter.record_orders(3);
3112        assert_eq!(rate_limiter.current_order_count(), 10);
3113    }
3114
3115    #[test]
3116    fn test_rate_limiter_window_cleanup() {
3117        let clock = Clock::new();
3118        let mut rate_limiter = OrderRateLimiter::new(clock.clone());
3119
3120        // Add orders that should expire
3121        rate_limiter.order_times.push_back(0); // Very old timestamp
3122        rate_limiter.order_times.push_back(1000); // Old timestamp
3123
3124        // Add current orders
3125        let now = clock.raw() / 1_000_000;
3126        rate_limiter.order_times.push_back(now);
3127        rate_limiter.order_times.push_back(now);
3128
3129        // Check should clean up old orders
3130        assert!(rate_limiter.can_place_orders(1));
3131
3132        // Should only have current orders remaining
3133        assert_eq!(rate_limiter.order_times.len(), 2);
3134    }
3135
3136    #[test]
3137    fn test_order_rate_limiter_edge_cases() {
3138        let clock = Clock::new();
3139        let mut rate_limiter = OrderRateLimiter::new(clock);
3140
3141        // Test with zero count
3142        assert!(rate_limiter.can_place_orders(0));
3143        rate_limiter.record_orders(0);
3144        assert_eq!(rate_limiter.current_order_count(), 0);
3145
3146        // Test at exact limit
3147        let max_orders = MAX_ORDERS_PER_10_SECONDS as usize;
3148        assert!(rate_limiter.can_place_orders(max_orders));
3149        rate_limiter.record_orders(max_orders);
3150        assert_eq!(rate_limiter.current_order_count(), max_orders);
3151
3152        // Should not be able to place any more
3153        assert!(!rate_limiter.can_place_orders(1));
3154    }
3155
3156    #[test]
3157    fn test_order_rate_limiter_window_sliding() {
3158        let clock = Clock::new();
3159        let mut rate_limiter = OrderRateLimiter::new(clock.clone());
3160
3161        // Add orders at different times within the window
3162        let now = clock.raw() / 1_000_000;
3163        let window_start = now.saturating_sub(RATE_LIMIT_WINDOW_MS);
3164
3165        // Add orders at start of window (should still be valid)
3166        rate_limiter.order_times.push_back(window_start + 1000); // 1 second into window
3167        rate_limiter.order_times.push_back(window_start + 5000); // 5 seconds into window
3168
3169        // Add orders at end of window (current time)
3170        rate_limiter.order_times.push_back(now);
3171
3172        // All orders should be counted
3173        assert_eq!(rate_limiter.current_order_count(), 3);
3174
3175        // Should be able to place more orders up to limit
3176        assert!(rate_limiter.can_place_orders(MAX_ORDERS_PER_10_SECONDS as usize - 3));
3177    }
3178
3179    #[test]
3180    fn test_order_rate_limiter_cleanup_all_methods() {
3181        let clock = Clock::new();
3182        let mut rate_limiter = OrderRateLimiter::new(clock);
3183
3184        // Add very old orders that should be cleaned up
3185        rate_limiter.order_times.push_back(0);
3186        rate_limiter.order_times.push_back(1000);
3187        rate_limiter.order_times.push_back(2000);
3188
3189        // Test that all methods clean up old orders
3190        assert!(rate_limiter.can_place_orders(1));
3191        assert_eq!(rate_limiter.order_times.len(), 0); // Should be cleaned up
3192
3193        // Add old orders again
3194        rate_limiter.order_times.push_back(0);
3195        rate_limiter.order_times.push_back(1000);
3196
3197        let count = rate_limiter.current_order_count();
3198        assert_eq!(count, 0); // Should be cleaned up
3199        assert_eq!(rate_limiter.order_times.len(), 0);
3200
3201        // Add old orders again
3202        rate_limiter.order_times.push_back(0);
3203        rate_limiter.order_times.push_back(1000);
3204
3205        let (current, _) = rate_limiter.get_current_usage();
3206        assert_eq!(current, 0); // Should be cleaned up
3207        assert_eq!(rate_limiter.order_times.len(), 0);
3208    }
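
    // A minimal sketch of the sliding-window pruning that the cleanup tests above rely on,
    // written against a plain VecDeque rather than the limiter's internals. Only
    // `RATE_LIMIT_WINDOW_MS` is taken from the real code (assumed to be well below the
    // synthetic `now_ms` used here); the prune helper itself is illustrative.
    #[test]
    fn test_sliding_window_prune_sketch() {
        use std::collections::VecDeque;

        // Drop timestamps (milliseconds) that fall before the start of the trailing window
        fn prune(times: &mut VecDeque<u64>, now_ms: u64) {
            let cutoff = now_ms.saturating_sub(RATE_LIMIT_WINDOW_MS);
            while times.front().is_some_and(|&t| t < cutoff) {
                times.pop_front();
            }
        }

        let now_ms = 1_000_000u64;
        let cutoff = now_ms.saturating_sub(RATE_LIMIT_WINDOW_MS);
        let mut times = VecDeque::from([cutoff.saturating_sub(1), cutoff, now_ms]);

        prune(&mut times, now_ms);

        // Only the entry strictly older than the window start is dropped
        assert_eq!(times.len(), 2);
    }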
3209
3210    #[tokio::test]
3211    async fn test_concurrent_batch_error_handling() {
3212        let auth = Arc::new(BinanceAuth::new_hmac(
3213            "test_key".into(),
3214            "test_secret".into(),
3215        ));
3216        let trader = BinanceWebSocketTrader::new(auth, create_mock_position_manager());
3217        let (report_tx, _report_rx) = flume::bounded(100);
3218
3219        // Create a small batch that won't trigger rate limits
3220        let mut orders = Vec::new();
3221        for i in 0..3 {
3222            let order = Order::new(
3223                Venue::Binance,
3224                "BTCUSDT",
3225                OrderSide::Buy,
3226                rusty_model::enums::OrderType::Limit,
3227                Decimal::ONE,
3228                Some(Decimal::from(50000)),
3229                rusty_model::types::ClientId::new(format!("test_client_{i}")),
3230            );
3231            orders.push(order);
3232        }
3233
3234        // This fails because the trader is not connected, but it still exercises the concurrent submission path
3235        let result = trader.place_batch_orders(orders, report_tx).await;
3236        assert!(result.is_err());
3237
3238        // Error should mention batch failure rather than individual order failure
3239        let error_msg = result.unwrap_err().to_string();
3240        assert!(error_msg.contains("batch") || error_msg.contains("All orders"));
3241    }
3242
3243    #[test]
3244    fn test_batch_size_increase() {
3245        // Verify that we increased batch size from 5 to 50
3246        assert_eq!(MAX_BATCH_SIZE, 50);
3247
3248        // Should fit well within rate limits
3249        assert!(MAX_BATCH_SIZE < MAX_ORDERS_PER_10_SECONDS as usize / 2);
3250    }
3251}