1use rusty_common::collections::FxHashMap;
68use std::sync::Arc;
69use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU64, Ordering};
70
71use anyhow::{Result, anyhow, bail};
72use async_trait::async_trait;
73use flume::Sender;
74use futures::{SinkExt, StreamExt};
75use log::{debug, error, info, warn};
76use parking_lot::RwLock;
77use quanta::Clock;
78use rand::Rng;
79use rust_decimal::Decimal;
80use rusty_common::SmartString;
81use rusty_common::auth::exchanges::binance::BinanceAuth;
82use rusty_common::id_generation;
83use rusty_common::websocket::connector::{WebSocketSink, WebSocketStream};
84use rusty_common::websocket::{Message, WebSocketConfig, WebSocketConnector};
85use rusty_model::{
86 enums::{OrderSide, OrderStatus, OrderType, TimeInForce},
87 instruments::InstrumentId,
88 position::{MarginType, PositionSide, PositionUpdate},
89 trading_order::Order,
90 venues::Venue,
91};
92use simd_json::prelude::*;
93use simd_json::value::owned::{Object, Value as JsonValue};
94use smallvec::SmallVec;
95use std::time::Duration;
96use tokio::sync::RwLock as AsyncRwLock;
97use tokio::task::JoinHandle;
98use tokio::time::interval;
99use uuid::Uuid;
100use yawc::frame::OpCode;
101
102use crate::error::batch_errors::{BatchResult, BatchStatus, OrderResult, OrderResultMap};
103use crate::execution_engine::ExecutionReport;
104use crate::position_manager::PositionManager;
105
/// Production endpoint of the Binance WebSocket API; used by `BinanceWebSocketTrader::new`.
const BINANCE_WS_API_URL: &str = "wss://ws-api.binance.com/ws-api/v3";
/// Testnet endpoint of the Binance WebSocket API; used by `BinanceWebSocketTrader::new_testnet`.
const BINANCE_WS_API_TESTNET_URL: &str = "wss://ws-api.testnet.binance.vision/ws-api/v3";

/// Period, in seconds, of the interval that drives the ping handler task.
const PING_INTERVAL_SECONDS: u64 = 30;

/// Pong timeout, in seconds, that the ping handler applies before it kills the socket.
const PONG_TIMEOUT_SECONDS: u64 = 10;

/// Largest frame payload (10 MiB) the response handler accepts; a larger frame kills the socket.
const MAX_MESSAGE_SIZE: usize = 10 * 1024 * 1024;

/// NOTE(review): not referenced in the visible part of the file; presumably an upper
/// bound on the number of entries in the pending-requests map — confirm against its user.
const MAX_PENDING_REQUESTS: usize = 10000;

/// Maximum number of orders `OrderRateLimiter` admits within one rate-limit window.
const MAX_ORDERS_PER_10_SECONDS: u32 = 300;
124const RATE_LIMIT_WINDOW_MS: u64 = 10_000; const MAX_BATCH_SIZE: usize = 50; struct OrderRateLimiter {
129 order_times: std::collections::VecDeque<u64>,
130 clock: Clock,
131}
132
133impl OrderRateLimiter {
134 const fn new(clock: Clock) -> Self {
135 Self {
136 order_times: std::collections::VecDeque::new(),
137 clock,
138 }
139 }
140
141 fn cleanup_expired(&mut self) {
143 let now = self.clock.raw() / 1_000_000; let window_start = now.saturating_sub(RATE_LIMIT_WINDOW_MS);
145
146 while let Some(&front) = self.order_times.front() {
147 if front < window_start {
148 self.order_times.pop_front();
149 } else {
150 break;
151 }
152 }
153 }
154
155 fn can_place_orders(&mut self, count: usize) -> bool {
156 self.cleanup_expired();
157 self.order_times.len() + count <= MAX_ORDERS_PER_10_SECONDS as usize
159 }
160
161 fn record_orders(&mut self, count: usize) {
162 let now = self.clock.raw() / 1_000_000; for _ in 0..count {
164 self.order_times.push_back(now);
165 }
166 }
167
168 fn get_current_usage(&mut self) -> (usize, usize) {
169 self.cleanup_expired();
170 (self.order_times.len(), MAX_ORDERS_PER_10_SECONDS as usize)
171 }
172
173 fn current_order_count(&mut self) -> usize {
174 self.cleanup_expired();
175 self.order_times.len()
176 }
177}
178
/// Lifecycle stage of the trading WebSocket connection.
///
/// The discriminants are stable `u8` values so the state can live in an `AtomicU8`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum ConnectionState {
    /// No socket is open.
    Disconnected = 0,
    /// A socket connection attempt is in flight.
    Connecting = 1,
    /// The socket is open but the session is not authenticated.
    Connected = 2,
    /// The authentication message is being sent.
    Authenticating = 3,
    /// The session is authenticated.
    Authenticated = 4,
    /// The connection is being torn down.
    Disconnecting = 5,
}

impl From<u8> for ConnectionState {
    /// Decodes a stored discriminant; any unknown value maps to `Disconnected`.
    fn from(value: u8) -> Self {
        // Indexed by discriminant, so the position of each entry must match its value.
        const BY_DISCRIMINANT: [ConnectionState; 6] = [
            ConnectionState::Disconnected,
            ConnectionState::Connecting,
            ConnectionState::Connected,
            ConnectionState::Authenticating,
            ConnectionState::Authenticated,
            ConnectionState::Disconnecting,
        ];

        BY_DISCRIMINANT
            .get(usize::from(value))
            .copied()
            .unwrap_or(Self::Disconnected)
    }
}
210
211#[derive(Debug, Clone)]
213enum RequestId {
214 Sequential(u64),
215 Uuid(SmartString),
216}
217
218impl RequestId {
219 fn as_json_value(&self) -> JsonValue {
220 match self {
221 Self::Sequential(id) => JsonValue::from(*id),
222 Self::Uuid(id) => JsonValue::from(id.as_str()),
223 }
224 }
225
226 fn to_lookup_key(&self) -> RequestKey {
228 match self {
229 Self::Sequential(id) => RequestKey::Sequential(*id),
230 Self::Uuid(id) => RequestKey::Uuid(id.clone()),
231 }
232 }
233}
234
235#[derive(Debug, Clone, PartialEq, Eq, Hash)]
238enum RequestKey {
239 Sequential(u64),
240 Uuid(SmartString),
241}
242
243impl From<&RequestId> for RequestKey {
244 fn from(req_id: &RequestId) -> Self {
245 req_id.to_lookup_key()
246 }
247}
248
249#[derive(Debug)]
251struct RequestIdGenerator {
252 counter: AtomicU64,
253 overflow_detected: AtomicBool,
254}
255
256impl RequestIdGenerator {
257 const fn new() -> Self {
258 Self {
259 counter: AtomicU64::new(1),
260 overflow_detected: AtomicBool::new(false),
261 }
262 }
263
264 fn next_id(&self) -> RequestId {
265 if self.overflow_detected.load(Ordering::Acquire) {
266 return RequestId::Uuid(SmartString::from(Uuid::new_v4().to_string()));
268 }
269
270 let id = self.counter.fetch_add(1, Ordering::SeqCst);
271 if id == u64::MAX {
272 self.overflow_detected.store(true, Ordering::Release);
274 RequestId::Uuid(SmartString::from(Uuid::new_v4().to_string()))
275 } else {
276 RequestId::Sequential(id)
277 }
278 }
279}
280
/// Parameters describing a modification of an existing order.
///
/// NOTE(review): the consumer of this struct is not visible in this part of the file;
/// the field notes below are inferred from names and types — confirm against the caller.
#[derive(Debug, Clone)]
pub struct ModifyOrderParams {
    /// Identifier of the order to modify.
    pub order_id: SmartString,
    /// Symbol the order trades (presumably the exchange symbol string).
    pub symbol: SmartString,
    /// Side of the order.
    pub side: OrderSide,
    /// Type of the order.
    pub order_type: OrderType,
    /// Quantity the order should have after the modification.
    pub new_quantity: Decimal,
    /// Price the order should have after the modification, if any.
    pub new_price: Option<Decimal>,
}
297
/// Bookkeeping for a request whose response has not arrived yet.
///
/// Entries are stored in the pending-requests map and removed by
/// `handle_websocket_message` when a message with a matching `id` arrives.
#[derive(Debug, Clone)]
struct PendingRequest {
    /// API method name of the request.
    pub method: SmartString,
    /// Time the request was registered (NOTE(review): unit not visible here — confirm).
    pub timestamp: u64,
    /// Optional channel on which the outcome of this request is reported.
    pub report_tx: Option<Sender<ExecutionReport>>,
    /// Id the request was sent with.
    pub request_id: RequestId,
}
306
/// In-flight requests, keyed by the hashable form of their request id.
type PendingRequestsMap = FxHashMap<RequestKey, PendingRequest>;
310
/// Join handles of the background tasks spawned by `connect`.
#[derive(Default)]
struct TaskHandles {
    /// Task that reads and dispatches incoming frames.
    response_handler: Option<JoinHandle<()>>,
    /// Task that periodically sends WebSocket pings.
    ping_handler: Option<JoinHandle<()>>,
    /// Task that watches the connection state and triggers reconnection.
    reconnection_monitor: Option<JoinHandle<()>>,
}
343
/// Lock-free counters describing connection activity.
///
/// The struct is aligned to 64 bytes via `repr(align(64))`.
#[repr(align(64))]
#[derive(Debug, Default)]
pub struct ConnectionMetrics {
    /// WebSocket ping frames sent successfully by the ping handler.
    pub websocket_pings_sent: AtomicU64,
    /// WebSocket pong frames received.
    pub websocket_pongs_received: AtomicU64,
    /// JSON-level pings sent (NOTE(review): writer not visible in this part of the file).
    pub json_pings_sent: AtomicU64,
    /// JSON-level pongs received (NOTE(review): writer not visible in this part of the file).
    pub json_pongs_received: AtomicU64,
    /// Messages sent (NOTE(review): writer not visible in this part of the file).
    pub messages_sent: AtomicU64,
    /// Frames received by the response handler.
    pub messages_received: AtomicU64,
    /// Authentication attempts (NOTE(review): writer not visible in this part of the file).
    pub authentication_attempts: AtomicU64,
    /// Times the reconnection monitor found the connection unhealthy and counted an attempt.
    pub reconnection_attempts: AtomicU64,
    /// Reconnection attempts that completed successfully.
    pub successful_reconnections: AtomicU64,
    /// Reconnection attempts that failed.
    pub failed_reconnections: AtomicU64,
}
370
/// Point-in-time snapshot of connection state and counters, as built by
/// `BinanceWebSocketTrader::get_connection_health`.
#[derive(Debug, Clone)]
pub struct ConnectionHealth {
    /// Connection state at snapshot time.
    pub state: ConnectionState,
    /// Result of `is_connected()` at snapshot time.
    pub is_connected: bool,
    /// Whether the state was `Authenticated` at snapshot time.
    pub is_authenticated: bool,
    /// Time since the last ping was sent; `None` if no ping has been recorded.
    pub time_since_last_ping: Option<Duration>,
    /// Time since the last pong was received; `None` if no pong has been recorded.
    pub time_since_last_pong: Option<Duration>,
    /// Time since authentication completed; `None` if it has not been recorded.
    pub time_since_auth: Option<Duration>,
    /// Copy of `ConnectionMetrics::websocket_pings_sent`.
    pub websocket_pings_sent: u64,
    /// Copy of `ConnectionMetrics::websocket_pongs_received`.
    pub websocket_pongs_received: u64,
    /// Copy of `ConnectionMetrics::json_pings_sent`.
    pub json_pings_sent: u64,
    /// Copy of `ConnectionMetrics::json_pongs_received`.
    pub json_pongs_received: u64,
    /// Copy of `ConnectionMetrics::messages_sent`.
    pub messages_sent: u64,
    /// Copy of `ConnectionMetrics::messages_received`.
    pub messages_received: u64,
    /// Copy of `ConnectionMetrics::reconnection_attempts`.
    pub reconnection_attempts: u64,
    /// Copy of `ConnectionMetrics::successful_reconnections`.
    pub successful_reconnections: u64,
    /// Copy of `ConnectionMetrics::failed_reconnections`.
    pub failed_reconnections: u64,
}
405
/// Trading client for the Binance WebSocket API.
///
/// Every mutable piece of state sits behind an `Arc`, so clones of the trader
/// (including the copies moved into background tasks) share one connection.
pub struct BinanceWebSocketTrader {
    /// Credentials; used to generate the WebSocket authentication message.
    auth: Arc<BinanceAuth>,

    /// Write half of the socket; `None` while no connection is installed.
    ws_sink: Arc<AsyncRwLock<Option<WebSocketSink>>>,
    /// Read half of the socket; taken out of the slot by the response handler task.
    ws_stream: Arc<AsyncRwLock<Option<WebSocketStream>>>,

    /// Current `ConnectionState`, stored as its `u8` discriminant.
    state: Arc<AtomicU8>,

    /// Time source for all timestamps kept by the trader.
    clock: Clock,

    /// Generator for request ids.
    request_id_gen: Arc<RequestIdGenerator>,

    /// Requests awaiting a response, keyed by request id.
    pending_requests: Arc<RwLock<PendingRequestsMap>>,

    /// Endpoint this trader connects to (production or testnet).
    ws_url: &'static str,

    /// Raw clock reading taken when the last ping was sent; 0 means "never".
    last_ping_time: Arc<AtomicU64>,

    /// Raw clock reading taken when the last pong was received; 0 means "never".
    last_pong_time: Arc<AtomicU64>,

    /// Raw clock reading taken when authentication last completed; 0 means "never".
    auth_completed_time: Arc<AtomicU64>,

    /// Join handles of the background tasks.
    task_handles: Arc<AsyncRwLock<TaskHandles>>,

    /// Connection activity counters.
    metrics: Arc<ConnectionMetrics>,

    /// Delay, in milliseconds, to wait before the next reconnection attempt.
    reconnection_backoff_ms: Arc<AtomicU64>,

    /// Stop signal for the reconnection monitor task.
    reconnection_control: Arc<ReconnectionMonitorControl>,

    /// Sliding-window limiter for order placement.
    order_rate_limiter: Arc<RwLock<OrderRateLimiter>>,

    /// Position manager handed to the account-update handler.
    position_manager: Arc<dyn PositionManager>,
}
/// Shared stop signal for the reconnection monitor task.
#[derive(Clone)]
struct ReconnectionMonitorControl {
    /// Checked by the monitor on every tick; cleared when a monitor is started.
    should_stop: Arc<AtomicBool>,
}
466
467impl Clone for BinanceWebSocketTrader {
468 fn clone(&self) -> Self {
469 Self {
470 auth: self.auth.clone(),
471 ws_sink: self.ws_sink.clone(),
472 ws_stream: self.ws_stream.clone(),
473 state: self.state.clone(),
474 clock: self.clock.clone(),
475 request_id_gen: self.request_id_gen.clone(), pending_requests: self.pending_requests.clone(),
477 ws_url: self.ws_url,
478 last_ping_time: self.last_ping_time.clone(),
479 last_pong_time: self.last_pong_time.clone(),
480 auth_completed_time: self.auth_completed_time.clone(),
481 task_handles: self.task_handles.clone(),
482 metrics: self.metrics.clone(),
483 reconnection_backoff_ms: self.reconnection_backoff_ms.clone(),
484 reconnection_control: self.reconnection_control.clone(),
485 order_rate_limiter: self.order_rate_limiter.clone(),
486 position_manager: self.position_manager.clone(),
487 }
488 }
489}
490
491impl BinanceWebSocketTrader {
492 fn get_state(&self) -> ConnectionState {
494 ConnectionState::from(self.state.load(Ordering::Acquire))
495 }
496
497 fn set_state(&self, new_state: ConnectionState) {
499 let old_state = self.get_state();
500 self.state.store(new_state as u8, Ordering::Release);
501
502 if old_state != new_state {
503 debug!("Connection state transition: {old_state:?} -> {new_state:?}");
504 }
505 }
506
507 #[must_use]
509 pub fn is_authenticated(&self) -> bool {
510 self.get_state() == ConnectionState::Authenticated
511 }
512
    /// Returns the shared connection counters.
    #[must_use]
    pub const fn metrics(&self) -> &Arc<ConnectionMetrics> {
        &self.metrics
    }
518
519 #[must_use]
522 pub fn get_connection_health(&self) -> ConnectionHealth {
523 let now = self.clock.raw();
524 let last_ping = self.last_ping_time.load(Ordering::Acquire);
525 let last_pong = self.last_pong_time.load(Ordering::Acquire);
526 let auth_time = self.auth_completed_time.load(Ordering::Acquire);
527
528 let time_since_last_ping = if last_ping > 0 {
529 Some(Duration::from_nanos(now.saturating_sub(last_ping)))
530 } else {
531 None
532 };
533
534 let time_since_last_pong = if last_pong > 0 {
535 Some(Duration::from_nanos(now.saturating_sub(last_pong)))
536 } else {
537 None
538 };
539
540 let time_since_auth = if auth_time > 0 {
541 Some(Duration::from_nanos(now.saturating_sub(auth_time)))
542 } else {
543 None
544 };
545
546 ConnectionHealth {
547 state: self.get_state(),
548 is_connected: self.is_connected(),
549 is_authenticated: self.is_authenticated(),
550 time_since_last_ping,
551 time_since_last_pong,
552 time_since_auth,
553 websocket_pings_sent: self.metrics.websocket_pings_sent.load(Ordering::Relaxed),
554 websocket_pongs_received: self
555 .metrics
556 .websocket_pongs_received
557 .load(Ordering::Relaxed),
558 json_pings_sent: self.metrics.json_pings_sent.load(Ordering::Relaxed),
559 json_pongs_received: self.metrics.json_pongs_received.load(Ordering::Relaxed),
560 messages_sent: self.metrics.messages_sent.load(Ordering::Relaxed),
561 messages_received: self.metrics.messages_received.load(Ordering::Relaxed),
562 reconnection_attempts: self.metrics.reconnection_attempts.load(Ordering::Relaxed),
563 successful_reconnections: self
564 .metrics
565 .successful_reconnections
566 .load(Ordering::Relaxed),
567 failed_reconnections: self.metrics.failed_reconnections.load(Ordering::Relaxed),
568 }
569 }
570
571 fn calculate_backoff_ms(&self) -> u64 {
573 let current = self.reconnection_backoff_ms.load(Ordering::Acquire);
574 let max_backoff = 60_000; let jitter = (current / 4) as i64;
578 let mut rng = rand::rng();
579 let random_jitter = rng.random_range(-jitter..=jitter);
580
581 let next_backoff = ((current * 2).min(max_backoff) as i64 + random_jitter).max(1000) as u64;
582 self.reconnection_backoff_ms
583 .store(next_backoff, Ordering::Release);
584
585 current
586 }
587
    /// Resets the reconnection delay to its initial value of 1000 ms.
    fn reset_backoff(&self) {
        self.reconnection_backoff_ms.store(1000, Ordering::Release);
    }
592
593 #[must_use]
595 pub fn new(auth: Arc<BinanceAuth>, position_manager: Arc<dyn PositionManager>) -> Self {
596 let clock = Clock::new();
597 Self {
598 auth,
599 ws_sink: Arc::new(AsyncRwLock::new(None)),
600 ws_stream: Arc::new(AsyncRwLock::new(None)),
601 state: Arc::new(AtomicU8::new(ConnectionState::Disconnected as u8)),
602 clock: clock.clone(),
603 request_id_gen: Arc::new(RequestIdGenerator::new()),
604 pending_requests: Arc::new(RwLock::new(FxHashMap::default())),
605 ws_url: BINANCE_WS_API_URL,
606 last_ping_time: Arc::new(AtomicU64::new(0)),
607 last_pong_time: Arc::new(AtomicU64::new(0)),
608 auth_completed_time: Arc::new(AtomicU64::new(0)),
609 task_handles: Arc::new(AsyncRwLock::new(TaskHandles::default())),
610 metrics: Arc::new(ConnectionMetrics::default()),
611 reconnection_backoff_ms: Arc::new(AtomicU64::new(1000)), reconnection_control: Arc::new(ReconnectionMonitorControl {
613 should_stop: Arc::new(AtomicBool::new(false)),
614 }),
615 order_rate_limiter: Arc::new(RwLock::new(OrderRateLimiter::new(clock))),
616 position_manager,
617 }
618 }
619
620 #[must_use]
622 pub fn new_testnet(auth: Arc<BinanceAuth>, position_manager: Arc<dyn PositionManager>) -> Self {
623 let clock = Clock::new();
624 Self {
625 auth,
626 ws_sink: Arc::new(AsyncRwLock::new(None)),
627 ws_stream: Arc::new(AsyncRwLock::new(None)),
628 state: Arc::new(AtomicU8::new(ConnectionState::Disconnected as u8)),
629 clock: clock.clone(),
630 request_id_gen: Arc::new(RequestIdGenerator::new()),
631 pending_requests: Arc::new(RwLock::new(FxHashMap::default())),
632 ws_url: BINANCE_WS_API_TESTNET_URL,
633 last_ping_time: Arc::new(AtomicU64::new(0)),
634 last_pong_time: Arc::new(AtomicU64::new(0)),
635 auth_completed_time: Arc::new(AtomicU64::new(0)),
636 task_handles: Arc::new(AsyncRwLock::new(TaskHandles::default())),
637 metrics: Arc::new(ConnectionMetrics::default()),
638 reconnection_backoff_ms: Arc::new(AtomicU64::new(1000)), reconnection_control: Arc::new(ReconnectionMonitorControl {
640 should_stop: Arc::new(AtomicBool::new(false)),
641 }),
642 order_rate_limiter: Arc::new(RwLock::new(OrderRateLimiter::new(clock))),
643 position_manager,
644 }
645 }
646
647 pub async fn connect(&self, report_tx: Sender<ExecutionReport>) -> Result<()> {
649 match self.get_state() {
651 ConnectionState::Authenticated => {
652 debug!("Already authenticated, skipping connection");
653 return Ok(());
654 }
655 ConnectionState::Connecting
656 | ConnectionState::Connected
657 | ConnectionState::Authenticating => {
658 warn!("Connection already in progress");
659 return Err(anyhow!("Connection already in progress"));
660 }
661 _ => {}
662 }
663
664 self.set_state(ConnectionState::Connecting);
666
667 let ws_url = self.ws_url;
668 debug!("Connecting to Binance WebSocket API: {ws_url}");
669
670 let ws_config =
672 WebSocketConfig::new(rusty_common::types::Exchange::Binance, ws_url.to_string());
673
674 let mut connector = WebSocketConnector::new(
675 ws_config,
676 Arc::new(parking_lot::RwLock::new(
677 rusty_common::websocket::ConnectionStats::default(),
678 )),
679 Arc::new(parking_lot::RwLock::new(
680 rusty_common::websocket::ConnectionState::Disconnected,
681 )),
682 );
683
684 let (ws_sink, ws_stream) = match connector.connect_with_retry(ws_url).await {
685 Ok(result) => result,
686 Err(e) => {
687 self.set_state(ConnectionState::Disconnected);
688 return Err(anyhow!("WebSocket connection failed: {}", e));
689 }
690 };
691
692 *self.ws_sink.write().await = Some(ws_sink);
693 *self.ws_stream.write().await = Some(ws_stream);
694
695 self.set_state(ConnectionState::Connected);
697
698 let response_handle = self.start_response_handler(report_tx.clone()).await?;
700 let ping_handle = self.start_ping_handler().await?;
701 let reconnect_handle = self.start_reconnection_monitor(report_tx.clone()).await?;
702
703 {
705 let mut handles = self.task_handles.write().await;
706 handles.response_handler = Some(response_handle);
707 handles.ping_handler = Some(ping_handle);
708 handles.reconnection_monitor = Some(reconnect_handle);
709 }
710
711 self.set_state(ConnectionState::Authenticating);
713 match self.authenticate_session().await {
714 Ok(()) => {
715 self.set_state(ConnectionState::Authenticated);
716 self.auth_completed_time
717 .store(self.clock.raw(), Ordering::Release);
718 self.reset_backoff();
719 debug!("Successfully authenticated to Binance WebSocket API");
720 Ok(())
721 }
722 Err(e) => {
723 error!("Authentication failed: {e}");
724 self.set_state(ConnectionState::Connected); Err(e)
726 }
727 }
728 }
729
730 async fn authenticate_session(&self) -> Result<()> {
732 debug!("🔐 Generating WebSocket authentication message...");
733
734 let auth_message = self.auth.generate_ws_auth().map_err(|e| {
736 error!("🔐 Failed to generate WebSocket auth: {e}");
737 anyhow!("Failed to generate WebSocket auth: {}", e)
738 })?;
739
740 debug!("🔐 Authentication message generated, sending to server...");
741
742 let send_result = tokio::time::timeout(
744 Duration::from_secs(10), async {
746 if let Some(ref mut ws_sink) = self.ws_sink.write().await.as_mut() {
747 ws_sink
748 .send(Message::Text(auth_message).to_frame_view())
749 .await
750 .map_err(|e| anyhow!("Failed to send auth message: {}", e))
751 } else {
752 Err(anyhow!("WebSocket sink not available"))
753 }
754 },
755 )
756 .await;
757
758 match send_result {
759 Ok(Ok(())) => {
760 debug!("🔐 WebSocket authentication sent successfully");
761 Ok(())
762 }
763 Ok(Err(e)) => {
764 error!("🔐 Failed to send authentication message: {e}");
765 Err(e)
766 }
767 Err(_) => {
768 error!("🔐 Authentication send timed out after 10 seconds");
769 Err(anyhow!("Authentication send timed out"))
770 }
771 }
772 }
773
774 async fn start_response_handler(
776 &self,
777 report_tx: Sender<ExecutionReport>,
778 ) -> Result<JoinHandle<()>> {
779 let ws_stream = self.ws_stream.clone();
780 let pending_requests = self.pending_requests.clone();
781 let clock = self.clock.clone();
782 let last_pong_time = self.last_pong_time.clone();
783 let state = self.state.clone();
784 let metrics = self.metrics.clone();
785 let position_manager = self.position_manager.clone();
786 let self_clone = Self {
787 auth: self.auth.clone(),
788 ws_sink: self.ws_sink.clone(),
789 ws_stream: self.ws_stream.clone(),
790 state: self.state.clone(),
791 clock: self.clock.clone(),
792 request_id_gen: self.request_id_gen.clone(),
793 pending_requests: self.pending_requests.clone(),
794 ws_url: self.ws_url,
795 last_ping_time: self.last_ping_time.clone(),
796 last_pong_time: self.last_pong_time.clone(),
797 auth_completed_time: self.auth_completed_time.clone(),
798 task_handles: self.task_handles.clone(),
799 metrics: self.metrics.clone(),
800 reconnection_backoff_ms: self.reconnection_backoff_ms.clone(),
801 reconnection_control: self.reconnection_control.clone(),
802 order_rate_limiter: self.order_rate_limiter.clone(),
803 position_manager: self.position_manager.clone(),
804 };
805
806 let handle = tokio::spawn(async move {
807 let mut ws_guard = ws_stream.write().await;
808 if let Some(mut ws_stream) = ws_guard.take() {
809 loop {
810 let current_state = ConnectionState::from(state.load(Ordering::Acquire));
812 if current_state == ConnectionState::Disconnected
813 || current_state == ConnectionState::Disconnecting
814 {
815 debug!("Response handler stopping - state: {current_state:?}");
816 break;
817 }
818
819 if let Some(frame) = ws_stream.next().await {
820 metrics.messages_received.fetch_add(1, Ordering::Relaxed);
821
822 if frame.payload.len() > MAX_MESSAGE_SIZE {
824 error!(
825 "Frame too large: {} bytes (max: {})",
826 frame.payload.len(),
827 MAX_MESSAGE_SIZE
828 );
829 self_clone.kill_socket("Frame size exceeded maximum").await;
830 break;
831 }
832
833 match frame.opcode {
835 OpCode::Text => {
836 let text = match std::str::from_utf8(&frame.payload) {
838 Ok(text) => text,
839 Err(e) => {
840 error!("Invalid UTF-8 in text frame: {e}");
841 self_clone.kill_socket("Invalid UTF-8 in text frame").await;
842 break;
843 }
844 };
845
846 let mut text_copy = text.to_string();
847 match unsafe { simd_json::from_str::<JsonValue>(&mut text_copy) } {
848 Ok(json) => {
849 debug!("response is {:?}", json.get("id"));
850 Self::handle_websocket_message(
851 &json,
852 &pending_requests,
853 &report_tx,
854 &clock,
855 &position_manager,
856 )
857 .await;
858 }
859 Err(e) => {
860 error!("Failed to parse WebSocket message: {e}");
861 self_clone.kill_socket("JSON parsing error").await;
862 break;
863 }
864 }
865 }
866 OpCode::Ping => {
867 debug!(
869 "Received ping - automatic pong response will be sent by YAWC"
870 );
871 }
872 OpCode::Pong => {
873 let now = clock.raw();
875 last_pong_time.store(now, Ordering::SeqCst);
876 debug!("Received pong from server at {now}");
877 }
878 OpCode::Close => {
879 let close_info = if frame.payload.len() >= 2 {
881 let code =
882 u16::from_be_bytes([frame.payload[0], frame.payload[1]]);
883 let reason = if frame.payload.len() > 2 {
884 std::str::from_utf8(&frame.payload[2..])
885 .unwrap_or("Invalid UTF-8 in close reason")
886 } else {
887 "No reason provided"
888 };
889 format!("code: {code}, reason: {reason}")
890 } else {
891 "No close code provided".to_string()
892 };
893
894 warn!("WebSocket connection closing: {close_info}");
895 self_clone
896 .kill_socket(&format!(
897 "Connection closed by server: {close_info}"
898 ))
899 .await;
900 break;
901 }
902 OpCode::Binary => {
903 debug!("Received binary frame with {} bytes", frame.payload.len());
905 }
906 _ => {
907 debug!("Received frame with unknown opcode: {:?}", frame.opcode);
909 }
910 }
911 } else {
912 error!("WebSocket stream ended unexpectedly");
914 self_clone.kill_socket("Stream ended unexpectedly").await;
915 break;
916 }
917 }
918 } else {
919 error!("No WebSocket stream available in response handler");
920 self_clone
921 .kill_socket("No WebSocket stream available")
922 .await;
923 }
924
925 debug!("Response handler task exiting");
926 });
927
928 Ok(handle)
929 }
930
931 async fn start_ping_handler(&self) -> Result<JoinHandle<()>> {
933 let ws_sink = self.ws_sink.clone();
934 let clock = self.clock.clone();
935 let last_ping_time = self.last_ping_time.clone();
936 let last_pong_time = self.last_pong_time.clone();
937 let state = self.state.clone();
938 let auth_completed_time = self.auth_completed_time.clone();
939 let metrics = self.metrics.clone();
940 let self_clone = Self {
941 auth: self.auth.clone(),
942 ws_sink: self.ws_sink.clone(),
943 ws_stream: self.ws_stream.clone(),
944 state: self.state.clone(),
945 clock: self.clock.clone(),
946 request_id_gen: self.request_id_gen.clone(),
947 pending_requests: self.pending_requests.clone(),
948 ws_url: self.ws_url,
949 last_ping_time: self.last_ping_time.clone(),
950 last_pong_time: self.last_pong_time.clone(),
951 auth_completed_time: self.auth_completed_time.clone(),
952 task_handles: self.task_handles.clone(),
953 metrics: self.metrics.clone(),
954 reconnection_backoff_ms: self.reconnection_backoff_ms.clone(),
955 reconnection_control: self.reconnection_control.clone(),
956 order_rate_limiter: self.order_rate_limiter.clone(),
957 position_manager: self.position_manager.clone(),
958 };
959
960 let handle = tokio::spawn(async move {
961 let mut ping_interval = interval(Duration::from_secs(PING_INTERVAL_SECONDS));
962 ping_interval.tick().await; loop {
965 ping_interval.tick().await;
966
967 let current_state = ConnectionState::from(state.load(Ordering::Acquire));
969 if current_state == ConnectionState::Disconnected
970 || current_state == ConnectionState::Disconnecting
971 {
972 debug!("Ping handler stopping - state: {current_state:?}");
973 break;
974 }
975
976 let now = clock.raw();
977 let last_pong = last_pong_time.load(Ordering::Acquire);
978 let auth_time = auth_completed_time.load(Ordering::Acquire);
979
980 if current_state == ConnectionState::Authenticated && auth_time > 0 {
982 let waiting_for_first_pong = last_pong == 0
984 && (now - auth_time) > (PONG_TIMEOUT_SECONDS * 1_000_000_000);
985
986 let pong_timeout =
987 last_pong > 0 && (now - last_pong) > (PONG_TIMEOUT_SECONDS * 1_000_000_000);
988
989 if waiting_for_first_pong || pong_timeout {
990 warn!("Pong timeout detected, connection is dead");
991 self_clone.kill_socket("Pong timeout").await;
992 break;
993 }
994 }
995
996 if let Some(ref mut sink) = ws_sink.write().await.as_mut() {
998 let ping_data = b"ping".to_vec();
999 match sink.send(Message::Ping(ping_data).to_frame_view()).await {
1000 Ok(()) => {
1001 last_ping_time.store(now, Ordering::SeqCst);
1002 metrics.websocket_pings_sent.fetch_add(1, Ordering::Relaxed);
1003 debug!("Sent ping at {now}");
1004 }
1005 Err(e) => {
1006 error!("Failed to send ping: {e}");
1007 self_clone
1008 .kill_socket(&format!("Failed to send ping: {e}"))
1009 .await;
1010 break;
1011 }
1012 }
1013 } else {
1014 warn!("WebSocket sink not available for ping");
1015 self_clone.kill_socket("WebSocket sink not available").await;
1016 break;
1017 }
1018 }
1019
1020 debug!("Ping handler task exiting");
1021 });
1022
1023 Ok(handle)
1024 }
1025
1026 async fn start_reconnection_monitor(
1042 &self,
1043 report_tx: Sender<ExecutionReport>,
1044 ) -> Result<JoinHandle<()>> {
1045 let state = self.state.clone();
1046 let metrics = self.metrics.clone();
1047 let control = self.reconnection_control.clone();
1048
1049 control.should_stop.store(false, Ordering::Release);
1051
1052 let self_clone = Self {
1053 auth: self.auth.clone(),
1054 ws_sink: self.ws_sink.clone(),
1055 ws_stream: self.ws_stream.clone(),
1056 state: self.state.clone(),
1057 clock: self.clock.clone(),
1058 request_id_gen: self.request_id_gen.clone(),
1059 pending_requests: self.pending_requests.clone(),
1060 ws_url: self.ws_url,
1061 last_ping_time: self.last_ping_time.clone(),
1062 last_pong_time: self.last_pong_time.clone(),
1063 auth_completed_time: self.auth_completed_time.clone(),
1064 task_handles: self.task_handles.clone(),
1065 metrics: self.metrics.clone(),
1066 reconnection_backoff_ms: self.reconnection_backoff_ms.clone(),
1067 reconnection_control: self.reconnection_control.clone(),
1068 order_rate_limiter: self.order_rate_limiter.clone(),
1069 position_manager: self.position_manager.clone(),
1070 };
1071
1072 let handle = tokio::spawn(async move {
1073 let mut check_interval = interval(Duration::from_secs(3)); check_interval.tick().await; let mut consecutive_disconnections = 0;
1077 const MAX_RECONNECTION_ATTEMPTS: u32 = 10; info!(
1080 "🤖 Auto-reconnection monitor started (check every 3s, max {MAX_RECONNECTION_ATTEMPTS} attempts)"
1081 );
1082
1083 loop {
1084 check_interval.tick().await;
1085
1086 if control.should_stop.load(Ordering::Acquire) {
1088 info!("🛑 Reconnection monitor received stop signal - exiting gracefully");
1089 break;
1090 }
1091
1092 let current_state = ConnectionState::from(state.load(Ordering::Acquire));
1094 let is_healthy = current_state == ConnectionState::Authenticated;
1095
1096 if !is_healthy
1097 && current_state != ConnectionState::Connecting
1098 && current_state != ConnectionState::Authenticating
1099 {
1100 consecutive_disconnections += 1;
1101 metrics
1102 .reconnection_attempts
1103 .fetch_add(1, Ordering::Relaxed);
1104
1105 if consecutive_disconnections > MAX_RECONNECTION_ATTEMPTS {
1106 error!(
1107 "🔴 Maximum reconnection attempts ({MAX_RECONNECTION_ATTEMPTS}) exceeded. Stopping auto-reconnection."
1108 );
1109 break;
1110 }
1111
1112 warn!(
1113 "🔄 Connection unhealthy detected (state: {current_state:?})! Attempting auto-reconnection #{consecutive_disconnections}/{MAX_RECONNECTION_ATTEMPTS}"
1114 );
1115
1116 let backoff_ms = self_clone.calculate_backoff_ms();
1118 info!("⏳ Waiting {backoff_ms}ms before reconnection attempt");
1119 tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
1120
1121 info!("🔄 Starting internal reconnection attempt...");
1123 let reconnect_start = std::time::Instant::now();
1124
1125 match tokio::time::timeout(
1126 Duration::from_secs(45), self_clone.reconnect_internal(report_tx.clone()),
1128 )
1129 .await
1130 {
1131 Ok(Ok(())) => {
1132 info!(
1133 "✅ Auto-reconnection successful in {:.2}s!",
1134 reconnect_start.elapsed().as_secs_f32()
1135 );
1136 consecutive_disconnections = 0; metrics
1138 .successful_reconnections
1139 .fetch_add(1, Ordering::Relaxed);
1140 self_clone.reset_backoff(); }
1142 Ok(Err(e)) => {
1143 error!(
1144 "❌ Auto-reconnection failed after {:.2}s: {}",
1145 reconnect_start.elapsed().as_secs_f32(),
1146 e
1147 );
1148 }
1150 Err(_) => {
1151 error!(
1152 "❌ Auto-reconnection timed out after {:.2}s (45s limit) - will retry",
1153 reconnect_start.elapsed().as_secs_f32()
1154 );
1155 self_clone
1157 .kill_socket("Reconnection timeout - forcing cleanup")
1158 .await;
1159 }
1160 }
1161 } else {
1162 if consecutive_disconnections > 0 {
1164 consecutive_disconnections = 0;
1165 debug!("Connection restored and stable");
1166 }
1167 }
1168 }
1169
1170 warn!("Auto-reconnection monitor exiting");
1171 });
1172
1173 Ok(handle)
1174 }
1175
1176 async fn reconnect_internal(&self, report_tx: Sender<ExecutionReport>) -> Result<()> {
1178 info!("🔄 RECONNECT-STEP 1/3: Internal reconnection starting...");
1179
1180 info!("🔄 RECONNECT-STEP 2/3: Killing existing connection...");
1182
1183 match tokio::time::timeout(
1185 Duration::from_secs(5), self.kill_socket("Internal reconnection requested"),
1187 )
1188 .await
1189 {
1190 Ok(()) => {
1191 info!("🔄 RECONNECT-STEP 2/3: Socket killed successfully");
1192 }
1193 Err(_) => {
1194 error!("🔄 RECONNECT-STEP 2/3: Socket kill timed out after 5s - continuing anyway");
1195 }
1197 }
1198
1199 info!("🔄 RECONNECT-STEP 2/3: Waiting 100ms for cleanup...");
1201 tokio::time::sleep(Duration::from_millis(100)).await;
1202 info!("🔄 RECONNECT-STEP 2/3: Cleanup delay completed");
1203
1204 info!("🔄 RECONNECT-STEP 3/3: Attempting new connection...");
1206 match self.connect_without_monitor(report_tx).await {
1207 Ok(()) => {
1208 info!("🎉 Internal reconnection completed successfully");
1209 Ok(())
1210 }
1211 Err(e) => {
1212 error!("💥 Internal reconnection failed: {e}");
1213 Err(e)
1214 }
1215 }
1216 }
1217
1218 async fn connect_without_monitor(&self, report_tx: Sender<ExecutionReport>) -> Result<()> {
1220 let ws_url = self.ws_url;
1221
1222 info!("🔌 STEP 1/6: Connecting to Binance WebSocket API (internal): {ws_url}");
1223
1224 let ws_config =
1226 WebSocketConfig::new(rusty_common::types::Exchange::Binance, ws_url.to_string());
1227
1228 let mut connector = WebSocketConnector::new(
1229 ws_config,
1230 Arc::new(parking_lot::RwLock::new(
1231 rusty_common::websocket::ConnectionStats::default(),
1232 )),
1233 Arc::new(parking_lot::RwLock::new(
1234 rusty_common::websocket::ConnectionState::Disconnected,
1235 )),
1236 );
1237
1238 info!("📡 STEP 2/6: Attempting WebSocket connection with timeout...");
1239 let connection_start = std::time::Instant::now();
1240
1241 let (ws_sink, ws_stream) = match tokio::time::timeout(
1242 Duration::from_secs(15), connector.connect_with_retry(ws_url),
1244 )
1245 .await
1246 {
1247 Ok(Ok((sink, stream))) => {
1248 info!(
1249 "🔗 STEP 2/6: WebSocket connection established in {:.2}s",
1250 connection_start.elapsed().as_secs_f32()
1251 );
1252 (sink, stream)
1253 }
1254 Ok(Err(e)) => {
1255 error!(
1256 "📡 STEP 2/6: WebSocket connection failed after {:.2}s: {}",
1257 connection_start.elapsed().as_secs_f32(),
1258 e
1259 );
1260 return Err(anyhow!("WebSocket connection failed: {}", e));
1261 }
1262 Err(_) => {
1263 error!("📡 STEP 2/6: WebSocket connection timed out after 15 seconds");
1264 return Err(anyhow!("WebSocket connection timed out"));
1265 }
1266 };
1267
1268 info!("🔗 STEP 3/6: Setting up WebSocket streams...");
1269 *self.ws_sink.write().await = Some(ws_sink);
1270 *self.ws_stream.write().await = Some(ws_stream);
1271
1272 info!("🚀 STEP 4/6: Starting response handler...");
1273 self.start_response_handler(report_tx).await.map_err(|e| {
1275 error!("🚀 STEP 4/6: Failed to start response handler: {e}");
1276 anyhow!("Failed to start response handler: {}", e)
1277 })?;
1278
1279 debug!("🏓 STEP 5/6: Starting ping handler...");
1280 self.start_ping_handler().await.map_err(|e| {
1281 error!("🏓 STEP 5/6: Failed to start ping handler: {e}");
1282 anyhow!("Failed to start ping handler: {}", e)
1283 })?;
1284
1285 info!("🔐 STEP 6/6: Authenticating session...");
1286 self.authenticate_session().await.map_err(|e| {
1288 error!("🔐 STEP 6/6: Authentication failed: {e}");
1289 anyhow!("Authentication failed: {}", e)
1290 })?;
1291
1292 info!("✅ Successfully reconnected to Binance WebSocket API");
1293 Ok(())
1294 }
1295
1296 async fn handle_websocket_message(
1298 json: &JsonValue,
1299 pending_requests: &Arc<RwLock<PendingRequestsMap>>,
1300 report_tx: &Sender<ExecutionReport>,
1301 clock: &Clock,
1302 position_manager: &Arc<dyn PositionManager>,
1303 ) {
1304 let pending_request = if let Some(request_id) = json.get("id") {
1306 let mut pending = pending_requests.write();
1307
1308 if let Some(id_num) = request_id.as_u64() {
1310 let key = RequestKey::Sequential(id_num);
1311 pending.remove(&key)
1312 } else if let Some(id_str) = request_id.as_str() {
1313 let key = RequestKey::Uuid(SmartString::from(id_str));
1314 pending.remove(&key)
1315 } else {
1316 None
1317 }
1318 } else {
1319 None
1320 };
1321
1322 if let Some(pending_req) = pending_request {
1323 Self::handle_response(json, &pending_req, report_tx, clock).await;
1324 }
1325 else if let Some(event_type) = json.get("e").and_then(|v| v.as_str()) {
1327 if event_type == "executionReport" {
1328 Self::handle_execution_report(json, report_tx, clock).await;
1329 } else if event_type == "ACCOUNT_UPDATE" {
1330 Self::handle_account_update(json, position_manager).await;
1331 }
1332 }
1333 }
1334
1335 async fn handle_response(
1337 json: &JsonValue,
1338 pending_req: &PendingRequest,
1339 report_tx: &Sender<ExecutionReport>,
1340 clock: &Clock,
1341 ) {
1342 if let Some(error) = json.get("error") {
1344 let error_code = error
1345 .get("code")
1346 .and_then(simd_json::prelude::ValueAsScalar::as_i64)
1347 .unwrap_or(0);
1348 let error_msg = error
1349 .get("msg")
1350 .and_then(|v| v.as_str())
1351 .unwrap_or("Unknown error");
1352
1353 error!("WebSocket API error {error_code}: {error_msg}");
1354
1355 if let Some(ref tx) = pending_req.report_tx {
1357 let report = ExecutionReport {
1358 id: id_generation::generate_ws_timestamp_id("error", clock.raw()),
1359 order_id: "unknown".into(),
1360 exchange_timestamp: 0,
1361 system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
1362 instrument_id: InstrumentId {
1363 symbol: "UNKNOWN".into(),
1364 venue: Venue::Binance,
1365 },
1366 status: OrderStatus::Rejected,
1367 filled_quantity: Decimal::ZERO,
1368 remaining_quantity: Decimal::ZERO,
1369 execution_price: None,
1370 reject_reason: Some(format!("{error_msg} ({error_code})").into()),
1371 exchange_execution_id: None,
1372 is_final: true,
1373 };
1374
1375 if let Err(e) = tx.send_async(report).await {
1376 error!("Failed to send error report: {e}");
1377 }
1378 }
1379 return;
1380 }
1381
1382 match pending_req.method.as_str() {
1384 "order.place" => {
1385 Self::handle_order_place_response(json, pending_req, report_tx, clock).await;
1386 }
1387 "order.cancel" => {
1388 Self::handle_order_cancel_response(json, pending_req, report_tx, clock).await;
1389 }
1390 "order.cancelReplace" => {
1391 Self::handle_order_modify_response(json, pending_req, report_tx, clock).await;
1392 }
1393 "ping" => {
1394 Self::handle_ping_response(json, pending_req).await;
1395 }
1396 "openOrders.cancelAll" => {
1397 Self::handle_cancel_all_orders_response(json, pending_req, report_tx, clock).await;
1398 }
1399 _ => {
1400 info!("Received response for method: {}", pending_req.method);
1401 }
1402 }
1403 }
1404
1405 async fn handle_order_place_response(
1407 json: &JsonValue,
1408 pending_req: &PendingRequest,
1409 report_tx: &Sender<ExecutionReport>,
1410 clock: &Clock,
1411 ) {
1412 if let Some(ref tx) = pending_req.report_tx
1413 && let Some(result) = json.get("result")
1414 {
1415 let symbol: SmartString = result
1416 .get("symbol")
1417 .and_then(|v| v.as_str())
1418 .unwrap_or("")
1419 .into();
1420
1421 let order_id: SmartString = result
1422 .get("orderId")
1423 .and_then(simd_json::prelude::ValueAsScalar::as_u64)
1424 .unwrap_or(0)
1425 .to_string()
1426 .into();
1427
1428 let client_order_id: SmartString = result
1429 .get("clientOrderId")
1430 .and_then(|v| v.as_str())
1431 .unwrap_or("")
1432 .into();
1433
1434 let status = result.get("status").and_then(|v| v.as_str()).unwrap_or("");
1435
1436 let original_qty: SmartString = result
1437 .get("origQty")
1438 .and_then(|v| v.as_str())
1439 .unwrap_or("0")
1440 .into();
1441
1442 let executed_qty: SmartString = result
1443 .get("executedQty")
1444 .and_then(|v| v.as_str())
1445 .unwrap_or("0")
1446 .into();
1447
1448 let price: SmartString = result
1449 .get("price")
1450 .and_then(|v| v.as_str())
1451 .unwrap_or("0")
1452 .into();
1453
1454 let transaction_time = result
1455 .get("transactTime")
1456 .and_then(simd_json::prelude::ValueAsScalar::as_u64)
1457 .unwrap_or(0);
1458
1459 let report = ExecutionReport {
1460 id: id_generation::generate_ws_report_id("place", &client_order_id),
1461 order_id: client_order_id,
1462 exchange_timestamp: transaction_time * 1_000_000, system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
1464 instrument_id: InstrumentId {
1465 symbol,
1466 venue: Venue::Binance,
1467 },
1468 status: Self::map_order_status(status),
1469 filled_quantity: Decimal::from_str_exact(&executed_qty).unwrap_or(Decimal::ZERO),
1470 remaining_quantity: {
1471 let original = Decimal::from_str_exact(&original_qty).unwrap_or(Decimal::ZERO);
1472 let executed = Decimal::from_str_exact(&executed_qty).unwrap_or(Decimal::ZERO);
1473 original - executed
1474 },
1475 execution_price: Decimal::from_str_exact(&price).ok(),
1476 reject_reason: None,
1477 exchange_execution_id: Some(order_id),
1478 is_final: false,
1479 };
1480
1481 match tokio::time::timeout(
1483 Duration::from_millis(100), tx.send_async(report),
1485 )
1486 .await
1487 {
1488 Ok(Ok(())) => debug!("Order place report sent successfully"),
1489 Ok(Err(e)) => {
1490 error!("Failed to send order place report: {e}");
1491 }
1493 Err(_) => {
1494 warn!("Channel backpressure detected - order place report send timed out");
1495 }
1497 }
1498 }
1499 }
1500
1501 async fn handle_order_cancel_response(
1503 json: &JsonValue,
1504 pending_req: &PendingRequest,
1505 report_tx: &Sender<ExecutionReport>,
1506 clock: &Clock,
1507 ) {
1508 if let Some(ref tx) = pending_req.report_tx
1510 && let Some(result) = json.get("result")
1511 {
1512 let symbol: SmartString = result
1513 .get("symbol")
1514 .and_then(|v| v.as_str())
1515 .unwrap_or("")
1516 .into();
1517
1518 let client_order_id: SmartString = result
1519 .get("clientOrderId")
1520 .and_then(|v| v.as_str())
1521 .unwrap_or("")
1522 .into();
1523
1524 let report = ExecutionReport {
1525 id: id_generation::generate_ws_report_id("cancel", &client_order_id),
1526 order_id: client_order_id,
1527 exchange_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
1528 system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
1529 instrument_id: InstrumentId {
1530 symbol,
1531 venue: Venue::Binance,
1532 },
1533 status: OrderStatus::Cancelled,
1534 filled_quantity: Decimal::ZERO,
1535 remaining_quantity: Decimal::ZERO,
1536 execution_price: None,
1537 reject_reason: None,
1538 exchange_execution_id: None,
1539 is_final: true,
1540 };
1541
1542 match tokio::time::timeout(
1544 Duration::from_millis(100), tx.send_async(report),
1546 )
1547 .await
1548 {
1549 Ok(Ok(())) => debug!("Cancel report sent successfully"),
1550 Ok(Err(e)) => {
1551 error!("Failed to send cancel report: {e}");
1552 }
1554 Err(_) => {
1555 warn!("Channel backpressure detected - cancel report send timed out");
1556 }
1558 }
1559 }
1560 }
1561
1562 async fn handle_order_modify_response(
1564 json: &JsonValue,
1565 pending_req: &PendingRequest,
1566 report_tx: &Sender<ExecutionReport>,
1567 clock: &Clock,
1568 ) {
1569 Self::handle_order_place_response(json, pending_req, report_tx, clock).await;
1571 }
1572
1573 async fn handle_cancel_all_orders_response(
1575 json: &JsonValue,
1576 pending_req: &PendingRequest,
1577 report_tx: &Sender<ExecutionReport>,
1578 clock: &Clock,
1579 ) {
1580 if let Some(ref tx) = pending_req.report_tx {
1581 if let Some(result) = json.get("result") {
1582 if let Some(orders_array) = result.as_array() {
1584 for order_data in orders_array {
1585 let symbol: SmartString = order_data
1586 .get("symbol")
1587 .and_then(|v| v.as_str())
1588 .unwrap_or("")
1589 .into();
1590
1591 let client_order_id: SmartString = order_data
1592 .get("clientOrderId")
1593 .and_then(|v| v.as_str())
1594 .unwrap_or("")
1595 .into();
1596
1597 let report = ExecutionReport {
1598 id: id_generation::generate_ws_report_id(
1599 "cancel_all",
1600 &client_order_id,
1601 ),
1602 order_id: client_order_id,
1603 exchange_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
1604 system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
1605 instrument_id: InstrumentId {
1606 symbol,
1607 venue: Venue::Binance,
1608 },
1609 status: OrderStatus::Cancelled,
1610 filled_quantity: Decimal::ZERO,
1611 remaining_quantity: Decimal::ZERO,
1612 execution_price: None,
1613 reject_reason: None,
1614 exchange_execution_id: None,
1615 is_final: true,
1616 };
1617
1618 match tokio::time::timeout(
1620 Duration::from_millis(100), tx.send_async(report),
1622 )
1623 .await
1624 {
1625 Ok(Ok(())) => debug!(
1626 "Cancel all order report sent successfully for order: {}",
1627 order_data
1628 .get("clientOrderId")
1629 .and_then(|v| v.as_str())
1630 .unwrap_or("unknown")
1631 ),
1632 Ok(Err(e)) => {
1633 error!("Failed to send cancel all order report: {e}");
1634 }
1636 Err(_) => {
1637 warn!(
1638 "Channel backpressure detected - cancel all order report send timed out"
1639 );
1640 }
1642 }
1643 }
1644 } else {
1645 debug!("Cancel all orders result is not an array: {result:?}");
1647 }
1648 } else {
1649 debug!("No result field in cancel all orders response");
1650 }
1651 }
1652 }
1653
1654 async fn handle_ping_response(json: &JsonValue, pending_req: &PendingRequest) {
1656 if let Some(result) = json.get("result") {
1657 debug!("✅ JSON ping successful - Server responded: {result:?}");
1658 debug!(" Request timestamp: {}", pending_req.timestamp);
1659 debug!(" Method: {}", pending_req.method);
1660 } else {
1661 debug!("✅ JSON ping successful - Empty result");
1662 }
1663 }
1664
1665 async fn handle_execution_report(
1667 json: &JsonValue,
1668 report_tx: &Sender<ExecutionReport>,
1669 clock: &Clock,
1670 ) {
1671 let symbol: SmartString = json.get("s").and_then(|v| v.as_str()).unwrap_or("").into();
1673
1674 let order_id: SmartString = json
1675 .get("i")
1676 .and_then(simd_json::prelude::ValueAsScalar::as_u64)
1677 .unwrap_or(0)
1678 .to_string()
1679 .into();
1680
1681 let client_order_id: SmartString =
1682 json.get("c").and_then(|v| v.as_str()).unwrap_or("").into();
1683
1684 let status = json.get("X").and_then(|v| v.as_str()).unwrap_or("");
1685
1686 let executed_qty: SmartString =
1687 json.get("z").and_then(|v| v.as_str()).unwrap_or("0").into();
1688
1689 let original_qty: SmartString =
1690 json.get("q").and_then(|v| v.as_str()).unwrap_or("0").into();
1691
1692 let price: SmartString = json.get("p").and_then(|v| v.as_str()).unwrap_or("0").into();
1693
1694 let transaction_time = json
1695 .get("T")
1696 .and_then(simd_json::prelude::ValueAsScalar::as_u64)
1697 .unwrap_or(0);
1698
1699 let report = ExecutionReport {
1700 id: id_generation::generate_ws_report_id("exec", &client_order_id),
1701 order_id: client_order_id,
1702 exchange_timestamp: transaction_time * 1_000_000, system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
1704 instrument_id: InstrumentId {
1705 symbol,
1706 venue: Venue::Binance,
1707 },
1708 status: Self::map_order_status(status),
1709 filled_quantity: Decimal::from_str_exact(&executed_qty).unwrap_or(Decimal::ZERO),
1710 remaining_quantity: {
1711 let original = Decimal::from_str_exact(&original_qty).unwrap_or(Decimal::ZERO);
1712 let executed = Decimal::from_str_exact(&executed_qty).unwrap_or(Decimal::ZERO);
1713 original - executed
1714 },
1715 execution_price: Decimal::from_str_exact(&price).ok(),
1716 reject_reason: None,
1717 exchange_execution_id: Some(order_id),
1718 is_final: matches!(
1719 Self::map_order_status(status),
1720 OrderStatus::Filled
1721 | OrderStatus::Cancelled
1722 | OrderStatus::Rejected
1723 | OrderStatus::Expired
1724 ),
1725 };
1726
1727 match tokio::time::timeout(
1729 Duration::from_millis(100), report_tx.send_async(report),
1731 )
1732 .await
1733 {
1734 Ok(Ok(())) => debug!("Execution report sent successfully"),
1735 Ok(Err(e)) => {
1736 error!("Failed to send execution report: {e}");
1737 }
1739 Err(_) => {
1740 warn!("Channel backpressure detected - execution report send timed out");
1741 }
1743 }
1744 }
1745
1746 pub async fn handle_account_update(
1748 json: &JsonValue,
1749 position_manager: &Arc<dyn PositionManager>,
1750 ) {
1751 let event_time = json
1753 .get("E")
1754 .and_then(simd_json::prelude::ValueAsScalar::as_u64)
1755 .unwrap_or(0);
1756 let transaction_time = json
1757 .get("T")
1758 .and_then(simd_json::prelude::ValueAsScalar::as_u64)
1759 .unwrap_or(0);
1760
1761 if let Some(positions) = json
1763 .get("a")
1764 .and_then(|a| a.get("P"))
1765 .and_then(|p| p.as_array())
1766 {
1767 for position_json in positions {
1768 if let Some(position_update) =
1769 Self::parse_position_update(position_json, event_time, transaction_time)
1770 {
1771 info!(
1773 "Position update: {} {} {} @ {}",
1774 position_update.symbol,
1775 position_update.side,
1776 position_update.amount,
1777 position_update.entry_price
1778 );
1779
1780 if let Err(e) = position_manager
1782 .update_position(position_update.clone())
1783 .await
1784 {
1785 error!("Failed to update position in position manager: {e}");
1786 }
1787 }
1788 }
1789 }
1790
1791 if let Some(balances) = json
1793 .get("a")
1794 .and_then(|a| a.get("B"))
1795 .and_then(|b| b.as_array())
1796 {
1797 for balance_json in balances {
1798 if let Some(asset) = balance_json.get("a").and_then(|v| v.as_str())
1799 && let Some(wallet_balance) = balance_json.get("wb").and_then(|v| v.as_str())
1800 && let Some(cross_wallet_balance) =
1801 balance_json.get("cw").and_then(|v| v.as_str())
1802 {
1803 debug!(
1804 "Balance update: {asset} wallet={wallet_balance} cross_wallet={cross_wallet_balance}"
1805 );
1806 }
1807 }
1808 }
1809 }
1810
1811 fn generate_stable_position_id(
1814 symbol: &str,
1815 side: PositionSide,
1816 venue: Venue,
1817 ) -> rusty_model::types::PositionId {
1818 use std::collections::hash_map::DefaultHasher;
1822 use std::hash::{Hash, Hasher};
1823
1824 let mut hasher = DefaultHasher::new();
1826
1827 "v1-position".hash(&mut hasher);
1829 symbol.hash(&mut hasher);
1830 (side as u8).hash(&mut hasher);
1831 (venue as u8).hash(&mut hasher);
1832
1833 let hash = hasher.finish();
1834
1835 let mut uuid_bytes = [0u8; 16];
1838
1839 uuid_bytes[0..8].copy_from_slice(&hash.to_le_bytes());
1841
1842 let mixed = hash.rotate_left(32) ^ hash;
1844 uuid_bytes[8..16].copy_from_slice(&mixed.to_le_bytes());
1845
1846 uuid_bytes[6] = (uuid_bytes[6] & 0x0f) | 0x40; uuid_bytes[8] = (uuid_bytes[8] & 0x3f) | 0x80; let stable_uuid = Uuid::from_bytes(uuid_bytes);
1851 rusty_model::types::PositionId::from_uuid(stable_uuid)
1852 }
1853
1854 #[must_use]
1856 pub fn parse_position_update(
1857 position_json: &JsonValue,
1858 event_time: u64,
1859 _transaction_time: u64,
1860 ) -> Option<PositionUpdate> {
1861 let symbol = position_json.get("s").and_then(|v| v.as_str())?;
1863 let position_amount_str = position_json.get("pa").and_then(|v| v.as_str())?;
1864 let entry_price_str = position_json.get("ep").and_then(|v| v.as_str())?;
1865 let breakeven_price_str = position_json.get("bep").and_then(|v| v.as_str())?;
1866 let unrealized_pnl_str = position_json.get("up").and_then(|v| v.as_str())?;
1867 let realized_pnl_str = position_json.get("cr").and_then(|v| v.as_str())?;
1868 let margin_type_str = position_json.get("mt").and_then(|v| v.as_str())?;
1869 let isolated_wallet_str = position_json.get("iw").and_then(|v| v.as_str())?;
1870 let position_side_str = position_json.get("ps").and_then(|v| v.as_str())?;
1871
1872 let amount = Decimal::from_str_exact(position_amount_str).ok()?;
1874 let entry_price = Decimal::from_str_exact(entry_price_str).ok()?;
1875 let breakeven_price = Decimal::from_str_exact(breakeven_price_str).ok()?;
1876 let unrealized_pnl = Decimal::from_str_exact(unrealized_pnl_str).ok()?;
1877 let realized_pnl = Decimal::from_str_exact(realized_pnl_str).ok()?;
1878 let isolated_wallet = Decimal::from_str_exact(isolated_wallet_str).ok()?;
1879
1880 let position_side = position_side_str.parse::<PositionSide>().ok()?;
1882 let margin_type = margin_type_str.parse::<MarginType>().ok()?;
1883
1884 if amount.is_zero() {
1886 debug!("Skipping closed position for {symbol}");
1887 return None;
1888 }
1889
1890 let position_id = Self::generate_stable_position_id(symbol, position_side, Venue::Binance);
1892
1893 Some(PositionUpdate {
1894 position_id,
1895 venue: Venue::Binance,
1896 symbol: SmartString::from(symbol),
1897 side: position_side,
1898 amount,
1899 entry_price,
1900 breakeven_price,
1901 unrealized_pnl,
1902 realized_pnl,
1903 margin_type,
1904 isolated_wallet,
1905 timestamp_ns: event_time * 1_000_000, })
1907 }
1908
1909 fn map_order_status(status: &str) -> OrderStatus {
1911 match status {
1912 "NEW" => OrderStatus::New,
1913 "PARTIALLY_FILLED" => OrderStatus::PartiallyFilled,
1914 "FILLED" => OrderStatus::Filled,
1915 "CANCELED" | "CANCELLED" => OrderStatus::Cancelled,
1916 "REJECTED" => OrderStatus::Rejected,
1917 "EXPIRED" => OrderStatus::Expired,
1918 "PENDING_CANCEL" => OrderStatus::Pending,
1919 "PENDING_NEW" => OrderStatus::Pending,
1920 _ => OrderStatus::Unknown,
1921 }
1922 }
1923
1924 pub async fn place_batch_orders(
1931 &self,
1932 orders: Vec<Order>,
1933 report_tx: Sender<ExecutionReport>,
1934 ) -> Result<()> {
1935 if orders.is_empty() {
1936 return Ok(());
1937 }
1938
1939 if orders.len() > MAX_BATCH_SIZE {
1940 bail!("Batch size exceeds maximum of {} orders", MAX_BATCH_SIZE);
1941 }
1942
1943 self.check_and_record_rate_limit(orders.len())?;
1945
1946 let batch_result = self
1948 .place_batch_orders_concurrent(orders, report_tx)
1949 .await?;
1950
1951 if batch_result.has_failures() {
1953 let failures_by_type = batch_result.get_failures_by_error_type();
1955 for (error_type, failed_orders) in failures_by_type {
1956 warn!(
1957 "{} orders failed due to {}: {:?}",
1958 failed_orders.len(),
1959 error_type,
1960 failed_orders.iter().map(|o| &o.id).collect::<Vec<_>>()
1961 );
1962 }
1963
1964 if batch_result.status == BatchStatus::AllFailed {
1966 bail!(
1967 "All {} orders in batch failed (success rate: {:.1}%)",
1968 batch_result.summary.total_orders,
1969 batch_result.success_rate()
1970 );
1971 }
1972 }
1973
1974 info!(
1975 "Batch order placement completed: {} succeeded, {} failed (success rate: {:.1}%)",
1976 batch_result.summary.successful_orders,
1977 batch_result.summary.failed_orders,
1978 batch_result.success_rate()
1979 );
1980
1981 Ok(())
1982 }
1983
1984 async fn place_batch_orders_concurrent(
1986 &self,
1987 orders: Vec<Order>,
1988 report_tx: Sender<ExecutionReport>,
1989 ) -> Result<BatchResult<()>> {
1990 use crate::error::EMSError;
1991 use futures::future::join_all;
1992
1993 let start_time = self.clock.raw();
1994 let total_orders = orders.len();
1995
1996 let order_tasks: Vec<_> = orders
1998 .into_iter()
1999 .map(|order| {
2000 let report_tx = report_tx.clone();
2001 let client_order_id = order.id;
2002 let order_clone = order.clone();
2003 async move {
2004 let result = self.place_order_internal(order_clone, report_tx).await;
2005 (client_order_id, order, result)
2006 }
2007 })
2008 .collect();
2009
2010 let results = join_all(order_tasks).await;
2012
2013 let mut order_results: OrderResultMap<()> = FxHashMap::default();
2015 let mut success_count = 0;
2016 let mut failed_count = 0;
2017
2018 for (client_order_id, order, result) in results {
2019 match result {
2020 Ok(()) => {
2021 order_results
2022 .insert(client_order_id.to_string().into(), OrderResult::success(()));
2023 success_count += 1;
2024 debug!("Order {client_order_id} placed successfully");
2025 }
2026 Err(e) => {
2027 let ems_error = if let Some(ems_err) = e.downcast_ref::<EMSError>() {
2029 ems_err.clone()
2030 } else {
2031 EMSError::order_submission(e.to_string())
2032 };
2033
2034 order_results.insert(
2035 client_order_id.to_string().into(),
2036 OrderResult::failed(ems_error, order),
2037 );
2038 failed_count += 1;
2039 warn!("Order {client_order_id} failed: {e}");
2040 }
2041 }
2042 }
2043
2044 let processing_time_ns = self.clock.raw() - start_time;
2045
2046 let batch_result = if failed_count == 0 {
2048 BatchResult::success(order_results, processing_time_ns)
2050 } else if success_count > 0 {
2051 BatchResult::partial_success(order_results, processing_time_ns)
2053 } else {
2054 BatchResult::all_failed(order_results, processing_time_ns)
2056 };
2057
2058 match batch_result.status {
2060 BatchStatus::AllSucceeded => {
2061 info!("All {total_orders} orders placed successfully");
2062 }
2063 BatchStatus::PartialSuccess => {
2064 warn!("Batch partially succeeded: {success_count}/{total_orders} orders placed");
2065 }
2066 BatchStatus::AllFailed => {
2067 error!("All {total_orders} orders in batch failed");
2068 }
2069 BatchStatus::TransportFailure => {
2070 error!("Unexpected transport failure status");
2072 }
2073 }
2074
2075 Ok(batch_result)
2076 }
2077
2078 fn check_and_record_rate_limit(&self, num_orders: usize) -> Result<()> {
2080 let mut rate_limiter = self.order_rate_limiter.write();
2081 if !rate_limiter.can_place_orders(num_orders) {
2082 let current_count = rate_limiter.current_order_count();
2083 if num_orders == 1 {
2084 bail!(
2085 "Rate limit would be exceeded: {} orders in window, limit is {}",
2086 current_count,
2087 MAX_ORDERS_PER_10_SECONDS
2088 );
2089 }
2090 bail!(
2091 "Rate limit would be exceeded: {} orders in window, {} requested, limit is {}",
2092 current_count,
2093 num_orders,
2094 MAX_ORDERS_PER_10_SECONDS
2095 );
2096 }
2097 rate_limiter.record_orders(num_orders);
2098 Ok(())
2099 }
2100
2101 pub async fn place_order(
2103 &self,
2104 order: Order,
2105 report_tx: Sender<ExecutionReport>,
2106 ) -> Result<()> {
2107 self.check_and_record_rate_limit(1)?;
2109
2110 self.place_order_internal(order, report_tx).await
2111 }
2112
2113 async fn place_order_internal(
2115 &self,
2116 order: Order,
2117 report_tx: Sender<ExecutionReport>,
2118 ) -> Result<()> {
2119 let request_id = self.request_id_gen.next_id();
2120 let timestamp = self.clock.raw();
2121
2122 let mut params = Object::with_capacity(8);
2124 params.insert("symbol".into(), order.symbol.as_str().into());
2125 params.insert("side".into(), Self::map_order_side(order.side).into());
2126 params.insert("type".into(), Self::map_order_type(order.order_type).into());
2127 params.insert("quantity".into(), order.quantity.to_string().into());
2128 params.insert("newClientOrderId".into(), order.id.to_string().into());
2129 params.insert("timestamp".into(), (timestamp / 1_000_000).into());
2130
2131 if let Some(price) = order.price {
2133 params.insert("price".into(), price.to_string().into());
2134 }
2135
2136 let tif = Self::get_time_in_force(order.order_type);
2138 params.insert("timeInForce".into(), Self::map_time_in_force(tif).into());
2139
2140 let request = simd_json::json!({
2142 "id": request_id.as_json_value(),
2143 "method": "order.place",
2144 "params": JsonValue::from(params)
2145 });
2146
2147 {
2149 let mut pending = self.pending_requests.write();
2150 let key = request_id.to_lookup_key();
2151 pending.insert(
2152 key,
2153 PendingRequest {
2154 method: "order.place".into(),
2155 timestamp,
2156 report_tx: Some(report_tx),
2157 request_id: request_id.clone(),
2158 },
2159 );
2160 }
2161
2162 self.send_request(request).await?;
2164
2165 Ok(())
2166 }
2167
2168 #[must_use]
2170 pub fn get_rate_limit_status(&self) -> (usize, usize) {
2171 let mut rate_limiter = self.order_rate_limiter.write();
2172 let current = rate_limiter.current_order_count();
2173 (current, MAX_ORDERS_PER_10_SECONDS as usize)
2174 }
2175
2176 pub async fn cancel_order(
2178 &self,
2179 order_id: SmartString,
2180 report_tx: Sender<ExecutionReport>,
2181 ) -> Result<()> {
2182 let request_id = self.request_id_gen.next_id();
2183 let timestamp = self.clock.raw();
2184
2185 let request = simd_json::json!({
2186 "id": request_id.as_json_value(),
2187 "method": "order.cancel",
2188 "params": {
2189 "origClientOrderId": order_id,
2190 "timestamp": timestamp / 1_000_000 }
2192 });
2193
2194 {
2196 let mut pending = self.pending_requests.write();
2197 let key = request_id.to_lookup_key();
2198 pending.insert(
2199 key,
2200 PendingRequest {
2201 method: "order.cancel".into(),
2202 timestamp,
2203 report_tx: Some(report_tx),
2204 request_id: request_id.clone(),
2205 },
2206 );
2207 }
2208
2209 self.send_request(request).await?;
2210
2211 Ok(())
2212 }
2213
2214 pub async fn modify_order(
2216 &self,
2217 params: ModifyOrderParams,
2218 report_tx: Sender<ExecutionReport>,
2219 ) -> Result<()> {
2220 let request_id = self.request_id_gen.next_id();
2221 let timestamp = self.clock.raw();
2222
2223 let mut request = simd_json::json!({
2224 "id": request_id.as_json_value(),
2225 "method": "order.cancelReplace",
2226 "params": {
2227 "cancelOrigClientOrderId": params.order_id,
2228 "symbol": params.symbol,
2229 "side": Self::map_order_side(params.side),
2230 "type": Self::map_order_type(params.order_type),
2231 "quantity": params.new_quantity.to_string(),
2232 "timeInForce": Self::map_time_in_force(Self::get_time_in_force(params.order_type)),
2233 "timestamp": timestamp / 1_000_000 }
2235 });
2236
2237 if let Some(price) = params.new_price {
2238 request["params"]["price"] = price.to_string().into();
2239 }
2240
2241 {
2243 let mut pending = self.pending_requests.write();
2244 let key = request_id.to_lookup_key();
2245 pending.insert(
2246 key,
2247 PendingRequest {
2248 method: "order.cancelReplace".into(),
2249 timestamp,
2250 report_tx: Some(report_tx),
2251 request_id: request_id.clone(),
2252 },
2253 );
2254 }
2255
2256 self.send_request(request).await?;
2257
2258 Ok(())
2259 }
2260
2261 async fn send_request(&self, request: JsonValue) -> Result<()> {
2263 let request_str = simd_json::to_string(&request)
2264 .map_err(|e| anyhow!("Failed to serialize JSON request: {}", e))?;
2265 debug!("Sending request: {request_str}");
2266 let request_object = Message::Text(request_str.into()).to_frame_view();
2267
2268 if let Some(ref mut ws_sink) = self.ws_sink.write().await.as_mut() {
2269 match ws_sink.send(request_object).await {
2270 Ok(()) => Ok(()),
2271 Err(e) => {
2272 let error_msg = format!("Failed to send WebSocket request: {e}");
2274 self.kill_socket(&error_msg).await;
2275 Err(anyhow!(error_msg))
2276 }
2277 }
2278 } else {
2279 self.kill_socket("WebSocket not connected").await;
2280 Err(anyhow!("WebSocket not connected"))
2281 }
2282 }
2283
2284 const fn map_order_type(order_type: OrderType) -> &'static str {
2286 match order_type {
2287 OrderType::Market => "MARKET",
2288 OrderType::Limit => "LIMIT",
2289 OrderType::Stop => "STOP_LOSS",
2290 OrderType::StopLimit => "STOP_LOSS_LIMIT",
2291 OrderType::FillOrKill => "LIMIT",
2292 OrderType::ImmediateOrCancel => "LIMIT",
2293 OrderType::PostOnly => "LIMIT",
2294 }
2295 }
2296
2297 const fn map_order_side(side: OrderSide) -> &'static str {
2299 match side {
2300 OrderSide::Buy => "BUY",
2301 OrderSide::Sell => "SELL",
2302 }
2303 }
2304
2305 const fn map_time_in_force(tif: TimeInForce) -> &'static str {
2307 match tif {
2308 TimeInForce::GTC => "GTC",
2309 TimeInForce::IOC => "IOC",
2310 TimeInForce::FOK => "FOK",
2311 _ => "GTC",
2312 }
2313 }
2314
2315 const fn get_time_in_force(order_type: OrderType) -> TimeInForce {
2317 match order_type {
2318 OrderType::Market => TimeInForce::IOC,
2319 OrderType::Limit => TimeInForce::GTC,
2320 OrderType::Stop => TimeInForce::GTC,
2321 OrderType::StopLimit => TimeInForce::GTC,
2322 OrderType::FillOrKill => TimeInForce::FOK,
2323 OrderType::ImmediateOrCancel => TimeInForce::IOC,
2324 OrderType::PostOnly => TimeInForce::GTC,
2325 }
2326 }
2327
2328 pub async fn send_ping(&self) -> Result<()> {
2330 if !self.is_connected() {
2332 return Err(anyhow!("WebSocket not connected - cannot send JSON ping"));
2333 }
2334
2335 let request_id = self.request_id_gen.next_id();
2336 let timestamp = self.clock.raw();
2337
2338 let request = simd_json::json!({
2339 "id": request_id.as_json_value(),
2340 "method": "ping",
2341 "params": {}
2342 });
2343
2344 let key = request_id.to_lookup_key();
2346 {
2347 let mut pending = self.pending_requests.write();
2348 pending.insert(
2349 key.clone(),
2350 PendingRequest {
2351 method: "ping".into(),
2352 timestamp,
2353 report_tx: None, request_id: request_id.clone(),
2355 },
2356 );
2357 }
2358
2359 let display_id = match &request_id {
2360 RequestId::Sequential(id) => id.to_string(),
2361 RequestId::Uuid(id) => id.to_string(),
2362 };
2363 debug!("📤 Sending JSON ping request (id: {display_id})");
2364
2365 match self.send_request(request).await {
2367 Ok(()) => {
2368 debug!("✅ JSON ping #{display_id} sent successfully");
2369 Ok(())
2370 }
2371 Err(e) => {
2372 {
2374 let mut pending = self.pending_requests.write();
2375 pending.remove(&key);
2376 }
2377
2378 if e.to_string().contains("Broken pipe") {
2380 warn!("🔌 WebSocket connection broken, marking as disconnected");
2381 }
2382
2383 Err(e)
2384 }
2385 }
2386 }
2387
2388 #[must_use]
2390 pub fn is_connected(&self) -> bool {
2391 let state = self.get_state();
2392 matches!(state, ConnectionState::Authenticated)
2393 }
2394
    /// Tears the connection down and resets all connection-related state.
    ///
    /// Steps, in order: mark the state as `Disconnecting`, raise the reconnection
    /// monitor's stop flag, take and cancel the background task handles, close the
    /// sink, drop the stream, clear pending requests, zero the ping/pong/auth
    /// timestamps and finally set the state to `Disconnected`. Failures and
    /// timeouts along the way are logged and do not abort the teardown.
    ///
    /// `reason` is only used for the first log line.
    async fn kill_socket(&self, reason: &str) {
        warn!("🗡️ KILL-STEP 1/9: Killing WebSocket connection: {reason}");

        self.set_state(ConnectionState::Disconnecting);
        info!("🗡️ KILL-STEP 2/9: State set to Disconnecting");

        info!("🗡️ KILL-STEP 3/9: Signaling reconnection monitor to stop...");
        // NOTE(review): this flag is not cleared anywhere in this function —
        // presumably the connect path resets it; confirm.
        self.reconnection_control
            .should_stop
            .store(true, Ordering::Release);

        info!("🗡️ KILL-STEP 4/9: Cancelling background tasks...");
        // Move the handles out under the lock so the guard is released before
        // any of them is aborted or awaited.
        let handles = {
            let mut handles = self.task_handles.write().await;
            TaskHandles {
                response_handler: handles.response_handler.take(),
                ping_handler: handles.ping_handler.take(),
                reconnection_monitor: handles.reconnection_monitor.take(),
            }
        };

        if let Some(handle) = handles.response_handler {
            handle.abort();
            debug!("Response handler task aborted");
        }
        if let Some(handle) = handles.ping_handler {
            handle.abort();
            debug!("Ping handler task aborted");
        }

        // The monitor is not aborted here: it gets up to 100 ms to finish on its
        // own after the stop flag was raised above.
        if let Some(handle) = handles.reconnection_monitor {
            match tokio::time::timeout(Duration::from_millis(100), handle).await {
                Ok(_) => debug!("Reconnection monitor exited gracefully"),
                Err(_) => {
                    debug!("Reconnection monitor didn't exit in time, continuing anyway");
                }
            }
        }

        info!("🗡️ KILL-STEP 5/9: Tasks cancelled");

        info!("🗡️ KILL-STEP 6/9: Closing WebSocket sink...");
        // The 2 s timeout covers only `close()`; acquiring the sink's write lock
        // is not bounded.
        if let Some(mut ws_sink) = self.ws_sink.write().await.take() {
            match tokio::time::timeout(Duration::from_secs(2), ws_sink.close()).await {
                Ok(Ok(())) => {
                    info!("🗡️ KILL-STEP 6/9: WebSocket sink closed successfully");
                }
                Ok(Err(e)) => {
                    warn!("🗡️ KILL-STEP 6/9: WebSocket sink close failed: {e}");
                }
                Err(_) => {
                    warn!("🗡️ KILL-STEP 6/9: WebSocket sink close timed out - continuing anyway");
                }
            }
        } else {
            info!("🗡️ KILL-STEP 6/9: No WebSocket sink to close");
        }

        info!("🗡️ KILL-STEP 7/9: Cleaning up WebSocket stream...");
        // Here the timeout also bounds the wait for the stream's write lock.
        match tokio::time::timeout(Duration::from_secs(1), async {
            *self.ws_stream.write().await = None;
        })
        .await
        {
            Ok(()) => {
                info!("🗡️ KILL-STEP 7/9: WebSocket stream cleaned up successfully");
            }
            Err(_) => {
                warn!("🗡️ KILL-STEP 7/9: WebSocket stream cleanup timed out - continuing anyway");
            }
        }

        info!("🗡️ KILL-STEP 8/9: Clearing pending requests...");
        // The async block contains no `.await`, so it completes on its first
        // poll; the surrounding timeout cannot interrupt the lock acquisition.
        match tokio::time::timeout(Duration::from_secs(1), async {
            let mut pending = self.pending_requests.write();
            if pending.len() > MAX_PENDING_REQUESTS {
                warn!(
                    "Detected {} pending requests - possible memory leak",
                    pending.len()
                );
            }
            pending.clear();
        })
        .await
        {
            Ok(()) => {
                info!("🗡️ KILL-STEP 8/9: Pending requests cleared successfully");
            }
            Err(_) => {
                warn!("🗡️ KILL-STEP 8/9: Pending requests cleanup timed out - continuing anyway");
            }
        }

        info!("🗡️ KILL-STEP 9/9: Resetting state...");
        self.last_ping_time.store(0, Ordering::Release);
        self.last_pong_time.store(0, Ordering::Release);
        self.auth_completed_time.store(0, Ordering::Release);
        self.set_state(ConnectionState::Disconnected);

        info!(
            "🗡️ ✅ WebSocket connection killed and resources cleaned up - ready for reconnection"
        );
    }
2512
2513 pub async fn reconnect(&self, report_tx: Sender<ExecutionReport>) -> Result<()> {
2516 info!("External reconnection requested...");
2517
2518 self.kill_socket("External reconnection requested").await;
2520
2521 tokio::time::sleep(Duration::from_millis(100)).await;
2523
2524 self.connect(report_tx).await
2526 }
2527
2528 pub async fn disconnect(&self) -> Result<()> {
2530 info!("Gracefully disconnecting from Binance WebSocket...");
2531
2532 self.kill_socket("Graceful disconnect requested").await;
2534
2535 if let Some(handle) = self.task_handles.write().await.reconnection_monitor.take() {
2537 handle.abort();
2538 info!("Reconnection monitor cancelled");
2539 }
2540
2541 info!("Disconnected from Binance WebSocket");
2542 Ok(())
2543 }
2544}
2545
2546#[async_trait]
2547impl crate::execution_engine::Exchange for BinanceWebSocketTrader {
2548 fn venue(&self) -> Venue {
2549 Venue::Binance
2550 }
2551
2552 async fn place_order(&self, order: Order, report_tx: Sender<ExecutionReport>) -> Result<()> {
2553 self.check_and_record_rate_limit(1)?;
2555
2556 self.place_order_internal(order, report_tx).await
2557 }
2558
2559 async fn cancel_order(
2560 &self,
2561 order_id: SmartString,
2562 report_tx: Sender<ExecutionReport>,
2563 ) -> Result<()> {
2564 let request_id = self.request_id_gen.next_id();
2565 let timestamp = self.clock.raw();
2566
2567 let mut params = Object::with_capacity(2);
2568 params.insert("origClientOrderId".into(), order_id.as_str().into());
2569 params.insert("timestamp".into(), (timestamp / 1_000_000).into());
2570
2571 let request = simd_json::json!({
2573 "id": request_id.as_json_value(),
2574 "method": "order.cancel",
2575 "params": JsonValue::from(params)
2576 });
2577
2578 {
2580 let mut pending = self.pending_requests.write();
2581 let key = request_id.to_lookup_key();
2582 pending.insert(
2583 key,
2584 PendingRequest {
2585 method: "order.cancel".into(),
2586 timestamp,
2587 report_tx: Some(report_tx),
2588 request_id: request_id.clone(),
2589 },
2590 );
2591 }
2592
2593 self.send_request(request).await
2594 }
2595
2596 async fn modify_order(
2597 &self,
2598 order_id: SmartString,
2599 new_price: Option<Decimal>,
2600 new_quantity: Option<Decimal>,
2601 report_tx: Sender<ExecutionReport>,
2602 ) -> Result<()> {
2603 let request_id = self.request_id_gen.next_id();
2604 let timestamp = self.clock.raw();
2605
2606 let mut params = Object::with_capacity(4);
2607 params.insert("origClientOrderId".into(), order_id.as_str().into());
2608 params.insert("timestamp".into(), (timestamp / 1_000_000).into());
2609
2610 if let Some(price) = new_price {
2611 params.insert("price".into(), price.to_string().into());
2612 }
2613 if let Some(quantity) = new_quantity {
2614 params.insert("quantity".into(), quantity.to_string().into());
2615 }
2616
2617 let request = simd_json::json!({
2619 "id": request_id.as_json_value(),
2620 "method": "order.cancelReplace",
2621 "params": JsonValue::from(params)
2622 });
2623
2624 {
2626 let mut pending = self.pending_requests.write();
2627 let key = request_id.to_lookup_key();
2628 pending.insert(
2629 key,
2630 PendingRequest {
2631 method: "order.cancelReplace".into(),
2632 timestamp,
2633 report_tx: Some(report_tx),
2634 request_id: request_id.clone(),
2635 },
2636 );
2637 }
2638
2639 self.send_request(request).await
2640 }
2641
2642 async fn cancel_all_orders(
2643 &self,
2644 instrument_id: Option<InstrumentId>,
2645 report_tx: Sender<ExecutionReport>,
2646 ) -> Result<()> {
2647 let request_id = self.request_id_gen.next_id();
2648 let timestamp = self.clock.raw();
2649
2650 let instrument = instrument_id.ok_or_else(|| {
2652 anyhow!("Binance openOrders.cancelAll requires a symbol parameter - instrument_id cannot be None")
2653 })?;
2654
2655 let mut params = Object::with_capacity(2);
2656 params.insert("timestamp".into(), (timestamp / 1_000_000).into());
2657 params.insert("symbol".into(), instrument.symbol.as_str().into());
2658
2659 let request = simd_json::json!({
2661 "id": request_id.as_json_value(),
2662 "method": "openOrders.cancelAll",
2663 "params": JsonValue::from(params)
2664 });
2665
2666 {
2668 let mut pending = self.pending_requests.write();
2669 let key = request_id.to_lookup_key();
2670 pending.insert(
2671 key,
2672 PendingRequest {
2673 method: "openOrders.cancelAll".into(),
2674 timestamp,
2675 report_tx: Some(report_tx),
2676 request_id: request_id.clone(),
2677 },
2678 );
2679 }
2680
2681 self.send_request(request).await
2682 }
2683
2684 async fn get_order_status(&self, order_id: &str) -> Result<OrderStatus> {
2685 Err(anyhow!(
2689 "get_order_status not supported via WebSocket for Binance"
2690 ))
2691 }
2692
2693 async fn connect(&self, report_sender: Sender<ExecutionReport>) -> Result<()> {
2694 self.connect_without_monitor(report_sender).await
2695 }
2696
2697 async fn disconnect(&self) -> Result<()> {
2698 self.disconnect().await
2699 }
2700
2701 async fn is_connected(&self) -> bool {
2702 self.get_state() == ConnectionState::Authenticated
2703 }
2704
2705 async fn get_instruments(&self) -> Result<SmallVec<[InstrumentId; 32]>> {
2706 Ok(SmallVec::new())
2709 }
2710
2711 async fn send_fix_message(&self, message: Vec<u8>) -> Result<()> {
2712 log::warn!(
2713 "BinanceWebSocketTrader: send_fix_message not implemented. Message: {:?}",
2714 message
2715 );
2716 Err(anyhow!(
2717 "FIX message sending not implemented for Binance WebSocket"
2718 ))
2719 }
2720
2721 async fn receive_fix_message(&self) -> Result<Vec<u8>> {
2722 log::warn!("BinanceWebSocketTrader: receive_fix_message not implemented.");
2723 Err(anyhow!(
2724 "FIX message receiving not implemented for Binance WebSocket"
2725 ))
2726 }
2727}
2728
#[cfg(test)]
mod tests {
    use super::*;
    use crate::position_manager::{DefaultPositionManager, PositionManager};
    use base64::{Engine as _, engine::general_purpose::STANDARD as BASE64};

    /// Builds a default position manager for constructing traders in tests.
    fn create_mock_position_manager() -> Arc<dyn PositionManager> {
        Arc::new(DefaultPositionManager::new(None))
    }

    /// Builds HMAC credentials from fixed dummy key material.
    fn hmac_auth() -> Arc<BinanceAuth> {
        Arc::new(BinanceAuth::new_hmac(
            "test_key".into(),
            "test_secret".into(),
        ))
    }

    /// A new trader is not connected and hands out sequential request ids from 1.
    #[tokio::test]
    async fn test_websocket_trader_creation() {
        let api_key = SmartString::from("test_api_key");
        let private_key = BASE64.encode([1u8; 32]);
        let auth = Arc::new(BinanceAuth::new_ed25519(api_key, private_key.into()).unwrap());
        let trader = BinanceWebSocketTrader::new(auth, create_mock_position_manager());

        assert!(!trader.is_connected());

        let first_id = trader.request_id_gen.next_id();
        assert!(
            matches!(first_id, RequestId::Sequential(1)),
            "Expected sequential ID 1, got {first_id:?}"
        );

        let second_id = trader.request_id_gen.next_id();
        assert!(
            matches!(second_id, RequestId::Sequential(2)),
            "Expected sequential ID 2, got {second_id:?}"
        );
    }

    /// Position ids are deterministic and differ by symbol and by side.
    #[test]
    fn test_stable_position_id_generation() {
        let symbol = "BTCUSDT";
        let side = PositionSide::Long;
        let venue = Venue::Binance;

        let id1 = BinanceWebSocketTrader::generate_stable_position_id(symbol, side, venue);
        let id2 = BinanceWebSocketTrader::generate_stable_position_id(symbol, side, venue);

        assert_eq!(
            id1, id2,
            "Same position characteristics should generate same ID"
        );

        let different_symbol_id =
            BinanceWebSocketTrader::generate_stable_position_id("ETHUSDT", side, venue);
        let different_side_id =
            BinanceWebSocketTrader::generate_stable_position_id(symbol, PositionSide::Short, venue);

        assert_ne!(
            id1, different_symbol_id,
            "Different symbols should generate different IDs"
        );
        assert_ne!(
            id1, different_side_id,
            "Different sides should generate different IDs"
        );

        let id3 = BinanceWebSocketTrader::generate_stable_position_id(symbol, side, venue);
        assert_eq!(id1, id3, "ID generation should be deterministic");
    }

    /// Order types and sides map to the expected upper-case wire strings.
    #[test]
    fn test_order_type_mapping() {
        assert_eq!(
            BinanceWebSocketTrader::map_order_type(OrderType::Market),
            "MARKET"
        );
        assert_eq!(
            BinanceWebSocketTrader::map_order_type(OrderType::Limit),
            "LIMIT"
        );
        assert_eq!(
            BinanceWebSocketTrader::map_order_side(OrderSide::Buy),
            "BUY"
        );
        assert_eq!(
            BinanceWebSocketTrader::map_order_side(OrderSide::Sell),
            "SELL"
        );
    }

    /// Wire status strings map to the matching `OrderStatus` variants.
    #[test]
    fn test_order_status_mapping() {
        assert_eq!(
            BinanceWebSocketTrader::map_order_status("NEW"),
            OrderStatus::New
        );
        assert_eq!(
            BinanceWebSocketTrader::map_order_status("FILLED"),
            OrderStatus::Filled
        );
        assert_eq!(
            BinanceWebSocketTrader::map_order_status("CANCELLED"),
            OrderStatus::Cancelled
        );
    }

    /// The ping handler can be started on a disconnected trader and aborted again.
    #[tokio::test]
    async fn test_ping_handler_lifecycle() {
        let trader = BinanceWebSocketTrader::new_testnet(hmac_auth(), create_mock_position_manager());

        assert_eq!(trader.get_state(), ConnectionState::Disconnected);
        assert_eq!(
            trader.metrics.websocket_pings_sent.load(Ordering::Relaxed),
            0
        );
        assert_eq!(
            trader
                .metrics
                .websocket_pongs_received
                .load(Ordering::Relaxed),
            0
        );

        let handle = trader
            .start_ping_handler()
            .await
            .expect("ping handler should start on a fresh trader");

        // Give the spawned task a moment to run before cancelling it.
        tokio::time::sleep(Duration::from_millis(100)).await;

        handle.abort();
    }

    /// Sending a ping without a connection fails with a "not connected" error.
    #[tokio::test]
    async fn test_json_ping_not_connected() {
        let trader = BinanceWebSocketTrader::new_testnet(hmac_auth(), create_mock_position_manager());

        let result = trader.send_ping().await;
        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("WebSocket not connected")
        );
    }

    /// All metric counters start at zero and can be incremented.
    #[test]
    fn test_connection_metrics() {
        let trader = BinanceWebSocketTrader::new_testnet(hmac_auth(), create_mock_position_manager());

        let metrics = &trader.metrics;
        assert_eq!(metrics.websocket_pings_sent.load(Ordering::Relaxed), 0);
        assert_eq!(metrics.websocket_pongs_received.load(Ordering::Relaxed), 0);
        assert_eq!(metrics.json_pings_sent.load(Ordering::Relaxed), 0);
        assert_eq!(metrics.json_pongs_received.load(Ordering::Relaxed), 0);
        assert_eq!(metrics.messages_sent.load(Ordering::Relaxed), 0);
        assert_eq!(metrics.messages_received.load(Ordering::Relaxed), 0);
        assert_eq!(metrics.reconnection_attempts.load(Ordering::Relaxed), 0);
        assert_eq!(metrics.successful_reconnections.load(Ordering::Relaxed), 0);
        assert_eq!(metrics.failed_reconnections.load(Ordering::Relaxed), 0);

        metrics.websocket_pings_sent.fetch_add(1, Ordering::Relaxed);
        assert_eq!(metrics.websocket_pings_sent.load(Ordering::Relaxed), 1);
    }

    /// Pins the ping/pong constants and checks the timeout is shorter than the interval.
    #[test]
    fn test_ping_pong_timeout_calculation() {
        assert_eq!(PING_INTERVAL_SECONDS, 30);
        assert_eq!(PONG_TIMEOUT_SECONDS, 10);

        const _: () = assert!(PONG_TIMEOUT_SECONDS < PING_INTERVAL_SECONDS);
    }

    /// Pins the batch constants and checks five full batches fit under the order limit.
    #[test]
    fn test_batch_size_constants() {
        assert_eq!(MAX_BATCH_SIZE, 50);
        assert_eq!(MAX_ORDERS_PER_10_SECONDS, 300);

        assert!(MAX_BATCH_SIZE <= MAX_ORDERS_PER_10_SECONDS as usize);

        assert!(MAX_BATCH_SIZE * 5 < MAX_ORDERS_PER_10_SECONDS as usize);
    }

    /// An empty batch is accepted.
    #[tokio::test]
    async fn test_batch_orders_empty_list() {
        let trader = BinanceWebSocketTrader::new(hmac_auth(), create_mock_position_manager());
        let (report_tx, _report_rx) = flume::bounded(100);

        let result = trader.place_batch_orders(vec![], report_tx).await;
        assert!(result.is_ok());
    }

    /// A batch of `MAX_BATCH_SIZE + 1` orders is rejected with an "exceeds maximum" error.
    #[tokio::test]
    async fn test_batch_orders_exceeds_limit() {
        let trader = BinanceWebSocketTrader::new(hmac_auth(), create_mock_position_manager());
        let (report_tx, _report_rx) = flume::bounded(100);

        // The inclusive range yields one order more than the batch limit.
        let orders = (0..=MAX_BATCH_SIZE)
            .map(|i| {
                Order::new(
                    Venue::Binance,
                    "BTCUSDT",
                    OrderSide::Buy,
                    OrderType::Limit,
                    Decimal::ONE,
                    Some(Decimal::from(50000)),
                    rusty_model::types::ClientId::new(format!("test_client_{i}")),
                )
            })
            .collect::<Vec<_>>();

        let result = trader.place_batch_orders(orders, report_tx).await;
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("exceeds maximum"));
    }

    /// A fresh trader reports zero usage; a standalone limiter fills up at the limit.
    #[test]
    fn test_rate_limiter_functionality() {
        let trader = BinanceWebSocketTrader::new(hmac_auth(), create_mock_position_manager());

        let (current, limit) = trader.get_rate_limit_status();
        assert_eq!(current, 0);
        assert_eq!(limit, MAX_ORDERS_PER_10_SECONDS as usize);

        let clock = Clock::new();
        let mut rate_limiter = OrderRateLimiter::new(clock);

        assert!(rate_limiter.can_place_orders(100));
        rate_limiter.record_orders(100);

        assert!(rate_limiter.can_place_orders(200));
        rate_limiter.record_orders(200);

        assert!(!rate_limiter.can_place_orders(1));

        let current = rate_limiter.current_order_count();
        assert_eq!(current, 300);
    }

    /// A new limiter starts empty and reads the same clock it was given.
    #[test]
    fn test_order_rate_limiter_new() {
        let clock = Clock::new();
        let rate_limiter = OrderRateLimiter::new(clock.clone());

        assert_eq!(rate_limiter.order_times.len(), 0);

        // The two reads happen at slightly different instants, so they may land in
        // adjacent buckets; exact equality would make this test flaky.
        let limiter_bucket = rate_limiter.clock.raw() / 1_000_000;
        let source_bucket = clock.raw() / 1_000_000;
        assert!(
            limiter_bucket.abs_diff(source_bucket) <= 1,
            "limiter clock bucket {limiter_bucket} should track source clock bucket {source_bucket}"
        );
    }

    /// An empty limiter accepts up to the limit and rejects one more than the limit.
    #[test]
    fn test_order_rate_limiter_can_place_orders_basic() {
        let clock = Clock::new();
        let mut rate_limiter = OrderRateLimiter::new(clock);

        assert!(rate_limiter.can_place_orders(1));
        assert!(rate_limiter.can_place_orders(10));
        assert!(rate_limiter.can_place_orders(MAX_ORDERS_PER_10_SECONDS as usize));

        assert!(!rate_limiter.can_place_orders(MAX_ORDERS_PER_10_SECONDS as usize + 1));
    }

    /// Two existing entries reduce the remaining capacity by exactly two.
    #[test]
    fn test_order_rate_limiter_can_place_orders_with_existing() {
        let clock = Clock::new();
        let mut rate_limiter = OrderRateLimiter::new(clock.clone());

        let now = clock.raw() / 1_000_000;
        rate_limiter.order_times.push_back(now);
        rate_limiter.order_times.push_back(now);

        assert!(rate_limiter.can_place_orders(1));
        assert!(rate_limiter.can_place_orders(MAX_ORDERS_PER_10_SECONDS as usize - 2));
        assert!(!rate_limiter.can_place_orders(MAX_ORDERS_PER_10_SECONDS as usize - 1));
    }

    /// `record_orders` appends one timestamp per order.
    #[test]
    fn test_order_rate_limiter_record_orders() {
        let clock = Clock::new();
        let mut rate_limiter = OrderRateLimiter::new(clock);

        rate_limiter.record_orders(1);
        assert_eq!(rate_limiter.order_times.len(), 1);

        rate_limiter.record_orders(5);
        assert_eq!(rate_limiter.order_times.len(), 6);

        // Index 0 and index 5 come from two separate `record_orders` calls, so their
        // timestamps may fall in adjacent buckets; allow that instead of demanding
        // exact equality.
        let first_time = rate_limiter.order_times[0];
        let last_time = rate_limiter.order_times[5];
        assert!(
            first_time.abs_diff(last_time) <= 1,
            "timestamps recorded back-to-back should be adjacent: {first_time} vs {last_time}"
        );
    }

    /// `get_current_usage` reports the recorded count together with the fixed limit.
    #[test]
    fn test_order_rate_limiter_get_current_usage() {
        let clock = Clock::new();
        let mut rate_limiter = OrderRateLimiter::new(clock);

        let (current, limit) = rate_limiter.get_current_usage();
        assert_eq!(current, 0);
        assert_eq!(limit, MAX_ORDERS_PER_10_SECONDS as usize);

        rate_limiter.record_orders(10);
        let (current, limit) = rate_limiter.get_current_usage();
        assert_eq!(current, 10);
        assert_eq!(limit, MAX_ORDERS_PER_10_SECONDS as usize);
    }

    /// `current_order_count` accumulates across `record_orders` calls.
    #[test]
    fn test_order_rate_limiter_current_order_count() {
        let clock = Clock::new();
        let mut rate_limiter = OrderRateLimiter::new(clock);

        assert_eq!(rate_limiter.current_order_count(), 0);

        rate_limiter.record_orders(7);
        assert_eq!(rate_limiter.current_order_count(), 7);

        rate_limiter.record_orders(3);
        assert_eq!(rate_limiter.current_order_count(), 10);
    }

    /// `can_place_orders` drops stale entries and keeps the current ones.
    ///
    /// Assumes the current clock reading lies more than one window past the seeded
    /// values 0 and 1000 - TODO confirm for clock sources that start near zero.
    #[test]
    fn test_rate_limiter_window_cleanup() {
        let clock = Clock::new();
        let mut rate_limiter = OrderRateLimiter::new(clock.clone());

        rate_limiter.order_times.push_back(0);
        rate_limiter.order_times.push_back(1000);

        let now = clock.raw() / 1_000_000;
        rate_limiter.order_times.push_back(now);
        rate_limiter.order_times.push_back(now);

        assert!(rate_limiter.can_place_orders(1));

        assert_eq!(rate_limiter.order_times.len(), 2);
    }

    /// Zero-sized requests are no-ops, and a limiter filled to the limit rejects more.
    #[test]
    fn test_order_rate_limiter_edge_cases() {
        let clock = Clock::new();
        let mut rate_limiter = OrderRateLimiter::new(clock);

        assert!(rate_limiter.can_place_orders(0));
        rate_limiter.record_orders(0);
        assert_eq!(rate_limiter.current_order_count(), 0);

        let max_orders = MAX_ORDERS_PER_10_SECONDS as usize;
        assert!(rate_limiter.can_place_orders(max_orders));
        rate_limiter.record_orders(max_orders);
        assert_eq!(rate_limiter.current_order_count(), max_orders);

        assert!(!rate_limiter.can_place_orders(1));
    }

    /// Entries placed inside the current window are all counted.
    #[test]
    fn test_order_rate_limiter_window_sliding() {
        let clock = Clock::new();
        let mut rate_limiter = OrderRateLimiter::new(clock.clone());

        let now = clock.raw() / 1_000_000;
        let window_start = now.saturating_sub(RATE_LIMIT_WINDOW_MS);

        rate_limiter.order_times.push_back(window_start + 1000);
        rate_limiter.order_times.push_back(window_start + 5000);
        rate_limiter.order_times.push_back(now);

        assert_eq!(rate_limiter.current_order_count(), 3);

        assert!(rate_limiter.can_place_orders(MAX_ORDERS_PER_10_SECONDS as usize - 3));
    }

    /// Every query method purges stale entries from `order_times`.
    ///
    /// Assumes the current clock reading lies more than one window past the seeded
    /// values - TODO confirm for clock sources that start near zero.
    #[test]
    fn test_order_rate_limiter_cleanup_all_methods() {
        let clock = Clock::new();
        let mut rate_limiter = OrderRateLimiter::new(clock);

        rate_limiter.order_times.push_back(0);
        rate_limiter.order_times.push_back(1000);
        rate_limiter.order_times.push_back(2000);

        assert!(rate_limiter.can_place_orders(1));
        assert_eq!(rate_limiter.order_times.len(), 0);

        rate_limiter.order_times.push_back(0);
        rate_limiter.order_times.push_back(1000);

        let count = rate_limiter.current_order_count();
        assert_eq!(count, 0);
        assert_eq!(rate_limiter.order_times.len(), 0);

        rate_limiter.order_times.push_back(0);
        rate_limiter.order_times.push_back(1000);

        let (current, _) = rate_limiter.get_current_usage();
        assert_eq!(current, 0);
        assert_eq!(rate_limiter.order_times.len(), 0);
    }

    /// A small batch on a never-connected trader fails with a batch-level error message.
    #[tokio::test]
    async fn test_concurrent_batch_error_handling() {
        let trader = BinanceWebSocketTrader::new(hmac_auth(), create_mock_position_manager());
        let (report_tx, _report_rx) = flume::bounded(100);

        let orders = (0..3)
            .map(|i| {
                Order::new(
                    Venue::Binance,
                    "BTCUSDT",
                    OrderSide::Buy,
                    OrderType::Limit,
                    Decimal::ONE,
                    Some(Decimal::from(50000)),
                    rusty_model::types::ClientId::new(format!("test_client_{i}")),
                )
            })
            .collect::<Vec<_>>();

        let result = trader.place_batch_orders(orders, report_tx).await;
        assert!(result.is_err());

        let error_msg = result.unwrap_err().to_string();
        assert!(error_msg.contains("batch") || error_msg.contains("All orders"));
    }

    /// The batch size is 50 and stays below half of the per-window order limit.
    #[test]
    fn test_batch_size_increase() {
        assert_eq!(MAX_BATCH_SIZE, 50);

        assert!(MAX_BATCH_SIZE < MAX_ORDERS_PER_10_SECONDS as usize / 2);
    }
}