rusty_ems/exchanges/
bybit_websocket_trading.rs

1//! Bybit V5 WebSocket Trading Implementation
2//!
3//! This module provides a high-performance WebSocket trading client for Bybit V5 exchange
4//! with robust connection management, automatic reconnection, and comprehensive health monitoring.
5//!
6//! # Features
7//!
8//! - **V5 unified authentication**: Support for all product types (spot, linear, inverse, option)
9//! - **Batch operations**: Create, cancel, and amend up to 20 orders in a single request
10//! - **Position updates**: Real-time position change notifications
11//! - **Connection health monitoring**: Real-time metrics and health status
12//! - **Automatic reconnection**: Configurable backoff strategy with exponential delays
13//! - **Zero-copy message processing**: Optimized for low-latency trading
14//!
15//! # Connection Management
16//!
17//! The client maintains connection health through:
18//! - WebSocket ping frames sent every 20 seconds (as recommended by Bybit)
19//! - JSON ping requests for API-level health checks
20//! - Automatic reconnection on connection loss
21//! - Configurable pong timeout (default: 10 seconds)
22//!
23//! # Batch Operations
24//!
25//! Supports Bybit V5 batch operations:
26//! - `order.create-batch`: Create up to 20 orders (10 for spot) in one request
27//! - `order.cancel-batch`: Cancel multiple orders by IDs
28//! - `order.amend-batch`: Modify multiple orders
29//!
30//! # Usage Example
31//!
32//! ```rust,no_run
33//! use rusty_common::auth::exchanges::bybit::BybitAuth;
34//! use rusty_ems::exchanges::bybit_websocket_trading::BybitWebSocketTrader;
35//! use std::sync::Arc;
36//!
37//! #[tokio::main]
38//! async fn main() -> Result<()> {
39//!     let auth = Arc::new(BybitAuth::new(
40//!         "api_key".into(),
41//!         "secret_key".into()
42//!     ));
43//!
44//!     let trader = BybitWebSocketTrader::new(auth, false); // false = mainnet
45//!     let (report_tx, report_rx) = flume::bounded(100);
46//!
47//!     // Connect and authenticate
48//!     trader.connect(report_tx).await?;
49//!
50//!     // Check connection health
51//!     let health = trader.get_connection_health();
52//!     println!("Connection healthy: {}", health.is_healthy);
53//!
54//!     Ok(())
55//! }
56//! ```
57
58use crate::instrument_registry::InstrumentRegistry;
59use rusty_common::auth::exchanges::bybit::{BybitAuth, BybitWsTradingMessage};
60use rusty_common::collections::FxHashMap;
61use rusty_common::utils::id_generation;
62use std::sync::Arc;
63use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU64, Ordering};
64
65use crate::execution_engine::ExecutionReport;
66use anyhow::{Result, bail};
67use async_trait::async_trait;
68use flume::Sender;
69use futures::{SinkExt, StreamExt};
70use log::{debug, error, info, warn};
71use parking_lot::RwLock;
72use quanta::Clock;
73use rust_decimal::Decimal;
74use rusty_common::types::Exchange;
75use rusty_common::websocket::{
76    Message, WebSocketConfig,
77    client::ConnectionState as WsConnectionState,
78    connector::{WebSocketConnector, WebSocketSink, WebSocketStream},
79    stats::new_shared_stats,
80};
81use rusty_model::{
82    enums::{OrderSide, OrderStatus, OrderType, TimeInForce},
83    instruments::InstrumentId,
84    position::{MarginType, PositionSide, PositionUpdate},
85    trading_order::Order,
86    types::PositionId,
87    venues::Venue,
88};
89use simd_json::prelude::*;
90use simd_json::value::owned::Value as JsonValue;
91use smallvec::SmallVec;
92use smartstring::alias::String as SmartString;
93use std::str::FromStr;
94use std::time::Duration;
95use tokio::sync::RwLock as AsyncRwLock;
96use tokio::sync::oneshot;
97use tokio::task::JoinHandle;
98use tokio::time::interval;
99use uuid::Uuid;
100
101/// Default WebSocket URLs for Bybit V5
102const WS_MAINNET_URL: &str = "wss://stream.bybit.com/v5/trade";
103const WS_TESTNET_URL: &str = "wss://stream-testnet.bybit.com/v5/trade";
104
105/// Connection configuration constants
106const PING_INTERVAL_SECONDS: u64 = 20; // Bybit recommends 20 seconds
107const PONG_TIMEOUT_SECONDS: u64 = 10;
108const MAX_RECONNECTION_ATTEMPTS: u8 = 10;
109const INITIAL_BACKOFF_MS: u64 = 1000;
110const MAX_BACKOFF_MS: u64 = 60000;
111
112/// Bybit V5 product categories
113#[derive(Debug, Clone, PartialEq, Eq)]
114pub enum BybitCategory {
115    /// Spot trading category for spot markets
116    Spot,
117    /// Linear futures category for USDT-margined contracts
118    Linear,
119    /// Inverse futures category for coin-margined contracts
120    Inverse,
121    /// Options category for options contracts
122    Options,
123}
124
125/// Bybit account type
126#[derive(Debug, Clone, Copy, PartialEq, Eq)]
127pub enum BybitAccountType {
128    /// Unified margin account (cross-margin across all products)
129    Unified,
130    /// Classic account (isolated margin per product)
131    Contract,
132}
133
134impl BybitCategory {
135    const fn as_str(&self) -> &'static str {
136        match self {
137            Self::Spot => "spot",
138            Self::Linear => "linear",
139            Self::Inverse => "inverse",
140            Self::Options => "option",
141        }
142    }
143
144    /// Parse from API string
145    #[must_use]
146    pub fn parse_category(s: &str) -> Option<Self> {
147        match s {
148            "spot" => Some(Self::Spot),
149            "linear" => Some(Self::Linear),
150            "inverse" => Some(Self::Inverse),
151            "option" => Some(Self::Options),
152            _ => None,
153        }
154    }
155}
156
157/// Instrument info for symbol categorization
158#[derive(Debug, Clone)]
159struct InstrumentInfo {
160    /// Trading symbol (e.g., "BTCUSDT")
161    symbol: SmartString,
162    /// Bybit product category (spot, linear, inverse, options)
163    category: BybitCategory,
164    /// Base coin of the trading pair (e.g., "BTC")
165    base_coin: SmartString,
166    /// Quote coin of the trading pair (e.g., "USDT")
167    quote_coin: SmartString,
168}
169
170/// Connection health metrics
171#[derive(Debug, Clone, Default)]
172pub struct ConnectionHealth {
173    /// Whether the connection is healthy
174    pub is_healthy: bool,
175    /// Timestamp of last ping sent (nanoseconds)
176    pub last_ping_sent: u64,
177    /// Timestamp of last pong received (nanoseconds)
178    pub last_pong_received: u64,
179    /// Number of reconnection attempts made
180    pub reconnection_attempts: u8,
181    /// Total number of messages sent
182    pub messages_sent: u64,
183    /// Total number of messages received
184    pub messages_received: u64,
185}
186
187/// Bybit WebSocket order request for individual orders
188#[derive(Debug, Clone)]
189pub struct BybitOrderRequest {
190    /// Trading symbol (e.g., "BTCUSDT")
191    pub symbol: SmartString,
192    /// Order side (Buy or Sell)
193    pub side: OrderSide,
194    /// Order type (Market, Limit, etc.)
195    pub order_type: OrderType,
196    /// Order quantity
197    pub quantity: Decimal,
198    /// Order price (required for limit orders)
199    pub price: Option<Decimal>,
200    /// Time in force (GTC, IOC, FOK, etc.)
201    pub time_in_force: TimeInForce,
202    /// Bybit product category (spot, linear, inverse, options)
203    pub category: BybitCategory,
204    /// Client-provided order link ID for tracking
205    pub order_link_id: Option<SmartString>,
206    /// Whether this is a reduce-only order
207    pub reduce_only: Option<bool>,
208    /// Whether to close position on trigger
209    pub close_on_trigger: Option<bool>,
210    /// Position index for hedge mode (0=one-way, 1=buy-side, 2=sell-side)
211    pub position_idx: Option<u32>,
212    /// Take profit price
213    pub take_profit: Option<Decimal>,
214    /// Stop loss price
215    pub stop_loss: Option<Decimal>,
216    /// Take profit trigger price type (LastPrice, MarkPrice, IndexPrice)
217    pub tp_trigger_by: Option<SmartString>,
218    /// Stop loss trigger price type (LastPrice, MarkPrice, IndexPrice)
219    pub sl_trigger_by: Option<SmartString>,
220    /// Take profit limit price for TP limit orders
221    pub tp_limit_price: Option<Decimal>,
222    /// Stop loss limit price for SL limit orders
223    pub sl_limit_price: Option<Decimal>,
224}
225
226/// Bybit WebSocket batch order request
227#[derive(Debug, Clone)]
228pub struct BybitBatchOrderRequest {
229    /// Bybit product category for all orders in the batch
230    pub category: BybitCategory,
231    /// List of orders to be placed in the batch (max 20 orders, 10 for spot)
232    pub orders: Vec<BybitOrderRequest>,
233}
234
235/// Order amendment request
236#[derive(Debug, Clone)]
237pub struct BybitAmendOrderRequest {
238    /// Bybit product category (spot, linear, inverse, options)
239    pub category: BybitCategory,
240    /// Trading symbol (e.g., "BTCUSDT")
241    pub symbol: SmartString,
242    /// Bybit order ID (either order_id or order_link_id must be provided)
243    pub order_id: Option<SmartString>,
244    /// Client-provided order link ID (either order_id or order_link_id must be provided)
245    pub order_link_id: Option<SmartString>,
246    /// New order quantity
247    pub qty: Option<Decimal>,
248    /// New order price
249    pub price: Option<Decimal>,
250    /// New take profit price
251    pub take_profit: Option<Decimal>,
252    /// New stop loss price
253    pub stop_loss: Option<Decimal>,
254    /// Take profit trigger price type (LastPrice, MarkPrice, IndexPrice)
255    pub tp_trigger_by: Option<SmartString>,
256    /// Stop loss trigger price type (LastPrice, MarkPrice, IndexPrice)
257    pub sl_trigger_by: Option<SmartString>,
258    /// New trigger price for conditional orders
259    pub trigger_price: Option<Decimal>,
260    /// New take profit limit price for TP limit orders
261    pub tp_limit_price: Option<Decimal>,
262    /// New stop loss limit price for SL limit orders
263    pub sl_limit_price: Option<Decimal>,
264}
265
266/// Cancel all orders request
267#[derive(Debug, Clone)]
268pub struct BybitCancelAllRequest {
269    /// Bybit product category (spot, linear, inverse, options)
270    pub category: BybitCategory,
271    /// Optional symbol filter (cancels only orders for this symbol)
272    pub symbol: Option<SmartString>,
273    /// Optional base coin filter (cancels only orders for this base coin)
274    pub base_coin: Option<SmartString>,
275    /// Optional settle coin filter (cancels only orders for this settle coin)
276    pub settle_coin: Option<SmartString>,
277}
278
279/// Request tracking information for pending orders
280#[derive(Debug, Clone)]
281struct PendingRequest {
282    /// Instrument ID associated with the request
283    pub instrument_id: InstrumentId,
284    /// Bybit product category (spot, linear, inverse, options)
285    pub category: BybitCategory,
286    /// Type of request (create, cancel, amend, batch operations)
287    pub request_type: RequestType,
288    /// Timestamp when the request was created (nanoseconds)
289    pub timestamp: u64,
290}
291
292/// Type of request being tracked
293#[derive(Debug, Clone, PartialEq)]
294enum RequestType {
295    /// Single order creation request
296    CreateOrder,
297    /// Single order cancellation request
298    CancelOrder,
299    /// Single order amendment request
300    AmendOrder,
301    /// Batch order creation request
302    BatchCreate,
303    /// Batch order cancellation request
304    BatchCancel,
305    /// Batch order amendment request
306    BatchAmend,
307}
308
309/// Bybit WebSocket Trading Client
310pub struct BybitWebSocketTrader {
311    auth: Arc<BybitAuth>,
312    testnet: bool,
313    connection_health: Arc<RwLock<ConnectionHealth>>,
314    ws_sink: Arc<AsyncRwLock<Option<WebSocketSink>>>,
315    ws_stream: Arc<AsyncRwLock<Option<WebSocketStream>>>,
316    is_connected: Arc<AtomicBool>,
317    is_authenticated: Arc<AtomicBool>,
318    reconnection_attempts: Arc<AtomicU8>,
319    clock: Clock,
320    ping_task: Arc<AsyncRwLock<Option<JoinHandle<()>>>>,
321    message_task: Arc<AsyncRwLock<Option<JoinHandle<()>>>>,
322    cleanup_task: Arc<AsyncRwLock<Option<JoinHandle<()>>>>,
323    request_counter: Arc<AtomicU64>,
324    /// Channel for authentication completion signaling
325    auth_completion_tx: Arc<AsyncRwLock<Option<oneshot::Sender<Result<()>>>>>,
326    /// Track pending requests to map responses to original instrument IDs
327    pending_requests: Arc<AsyncRwLock<FxHashMap<SmartString, PendingRequest>>>,
328    /// Track position state for position updates
329    position_state: Arc<RwLock<FxHashMap<PositionId, PositionUpdate>>>,
330
331    /// Account type (UNIFIED or CONTRACT)
332    account_type: RwLock<Option<BybitAccountType>>,
333
334    /// Instrument info cache for symbol categorization
335    instrument_cache: Arc<RwLock<FxHashMap<SmartString, InstrumentInfo>>>,
336
337    /// Instrument registry for order/instrument mapping
338    instrument_registry: Arc<dyn InstrumentRegistry>,
339}
340
341impl BybitWebSocketTrader {
342    /// Create a new Bybit WebSocket trader
343    pub fn new(
344        auth: Arc<BybitAuth>,
345        testnet: bool,
346        instrument_registry: Arc<dyn InstrumentRegistry>,
347    ) -> Self {
348        Self {
349            auth,
350            testnet,
351            connection_health: Arc::new(RwLock::new(ConnectionHealth::default())),
352            ws_sink: Arc::new(AsyncRwLock::new(None)),
353            ws_stream: Arc::new(AsyncRwLock::new(None)),
354            is_connected: Arc::new(AtomicBool::new(false)),
355            is_authenticated: Arc::new(AtomicBool::new(false)),
356            reconnection_attempts: Arc::new(AtomicU8::new(0)),
357            clock: Clock::new(),
358            ping_task: Arc::new(AsyncRwLock::new(None)),
359            message_task: Arc::new(AsyncRwLock::new(None)),
360            cleanup_task: Arc::new(AsyncRwLock::new(None)),
361            request_counter: Arc::new(AtomicU64::new(0)),
362            auth_completion_tx: Arc::new(AsyncRwLock::new(None)),
363            pending_requests: Arc::new(AsyncRwLock::new(FxHashMap::default())),
364            position_state: Arc::new(RwLock::new(FxHashMap::default())),
365            account_type: RwLock::new(None),
366            instrument_cache: Arc::new(RwLock::new(FxHashMap::default())),
367            instrument_registry,
368        }
369    }
370
371    /// Get the WebSocket URL based on testnet flag
372    const fn get_ws_url(&self) -> &'static str {
373        if self.testnet {
374            WS_TESTNET_URL
375        } else {
376            WS_MAINNET_URL
377        }
378    }
379
380    /// Get connection health status
381    pub fn get_connection_health(&self) -> ConnectionHealth {
382        self.connection_health.read().clone()
383    }
384
385    /// Check if connected and authenticated
386    pub fn is_ready(&self) -> bool {
387        self.is_connected.load(Ordering::Relaxed) && self.is_authenticated.load(Ordering::Relaxed)
388    }
389
390    /// Connect to Bybit WebSocket trading endpoint
391    pub async fn connect(&self, report_tx: Sender<ExecutionReport>) -> Result<()> {
392        let url = self.get_ws_url();
393        info!("Connecting to Bybit WebSocket: {url}");
394
395        // Create WebSocket connection
396        let config = WebSocketConfig::builder(Exchange::Bybit, url.to_string())
397            .connect_timeout(Duration::from_secs(30))
398            .timeout(Duration::from_secs(10))
399            .heartbeat(30000, 60000, 3)
400            .build();
401
402        let stats = new_shared_stats();
403        let connection_state = Arc::new(RwLock::new(WsConnectionState::Disconnected));
404        let mut connector =
405            WebSocketConnector::new(config, stats.clone(), connection_state.clone());
406        let (sink, stream) = connector.connect_with_retry(url).await?;
407
408        // Store connection references
409        *self.ws_sink.write().await = Some(sink);
410        *self.ws_stream.write().await = Some(stream);
411
412        self.is_connected.store(true, Ordering::Relaxed);
413        self.reconnection_attempts.store(0, Ordering::Relaxed);
414
415        // Update connection health
416        {
417            let mut health = self.connection_health.write();
418            health.is_healthy = true;
419            health.last_pong_received = self.clock.raw();
420        }
421
422        // Authenticate
423        self.authenticate().await?;
424
425        // Start ping task
426        self.start_ping_task().await;
427
428        // Start message processing task
429        self.start_message_task(report_tx).await;
430
431        // Start periodic cleanup task for old requests
432        self.start_cleanup_task().await;
433
434        info!("Successfully connected and authenticated to Bybit WebSocket");
435        Ok(())
436    }
437
438    /// Authenticate with Bybit WebSocket
439    async fn authenticate(&self) -> Result<()> {
440        let req_id = format!("auth_{}", Uuid::new_v4());
441        let auth_message = self
442            .auth
443            .create_ws_auth_message(Some(req_id.clone().into()))
444            .map_err(|e| anyhow::anyhow!("Failed to create WebSocket auth message: {}", e))?;
445
446        // Create authentication completion channel
447        let (auth_tx, auth_rx) = oneshot::channel();
448
449        // Store the sender for use in message handling
450        {
451            let mut completion_tx = self.auth_completion_tx.write().await;
452            *completion_tx = Some(auth_tx);
453        }
454
455        let json_str = simd_json::to_string(&auth_message)?;
456        self.send_raw_message(json_str).await?;
457
458        debug!("Sent authentication message with req_id: {req_id}");
459
460        // Wait for authentication response with timeout
461        match tokio::time::timeout(Duration::from_secs(10), auth_rx).await {
462            Ok(Ok(Ok(()))) => {
463                self.is_authenticated.store(true, Ordering::Relaxed);
464                info!("WebSocket authentication successful");
465                Ok(())
466            }
467            Ok(Ok(Err(e))) => {
468                error!("WebSocket authentication failed: {e}");
469                Err(e)
470            }
471            Ok(Err(_)) => {
472                error!("Authentication channel was closed unexpectedly");
473                bail!("Authentication channel closed")
474            }
475            Err(_) => {
476                error!("Authentication timed out after 10 seconds");
477                bail!("Authentication timeout")
478            }
479        }
480    }
481
482    /// Send raw message to WebSocket
483    async fn send_raw_message(&self, message: String) -> Result<()> {
484        if let Some(sink) = self.ws_sink.write().await.as_mut() {
485            sink.send(Message::Text(message.into()).to_frame_view())
486                .await?;
487
488            // Update health metrics
489            {
490                let mut health = self.connection_health.write();
491                health.messages_sent += 1;
492            }
493
494            Ok(())
495        } else {
496            bail!("WebSocket sink not available")
497        }
498    }
499
500    /// Start ping task for connection health
501    async fn start_ping_task(&self) {
502        let sink = self.ws_sink.clone();
503        let health = self.connection_health.clone();
504        let clock = self.clock.clone();
505        let is_connected = self.is_connected.clone();
506
507        let task = tokio::spawn(async move {
508            let mut interval = interval(Duration::from_secs(PING_INTERVAL_SECONDS));
509
510            loop {
511                interval.tick().await;
512
513                if !is_connected.load(Ordering::Relaxed) {
514                    break;
515                }
516
517                // Send WebSocket ping
518                if let Some(sink) = sink.write().await.as_mut() {
519                    let ping_message = simd_json::json!({
520                        "req_id": format!("ping_{}", Uuid::new_v4()),
521                        "op": "ping"
522                    });
523
524                    let json_str = simd_json::to_string(&ping_message).unwrap_or_default();
525                    if let Err(e) = sink
526                        .send(Message::Text(json_str.into()).to_frame_view())
527                        .await
528                    {
529                        error!("Failed to send ping: {e}");
530                        break;
531                    }
532
533                    // Update health
534                    {
535                        let mut health_guard = health.write();
536                        health_guard.last_ping_sent = clock.raw();
537                    }
538                }
539            }
540        });
541
542        *self.ping_task.write().await = Some(task);
543    }
544
545    /// Start message processing task
546    async fn start_message_task(&self, report_tx: Sender<ExecutionReport>) {
547        let stream = self.ws_stream.clone();
548        let health = self.connection_health.clone();
549        let clock = self.clock.clone();
550        let is_connected = self.is_connected.clone();
551        let auth_completion_tx = self.auth_completion_tx.clone();
552        let pending_requests = self.pending_requests.clone();
553        let position_state = self.position_state.clone();
554        let instrument_registry = self.instrument_registry.clone();
555
556        let task = tokio::spawn(async move {
557            loop {
558                if !is_connected.load(Ordering::Relaxed) {
559                    break;
560                }
561
562                if let Some(stream) = stream.write().await.as_mut() {
563                    if let Some(frame) = stream.next().await {
564                        let message = Message::from_frame_view(frame);
565
566                        // Update health
567                        {
568                            let mut health_guard = health.write();
569                            health_guard.messages_received += 1;
570                            health_guard.last_pong_received = clock.raw();
571                        }
572
573                        // Process message
574                        if let Err(e) = Self::process_message(
575                            message,
576                            &report_tx,
577                            &auth_completion_tx,
578                            &pending_requests,
579                            &position_state,
580                            &clock,
581                            &instrument_registry,
582                        )
583                        .await
584                        {
585                            error!("Failed to process message: {e}");
586                        }
587                    } else {
588                        debug!("WebSocket stream ended");
589                        break;
590                    }
591                }
592            }
593        });
594
595        *self.message_task.write().await = Some(task);
596    }
597
598    /// Process incoming WebSocket messages
599    async fn process_message(
600        message: Message,
601        report_tx: &Sender<ExecutionReport>,
602        auth_completion_tx: &Arc<AsyncRwLock<Option<oneshot::Sender<Result<()>>>>>,
603        pending_requests: &Arc<AsyncRwLock<FxHashMap<SmartString, PendingRequest>>>,
604        position_state: &Arc<RwLock<FxHashMap<PositionId, PositionUpdate>>>,
605        clock: &Clock,
606        instrument_registry: &Arc<dyn InstrumentRegistry>,
607    ) -> Result<()> {
608        match message {
609            Message::Text(text) => {
610                debug!("Received message: {text}");
611
612                // Parse JSON - text is already UTF-8 validated by WebSocket protocol
613                let mut json_str = text.clone();
614                // SAFETY: json_str comes from WebSocket text frame which is UTF-8 validated
615                let json: JsonValue = unsafe { simd_json::from_str(&mut json_str)? };
616
617                // Handle different message types
618                if let Some(op) = json.get("op").and_then(|v| v.as_str()) {
619                    match op {
620                        "auth" => {
621                            Self::handle_auth_response(&json, auth_completion_tx).await?;
622                        }
623                        "pong" => {
624                            debug!("Received pong response");
625                        }
626                        "order.create" | "order.cancel" | "order.amend" => {
627                            Self::handle_order_response(
628                                &json,
629                                report_tx,
630                                pending_requests,
631                                clock,
632                                instrument_registry,
633                            )
634                            .await?;
635                        }
636                        "order.create-batch" | "order.cancel-batch" | "order.amend-batch" => {
637                            Self::handle_batch_response(
638                                &json,
639                                report_tx,
640                                pending_requests,
641                                clock,
642                                instrument_registry,
643                            )
644                            .await?;
645                        }
646                        _ => {
647                            debug!("Unknown operation: {op}");
648                        }
649                    }
650                } else if let Some(topic) = json.get("topic").and_then(|v| v.as_str()) {
651                    match topic {
652                        "order" => {
653                            Self::handle_order_update(&json, report_tx, clock, instrument_registry)
654                                .await?;
655                        }
656                        "position" => {
657                            Self::handle_position_update(
658                                &json,
659                                position_state,
660                                Some(report_tx),
661                                clock,
662                            )
663                            .await?;
664                        }
665                        _ => {
666                            debug!("Unknown topic: {topic}");
667                        }
668                    }
669                }
670            }
671            Message::Binary(data) => {
672                debug!("Received binary message: {} bytes", data.len());
673            }
674            Message::Ping(data) => {
675                debug!("Received ping: {} bytes", data.len());
676            }
677            Message::Pong(data) => {
678                debug!("Received pong: {} bytes", data.len());
679            }
680            Message::Close(frame) => {
681                warn!("Received close frame: {frame:?}");
682            }
683            Message::Frame(_) => {
684                debug!("Received raw frame message");
685            }
686        }
687
688        Ok(())
689    }
690
691    /// Handle authentication response
692    async fn handle_auth_response(
693        json: &JsonValue,
694        auth_completion_tx: &Arc<AsyncRwLock<Option<oneshot::Sender<Result<()>>>>>,
695    ) -> Result<()> {
696        let result = if let Some(success) = json
697            .get("success")
698            .and_then(simd_json::prelude::ValueAsScalar::as_bool)
699        {
700            if success {
701                info!("WebSocket authentication successful");
702                Ok(())
703            } else {
704                let msg = json
705                    .get("ret_msg")
706                    .and_then(|v| v.as_str())
707                    .unwrap_or("Unknown error");
708                error!("WebSocket authentication failed: {msg}");
709                Err(anyhow::anyhow!("Authentication failed: {}", msg))
710            }
711        } else {
712            Err(anyhow::anyhow!("Invalid authentication response format"))
713        };
714
715        // Signal authentication completion
716        if let Some(tx) = auth_completion_tx.write().await.take() {
717            let signal_result = match &result {
718                Ok(()) => Ok(()),
719                Err(e) => Err(anyhow::anyhow!("{}", e)),
720            };
721            let _ = tx.send(signal_result);
722        }
723
724        result
725    }
726
727    /// Handle order response with request tracking
728    async fn handle_order_response(
729        json: &JsonValue,
730        report_tx: &Sender<ExecutionReport>,
731        pending_requests: &Arc<AsyncRwLock<FxHashMap<SmartString, PendingRequest>>>,
732        clock: &Clock,
733        instrument_registry: &Arc<dyn InstrumentRegistry>,
734    ) -> Result<()> {
735        debug!("Processing order response: {json:?}");
736
737        // Extract request ID to find tracked request
738        let req_id: SmartString = json
739            .get("req_id")
740            .and_then(|v| v.as_str())
741            .map_or_else(|| "unknown".into(), std::convert::Into::into);
742
743        // Look up the pending request
744        let pending_request = {
745            let pending = pending_requests.read().await;
746            pending.get(&req_id).cloned()
747        };
748
749        // Extract order information and create execution report
750        if let Some(ret_code) = json
751            .get("retCode")
752            .and_then(simd_json::prelude::ValueAsScalar::as_i64)
753        {
754            let success = ret_code == 0;
755            let status = if success {
756                OrderStatus::New
757            } else {
758                OrderStatus::Rejected
759            };
760
761            // Use tracked instrument ID or fallback to parsing from response
762            let instrument_id = if let Some(ref request) = pending_request {
763                request.instrument_id.clone()
764            } else {
765                // Fallback: try to extract symbol from response, then normalize
766                let symbol = json
767                    .get("result")
768                    .and_then(|r| r.get("symbol"))
769                    .and_then(|v| v.as_str())
770                    .map_or_else(
771                        || "BTC/USDT".into(),
772                        |s| instrument_registry.normalize_symbol(s, Venue::Bybit),
773                    ); // Default fallback
774                InstrumentId::new(&symbol, Venue::Bybit)
775            };
776
777            let order_id: SmartString = json
778                .get("result")
779                .and_then(|r| r.get("orderId"))
780                .and_then(|v| v.as_str())
781                .unwrap_or("unknown")
782                .into();
783
784            // Cache order mapping if order was successfully created
785            if success
786                && order_id != "unknown"
787                && let Some(ref request) = pending_request
788            {
789                instrument_registry.cache_order_mapping(
790                    &order_id,
791                    request.instrument_id.clone(),
792                    None, // No metadata for now
793                );
794            }
795
796            let report = ExecutionReport {
797                id: id_generation::generate_report_id_with_uuid("bybit"),
798                order_id: order_id.clone(),
799                exchange_timestamp: json
800                    .get("time")
801                    .and_then(simd_json::prelude::ValueAsScalar::as_u64)
802                    .unwrap_or(0)
803                    * 1_000_000,
804                system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
805                instrument_id,
806                status,
807                filled_quantity: Decimal::ZERO,
808                remaining_quantity: Decimal::ZERO,
809                execution_price: None,
810                reject_reason: if success {
811                    None
812                } else {
813                    Some(
814                        json.get("retMsg")
815                            .and_then(|v| v.as_str())
816                            .unwrap_or("Unknown error")
817                            .into(),
818                    )
819                },
820                exchange_execution_id: None,
821                is_final: !success,
822            };
823
824            if let Err(e) = report_tx.try_send(report) {
825                error!("Failed to send execution report: {e}");
826            }
827
828            // Clean up completed request
829            if success || status == OrderStatus::Rejected {
830                pending_requests.write().await.remove(&req_id);
831            }
832        }
833
834        Ok(())
835    }
836
837    /// Handle batch response with request tracking
838    async fn handle_batch_response(
839        json: &JsonValue,
840        report_tx: &Sender<ExecutionReport>,
841        pending_requests: &Arc<AsyncRwLock<FxHashMap<SmartString, PendingRequest>>>,
842        clock: &Clock,
843        instrument_registry: &Arc<dyn InstrumentRegistry>,
844    ) -> Result<()> {
845        debug!("Processing batch response: {json:?}");
846
847        // Extract request ID to find tracked request for default instrument info
848        let req_id: SmartString = json
849            .get("req_id")
850            .and_then(|v| v.as_str())
851            .map_or_else(|| "unknown".into(), std::convert::Into::into);
852
853        // Look up the pending request for category information
854        let pending_request = {
855            let pending = pending_requests.read().await;
856            pending.get(&req_id).cloned()
857        };
858
859        // Handle batch responses - each order in the batch gets its own execution report
860        if let Some(result) = json.get("result") {
861            if let Some(list) = result.get("list").and_then(|v| v.as_array()) {
862                for (i, order) in list.iter().enumerate() {
863                    // Use symbol from response if available, otherwise fallback to tracked request
864                    let symbol = order
865                        .get("symbol")
866                        .and_then(|v| v.as_str())
867                        .map(|s| instrument_registry.normalize_symbol(s, Venue::Bybit))
868                        .unwrap_or_else(|| {
869                            if let Some(ref request) = pending_request {
870                                request.instrument_id.symbol.clone()
871                            } else {
872                                // Use normalized fallback instead of UNKNOWN
873                                instrument_registry.normalize_symbol("BTCUSDT", Venue::Bybit)
874                            }
875                        });
876
877                    let report = ExecutionReport {
878                        id: id_generation::generate_batch_id("bybit", i),
879                        order_id: order
880                            .get("orderId")
881                            .and_then(|v| v.as_str())
882                            .unwrap_or("unknown")
883                            .into(),
884                        exchange_timestamp: order
885                            .get("createAt")
886                            .and_then(simd_json::prelude::ValueAsScalar::as_u64)
887                            .unwrap_or(0)
888                            * 1_000_000,
889                        system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
890                        instrument_id: InstrumentId::new(&symbol, Venue::Bybit),
891                        status: OrderStatus::New,
892                        filled_quantity: Decimal::ZERO,
893                        remaining_quantity: Decimal::ZERO,
894                        execution_price: None,
895                        reject_reason: None,
896                        exchange_execution_id: order
897                            .get("orderId")
898                            .and_then(|v| v.as_str())
899                            .map(std::convert::Into::into),
900                        is_final: false,
901                    };
902
903                    if let Err(e) = report_tx.try_send(report) {
904                        error!("Failed to send batch execution report: {e}");
905                    }
906                }
907            }
908
909            // Clean up completed batch request
910            pending_requests.write().await.remove(&req_id);
911        }
912
913        Ok(())
914    }
915
916    /// Handle order update
917    async fn handle_order_update(
918        json: &JsonValue,
919        report_tx: &Sender<ExecutionReport>,
920        clock: &Clock,
921        instrument_registry: &Arc<dyn InstrumentRegistry>,
922    ) -> Result<()> {
923        debug!("Processing order update: {json:?}");
924
925        if let Some(data) = json.get("data").and_then(|v| v.as_array()) {
926            for order in data {
927                let status = Self::map_order_status(
928                    order
929                        .get("orderStatus")
930                        .and_then(|v| v.as_str())
931                        .unwrap_or("Unknown"),
932                );
933
934                let report = ExecutionReport {
935                    id: id_generation::generate_report_id_with_uuid("bybit_update"),
936                    order_id: order
937                        .get("orderId")
938                        .and_then(|v| v.as_str())
939                        .unwrap_or("unknown")
940                        .into(),
941                    exchange_timestamp: order
942                        .get("updatedTime")
943                        .and_then(simd_json::prelude::ValueAsScalar::as_u64)
944                        .unwrap_or(0)
945                        * 1_000_000,
946                    system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
947                    instrument_id: InstrumentId::new(
948                        order.get("symbol").and_then(|v| v.as_str()).map_or_else(
949                            || instrument_registry.normalize_symbol("BTCUSDT", Venue::Bybit),
950                            |s| instrument_registry.normalize_symbol(s, Venue::Bybit),
951                        ),
952                        Venue::Bybit,
953                    ),
954                    status,
955                    filled_quantity: order
956                        .get("cumExecQty")
957                        .and_then(|v| v.as_str())
958                        .and_then(|s| s.parse().ok())
959                        .unwrap_or(Decimal::ZERO),
960                    remaining_quantity: order
961                        .get("leavesQty")
962                        .and_then(|v| v.as_str())
963                        .and_then(|s| s.parse().ok())
964                        .unwrap_or(Decimal::ZERO),
965                    execution_price: order
966                        .get("avgPrice")
967                        .and_then(|v| v.as_str())
968                        .and_then(|s| s.parse().ok()),
969                    reject_reason: None,
970                    exchange_execution_id: order
971                        .get("orderId")
972                        .and_then(|v| v.as_str())
973                        .map(std::convert::Into::into),
974                    is_final: matches!(
975                        status,
976                        OrderStatus::Filled | OrderStatus::Cancelled | OrderStatus::Rejected
977                    ),
978                };
979
980                if let Err(e) = report_tx.try_send(report) {
981                    error!("Failed to send order update report: {e}");
982                }
983            }
984        }
985
986        Ok(())
987    }
988
989    /// Handle position update
990    async fn handle_position_update(
991        json: &JsonValue,
992        position_state: &Arc<RwLock<FxHashMap<PositionId, PositionUpdate>>>,
993        report_tx: Option<&Sender<ExecutionReport>>,
994        clock: &Clock,
995    ) -> Result<()> {
996        debug!("Processing position update: {json:?}");
997
998        // Parse position updates from the data array
999        if let Some(data) = json.get("data").and_then(|v| v.as_array()) {
1000            let timestamp_ns = clock.raw();
1001
1002            for position_json in data {
1003                if let Some(position_update) =
1004                    Self::parse_position_update(position_json, timestamp_ns)
1005                {
1006                    // Update position state
1007                    {
1008                        let mut state = position_state.write();
1009                        state.insert(position_update.position_id, position_update.clone());
1010                    }
1011
1012                    // Log position update
1013                    info!(
1014                        "Position update: {} {} {} @ {}",
1015                        position_update.symbol,
1016                        position_update.side,
1017                        position_update.amount,
1018                        position_update.entry_price
1019                    );
1020
1021                    // Send position update via execution report if channel is available
1022                    if let Some(tx) = report_tx {
1023                        let report = ExecutionReport {
1024                            id: id_generation::generate_report_id_with_uuid("bybit_position"),
1025                            order_id: SmartString::default(), // Position updates don't have order IDs
1026                            exchange_timestamp: timestamp_ns,
1027                            system_timestamp: timestamp_ns,
1028                            instrument_id: InstrumentId::new(
1029                                position_update.symbol.clone(),
1030                                Venue::Bybit,
1031                            ),
1032                            status: OrderStatus::New, // Use a placeholder status for position updates
1033                            filled_quantity: position_update.amount,
1034                            remaining_quantity: Decimal::ZERO, // Position updates don't have remaining quantity
1035                            execution_price: Some(position_update.entry_price),
1036                            reject_reason: None,
1037                            exchange_execution_id: Some(
1038                                format!("position_{}", position_update.position_id).into(),
1039                            ),
1040                            is_final: false,
1041                        };
1042
1043                        if let Err(e) = tx.try_send(report) {
1044                            error!("Failed to send position update report: {e}");
1045                        }
1046                    }
1047                }
1048            }
1049        }
1050
1051        Ok(())
1052    }
1053
1054    /// Parse Bybit position update from JSON
1055    fn parse_position_update(
1056        position_json: &JsonValue,
1057        timestamp_ns: u64,
1058    ) -> Option<PositionUpdate> {
1059        // Extract position fields from Bybit position JSON
1060        let symbol = position_json.get("symbol").and_then(|v| v.as_str())?;
1061        let side_str = position_json.get("side").and_then(|v| v.as_str())?;
1062        let size_str = position_json.get("size").and_then(|v| v.as_str())?;
1063        let position_value_str = position_json
1064            .get("positionValue")
1065            .and_then(|v| v.as_str())?;
1066        let entry_price_str = position_json.get("entryPrice").and_then(|v| v.as_str())?;
1067        let unrealized_pnl_str = position_json
1068            .get("unrealisedPnl")
1069            .and_then(|v| v.as_str())?;
1070        let realized_pnl_str = position_json
1071            .get("cumRealisedPnl")
1072            .and_then(|v| v.as_str())?;
1073        let margin_mode_str = position_json.get("tradeMode").and_then(|v| v.as_str())?;
1074
1075        // Parse numeric values
1076        let amount = Decimal::from_str(size_str).ok()?;
1077        let position_value = Decimal::from_str(position_value_str).ok()?;
1078        let entry_price = Decimal::from_str(entry_price_str).ok()?;
1079        let unrealized_pnl = Decimal::from_str(unrealized_pnl_str).ok()?;
1080        let realized_pnl = Decimal::from_str(realized_pnl_str).ok()?;
1081
1082        // Parse side (Bybit uses "Buy" and "Sell")
1083        let position_side = match side_str {
1084            "Buy" => PositionSide::Long,
1085            "Sell" => PositionSide::Short,
1086            _ => {
1087                debug!("Unknown position side: {side_str}");
1088                return None;
1089            }
1090        };
1091
1092        // Parse margin type
1093        let margin_type = match margin_mode_str {
1094            "cross_margin" | "0" => MarginType::Cross,
1095            "isolated_margin" | "1" => MarginType::Isolated,
1096            _ => MarginType::Cross, // Default to cross margin
1097        };
1098
1099        // Skip positions with zero amount (closed positions)
1100        if amount.is_zero() {
1101            debug!("Skipping closed position for {symbol}");
1102            return None;
1103        }
1104
1105        // Generate stable position ID
1106        let position_id = Self::generate_stable_position_id(symbol, position_side, Venue::Bybit);
1107
1108        Some(PositionUpdate {
1109            position_id,
1110            venue: Venue::Bybit,
1111            symbol: SmartString::from(symbol),
1112            side: position_side,
1113            amount,
1114            entry_price,
1115            breakeven_price: entry_price, // Bybit doesn't provide separate breakeven price
1116            unrealized_pnl,
1117            realized_pnl,
1118            margin_type,
1119            isolated_wallet: position_value, // Use position value as isolated wallet
1120            timestamp_ns,
1121        })
1122    }
1123
1124    /// Generate stable position ID based on position characteristics
1125    fn generate_stable_position_id(symbol: &str, side: PositionSide, venue: Venue) -> PositionId {
1126        // Create a deterministic UUID based on position characteristics
1127        // This ensures the same position always gets the same ID
1128        let position_key = format!("{symbol}-{side}-{venue:?}");
1129
1130        // Use stable hashing algorithm to ensure consistent IDs across restarts
1131        // Using SHA-256 truncated to ensure deterministic results across different
1132        // compiler versions and architectures (unlike FxHasher)
1133        use sha2::{Digest, Sha256};
1134
1135        // Create primary hash from position key
1136        let mut hasher = Sha256::new();
1137        hasher.update(position_key.as_bytes());
1138        let primary_hash = hasher.finalize();
1139
1140        // Create secondary hash with additional entropy for uniqueness
1141        let mut secondary_hasher = Sha256::new();
1142        secondary_hasher.update((symbol.len() as u64).to_le_bytes());
1143        secondary_hasher.update((side as u8).to_le_bytes());
1144        secondary_hasher.update((venue as u8).to_le_bytes());
1145        secondary_hasher.update(b"position_id_v1"); // Version marker for future compatibility
1146        let secondary_hash = secondary_hasher.finalize();
1147
1148        // Create 128-bit UUID from the two hashes
1149        // Take first 8 bytes from each hash to create a 16-byte UUID
1150        let mut uuid_bytes = [0u8; 16];
1151
1152        // Use first 8 bytes of primary hash
1153        uuid_bytes[0..8].copy_from_slice(&primary_hash[0..8]);
1154
1155        // Use first 8 bytes of secondary hash
1156        uuid_bytes[8..16].copy_from_slice(&secondary_hash[0..8]);
1157
1158        // Create UUID from bytes
1159        let stable_uuid = Uuid::from_bytes(uuid_bytes);
1160        PositionId::from_uuid(stable_uuid)
1161    }
1162
1163    /// Map Bybit order status to internal `OrderStatus`
1164    fn map_order_status(status: &str) -> OrderStatus {
1165        match status {
1166            "Created" | "New" => OrderStatus::New,
1167            "PartiallyFilled" => OrderStatus::PartiallyFilled,
1168            "Filled" => OrderStatus::Filled,
1169            "Cancelled" => OrderStatus::Cancelled,
1170            "Rejected" => OrderStatus::Rejected,
1171            "PendingCancel" => OrderStatus::Pending,
1172            "Deactivated" => OrderStatus::Cancelled,
1173            "Active" => OrderStatus::Open,
1174            "Untriggered" => OrderStatus::Pending,
1175            "Triggered" => OrderStatus::Open,
1176            _ => OrderStatus::Unknown,
1177        }
1178    }
1179
1180    /// Map internal `OrderSide` to Bybit order side
1181    const fn map_order_side(side: OrderSide) -> &'static str {
1182        match side {
1183            OrderSide::Buy => "Buy",
1184            OrderSide::Sell => "Sell",
1185        }
1186    }
1187
1188    /// Map internal `OrderType` to Bybit order type
1189    const fn map_order_type(order_type: OrderType) -> &'static str {
1190        match order_type {
1191            OrderType::Market => "Market",
1192            OrderType::Limit => "Limit",
1193            _ => "Limit",
1194        }
1195    }
1196
1197    /// Map internal `TimeInForce` to Bybit time in force
1198    const fn map_time_in_force(tif: TimeInForce) -> &'static str {
1199        match tif {
1200            TimeInForce::GTC => "GTC",
1201            TimeInForce::IOC => "IOC",
1202            TimeInForce::FOK => "FOK",
1203            TimeInForce::GTX => "PostOnly",
1204            _ => "GTC",
1205        }
1206    }
1207
1208    /// Track a pending request
1209    async fn track_request(
1210        &self,
1211        req_id: SmartString,
1212        instrument_id: InstrumentId,
1213        category: BybitCategory,
1214        request_type: RequestType,
1215    ) {
1216        let pending_request = PendingRequest {
1217            instrument_id,
1218            category,
1219            request_type,
1220            timestamp: self.clock.raw(),
1221        };
1222
1223        self.pending_requests
1224            .write()
1225            .await
1226            .insert(req_id, pending_request);
1227    }
1228
1229    /// Find the category for an order ID by looking through tracked requests
1230    async fn find_order_category(&self, order_id: &str) -> Option<BybitCategory> {
1231        let pending = self.pending_requests.read().await;
1232
1233        // Search through pending requests to find matching order
1234        // This is a fallback for when we don't have the req_id directly
1235        for (_, request) in pending.iter() {
1236            if matches!(
1237                request.request_type,
1238                RequestType::CreateOrder | RequestType::BatchCreate
1239            ) {
1240                // In a production system, we'd maintain order_id -> category mapping
1241                // For now, return the first create request's category as a fallback
1242                return Some(request.category.clone());
1243            }
1244        }
1245
1246        None
1247    }
1248
1249    /// Clean up old pending requests (older than 5 minutes)
1250    async fn cleanup_old_requests(&self) {
1251        let now = self.clock.raw();
1252        let max_age_ns = 5 * 60 * 1_000_000_000; // 5 minutes in nanoseconds
1253
1254        let mut pending = self.pending_requests.write().await;
1255        pending.retain(|_, request| now.saturating_sub(request.timestamp) < max_age_ns);
1256    }
1257
1258    /// Create a single order with tracking
1259    pub async fn create_order(&self, order: &BybitOrderRequest) -> Result<()> {
1260        let req_id = id_generation::generate_exchange_request_id(
1261            "bybit",
1262            "create",
1263            self.request_counter.fetch_add(1, Ordering::Relaxed),
1264        );
1265        let timestamp = BybitAuth::get_timestamp();
1266        let header = self.auth.create_ws_trading_header(timestamp);
1267
1268        // Create instrument ID from order symbol
1269        let instrument_id = InstrumentId::new(&order.symbol, Venue::Bybit);
1270
1271        // Track the request
1272        self.track_request(
1273            req_id.clone(),
1274            instrument_id,
1275            order.category.clone(),
1276            RequestType::CreateOrder,
1277        )
1278        .await;
1279
1280        let mut order_data = simd_json::json!({
1281            "category": order.category.as_str(),
1282            "symbol": order.symbol,
1283            "side": Self::map_order_side(order.side),
1284            "orderType": Self::map_order_type(order.order_type),
1285            "qty": order.quantity.to_string(),
1286            "price": order.price.map_or("0".to_string(), |p| p.to_string()),
1287            "timeInForce": Self::map_time_in_force(order.time_in_force),
1288            "orderLinkId": order.order_link_id.as_ref().map_or(String::new(), std::string::ToString::to_string),
1289            "reduceOnly": order.reduce_only.unwrap_or(false),
1290            "closeOnTrigger": order.close_on_trigger.unwrap_or(false),
1291            "positionIdx": order.position_idx.unwrap_or(0)
1292        });
1293
1294        // Add TP/SL parameters if provided
1295        if let Some(tp) = order.take_profit {
1296            order_data["takeProfit"] = simd_json::json!(tp.to_string());
1297        }
1298        if let Some(sl) = order.stop_loss {
1299            order_data["stopLoss"] = simd_json::json!(sl.to_string());
1300        }
1301        if let Some(tp_trigger_by) = &order.tp_trigger_by {
1302            order_data["tpTriggerBy"] = simd_json::json!(tp_trigger_by);
1303        }
1304        if let Some(sl_trigger_by) = &order.sl_trigger_by {
1305            order_data["slTriggerBy"] = simd_json::json!(sl_trigger_by);
1306        }
1307        if let Some(tp_limit_price) = order.tp_limit_price {
1308            order_data["tpLimitPrice"] = simd_json::json!(tp_limit_price.to_string());
1309        }
1310        if let Some(sl_limit_price) = order.sl_limit_price {
1311            order_data["slLimitPrice"] = simd_json::json!(sl_limit_price.to_string());
1312        }
1313
1314        let message = BybitWsTradingMessage {
1315            req_id,
1316            header,
1317            op: "order.create".into(),
1318            args: vec![order_data],
1319        };
1320
1321        let json_str = simd_json::to_string(&message)?;
1322        self.send_raw_message(json_str).await?;
1323
1324        Ok(())
1325    }
1326
1327    /// Create multiple orders in a batch with tracking
1328    pub async fn create_batch_orders(&self, batch: &BybitBatchOrderRequest) -> Result<()> {
1329        let req_id = id_generation::generate_exchange_request_id(
1330            "bybit",
1331            "batch_create",
1332            self.request_counter.fetch_add(1, Ordering::Relaxed),
1333        );
1334        let timestamp = BybitAuth::get_timestamp();
1335        let header = self.auth.create_ws_trading_header(timestamp);
1336
1337        // Track the batch request with the first order's symbol as representative
1338        if let Some(first_order) = batch.orders.first() {
1339            let instrument_id = InstrumentId::new(&first_order.symbol, Venue::Bybit);
1340            self.track_request(
1341                req_id.clone(),
1342                instrument_id,
1343                batch.category.clone(),
1344                RequestType::BatchCreate,
1345            )
1346            .await;
1347        }
1348
1349        let mut batch_data = Vec::new();
1350        for order in &batch.orders {
1351            let mut order_data = simd_json::json!({
1352                "category": batch.category.as_str(),
1353                "symbol": order.symbol,
1354                "side": Self::map_order_side(order.side),
1355                "orderType": Self::map_order_type(order.order_type),
1356                "qty": order.quantity.to_string(),
1357                "price": order.price.map_or("0".to_string(), |p| p.to_string()),
1358                "timeInForce": Self::map_time_in_force(order.time_in_force),
1359                "orderLinkId": order.order_link_id.as_ref().map_or(String::new(), std::string::ToString::to_string),
1360                "reduceOnly": order.reduce_only.unwrap_or(false),
1361                "closeOnTrigger": order.close_on_trigger.unwrap_or(false),
1362                "positionIdx": order.position_idx.unwrap_or(0)
1363            });
1364
1365            // Add TP/SL parameters if provided
1366            if let Some(tp) = order.take_profit {
1367                order_data["takeProfit"] = simd_json::json!(tp.to_string());
1368            }
1369            if let Some(sl) = order.stop_loss {
1370                order_data["stopLoss"] = simd_json::json!(sl.to_string());
1371            }
1372            if let Some(tp_trigger_by) = &order.tp_trigger_by {
1373                order_data["tpTriggerBy"] = simd_json::json!(tp_trigger_by);
1374            }
1375            if let Some(sl_trigger_by) = &order.sl_trigger_by {
1376                order_data["slTriggerBy"] = simd_json::json!(sl_trigger_by);
1377            }
1378            if let Some(tp_limit_price) = order.tp_limit_price {
1379                order_data["tpLimitPrice"] = simd_json::json!(tp_limit_price.to_string());
1380            }
1381            if let Some(sl_limit_price) = order.sl_limit_price {
1382                order_data["slLimitPrice"] = simd_json::json!(sl_limit_price.to_string());
1383            }
1384
1385            batch_data.push(order_data);
1386        }
1387
1388        let message = BybitWsTradingMessage {
1389            req_id,
1390            header,
1391            op: "order.create-batch".into(),
1392            args: batch_data,
1393        };
1394
1395        let json_str = simd_json::to_string(&message)?;
1396        self.send_raw_message(json_str).await?;
1397
1398        Ok(())
1399    }
1400
1401    /// Cancel a single order with tracking
1402    pub async fn cancel_order(&self, category: BybitCategory, order_id: &str) -> Result<()> {
1403        let req_id = id_generation::generate_exchange_request_id(
1404            "bybit",
1405            "cancel",
1406            self.request_counter.fetch_add(1, Ordering::Relaxed),
1407        );
1408        let timestamp = BybitAuth::get_timestamp();
1409        let header = self.auth.create_ws_trading_header(timestamp);
1410
1411        // Track the cancellation request - lookup instrument from registry
1412        let instrument_id = self
1413            .instrument_registry
1414            .lookup_by_order_id(order_id)
1415            .unwrap_or_else(|| {
1416                // Fallback: create instrument with normalized symbol from order_id
1417                let normalized_symbol = self
1418                    .instrument_registry
1419                    .normalize_symbol(order_id, Venue::Bybit);
1420                InstrumentId::new(normalized_symbol, Venue::Bybit)
1421            });
1422        self.track_request(
1423            req_id.clone(),
1424            instrument_id,
1425            category.clone(),
1426            RequestType::CancelOrder,
1427        )
1428        .await;
1429
1430        let cancel_data = simd_json::json!({
1431            "category": category.as_str(),
1432            "orderId": order_id
1433        });
1434
1435        let message = BybitWsTradingMessage {
1436            req_id,
1437            header,
1438            op: "order.cancel".into(),
1439            args: vec![cancel_data],
1440        };
1441
1442        let json_str = simd_json::to_string(&message)?;
1443        self.send_raw_message(json_str).await?;
1444
1445        Ok(())
1446    }
1447
1448    /// Cancel multiple orders in a batch with tracking
1449    pub async fn cancel_batch_orders(
1450        &self,
1451        category: BybitCategory,
1452        order_ids: &[&str],
1453    ) -> Result<()> {
1454        let req_id = id_generation::generate_exchange_request_id(
1455            "bybit",
1456            "batch_cancel",
1457            self.request_counter.fetch_add(1, Ordering::Relaxed),
1458        );
1459        let timestamp = BybitAuth::get_timestamp();
1460        let header = self.auth.create_ws_trading_header(timestamp);
1461
1462        // Track the batch cancellation request - use first order for tracking
1463        let instrument_id = if let Some(first_order_id) = order_ids.first() {
1464            self.instrument_registry
1465                .lookup_by_order_id(first_order_id)
1466                .unwrap_or_else(|| {
1467                    let normalized_symbol = self
1468                        .instrument_registry
1469                        .normalize_symbol("BTCUSDT", Venue::Bybit);
1470                    InstrumentId::new(&normalized_symbol, Venue::Bybit)
1471                })
1472        } else {
1473            // Fallback for empty batch - use default symbol
1474            let normalized_symbol = self
1475                .instrument_registry
1476                .normalize_symbol("BTCUSDT", Venue::Bybit);
1477            InstrumentId::new(&normalized_symbol, Venue::Bybit)
1478        };
1479        self.track_request(
1480            req_id.clone(),
1481            instrument_id,
1482            category.clone(),
1483            RequestType::BatchCancel,
1484        )
1485        .await;
1486
1487        let mut batch_data = Vec::new();
1488        for order_id in order_ids {
1489            let cancel_data = simd_json::json!({
1490                "category": category.as_str(),
1491                "orderId": order_id
1492            });
1493            batch_data.push(cancel_data);
1494        }
1495
1496        let message = BybitWsTradingMessage {
1497            req_id,
1498            header,
1499            op: "order.cancel-batch".into(),
1500            args: batch_data,
1501        };
1502
1503        let json_str = simd_json::to_string(&message)?;
1504        self.send_raw_message(json_str).await?;
1505
1506        Ok(())
1507    }
1508
1509    /// Amend an existing order
1510    pub async fn amend_order(&self, request: &BybitAmendOrderRequest) -> Result<()> {
1511        let req_id = id_generation::generate_exchange_request_id(
1512            "bybit",
1513            "amend",
1514            self.request_counter.fetch_add(1, Ordering::Relaxed),
1515        );
1516        let timestamp = BybitAuth::get_timestamp();
1517        let header = self.auth.create_ws_trading_header(timestamp);
1518
1519        // Build amend data
1520        let mut amend_data = simd_json::json!({
1521            "category": request.category.as_str(),
1522            "symbol": request.symbol,
1523        });
1524
1525        // Add order identifier (either orderId or orderLinkId required)
1526        if let Some(order_id) = &request.order_id {
1527            amend_data["orderId"] = simd_json::json!(order_id);
1528        } else if let Some(order_link_id) = &request.order_link_id {
1529            amend_data["orderLinkId"] = simd_json::json!(order_link_id);
1530        } else {
1531            bail!("Either orderId or orderLinkId must be provided");
1532        }
1533
1534        // Add optional amendment fields
1535        if let Some(qty) = request.qty {
1536            amend_data["qty"] = simd_json::json!(qty.to_string());
1537        }
1538        if let Some(price) = request.price {
1539            amend_data["price"] = simd_json::json!(price.to_string());
1540        }
1541        if let Some(tp) = request.take_profit {
1542            amend_data["takeProfit"] = simd_json::json!(tp.to_string());
1543        }
1544        if let Some(sl) = request.stop_loss {
1545            amend_data["stopLoss"] = simd_json::json!(sl.to_string());
1546        }
1547        if let Some(tp_trigger_by) = &request.tp_trigger_by {
1548            amend_data["tpTriggerBy"] = simd_json::json!(tp_trigger_by);
1549        }
1550        if let Some(sl_trigger_by) = &request.sl_trigger_by {
1551            amend_data["slTriggerBy"] = simd_json::json!(sl_trigger_by);
1552        }
1553        if let Some(trigger_price) = request.trigger_price {
1554            amend_data["triggerPrice"] = simd_json::json!(trigger_price.to_string());
1555        }
1556        if let Some(tp_limit_price) = request.tp_limit_price {
1557            amend_data["tpLimitPrice"] = simd_json::json!(tp_limit_price.to_string());
1558        }
1559        if let Some(sl_limit_price) = request.sl_limit_price {
1560            amend_data["slLimitPrice"] = simd_json::json!(sl_limit_price.to_string());
1561        }
1562
1563        let message = BybitWsTradingMessage {
1564            req_id,
1565            header,
1566            op: "order.amend".into(),
1567            args: vec![amend_data],
1568        };
1569
1570        let json_str = simd_json::to_string(&message)?;
1571        self.send_raw_message(json_str).await?;
1572
1573        Ok(())
1574    }
1575
1576    /// Cancel all orders for a category
1577    pub async fn cancel_all_orders_internal(&self, request: &BybitCancelAllRequest) -> Result<()> {
1578        let req_id = id_generation::generate_exchange_request_id(
1579            "bybit",
1580            "cancel_all",
1581            self.request_counter.fetch_add(1, Ordering::Relaxed),
1582        );
1583        let timestamp = BybitAuth::get_timestamp();
1584        let header = self.auth.create_ws_trading_header(timestamp);
1585
1586        // Build cancel all data
1587        let mut cancel_data = simd_json::json!({
1588            "category": request.category.as_str(),
1589        });
1590
1591        // Add optional filters
1592        if let Some(symbol) = &request.symbol {
1593            cancel_data["symbol"] = simd_json::json!(symbol);
1594        }
1595        if let Some(base_coin) = &request.base_coin {
1596            cancel_data["baseCoin"] = simd_json::json!(base_coin);
1597        }
1598        if let Some(settle_coin) = &request.settle_coin {
1599            cancel_data["settleCoin"] = simd_json::json!(settle_coin);
1600        }
1601
1602        let message = BybitWsTradingMessage {
1603            req_id,
1604            header,
1605            op: "order.cancel-all".into(),
1606            args: vec![cancel_data],
1607        };
1608
1609        let json_str = simd_json::to_string(&message)?;
1610        self.send_raw_message(json_str).await?;
1611
1612        Ok(())
1613    }
1614
1615    /// Find order info (category and symbol) from order ID
1616    async fn find_order_info(&self, order_id: &str) -> Option<(BybitCategory, SmartString)> {
1617        let pending = self.pending_requests.read().await;
1618        for (_, request) in pending.iter() {
1619            // Try to match based on instrument symbol or order_id
1620            if request.instrument_id.symbol.as_str() == order_id {
1621                return Some((
1622                    request.category.clone(),
1623                    request.instrument_id.symbol.clone(),
1624                ));
1625            }
1626        }
1627        None
1628    }
1629
1630    /// Send JSON ping to test connection
1631    pub async fn send_ping(&self) -> Result<()> {
1632        let ping_message = simd_json::json!({
1633            "req_id": format!("ping_{}", Uuid::new_v4()),
1634            "op": "ping"
1635        });
1636
1637        let json_str = simd_json::to_string(&ping_message)?;
1638        self.send_raw_message(json_str).await?;
1639
1640        Ok(())
1641    }
1642
1643    /// Disconnect from WebSocket
1644    pub async fn disconnect(&self) -> Result<()> {
1645        self.is_connected.store(false, Ordering::Relaxed);
1646        self.is_authenticated.store(false, Ordering::Relaxed);
1647
1648        // Stop tasks
1649        if let Some(task) = self.ping_task.write().await.take() {
1650            task.abort();
1651        }
1652        if let Some(task) = self.message_task.write().await.take() {
1653            task.abort();
1654        }
1655        if let Some(task) = self.cleanup_task.write().await.take() {
1656            task.abort();
1657        }
1658
1659        // Close WebSocket connection
1660        if let Some(sink) = self.ws_sink.write().await.take() {
1661            drop(sink);
1662        }
1663        if let Some(stream) = self.ws_stream.write().await.take() {
1664            drop(stream);
1665        }
1666
1667        // Update health
1668        {
1669            let mut health = self.connection_health.write();
1670            health.is_healthy = false;
1671        }
1672
1673        info!("Disconnected from Bybit WebSocket");
1674        Ok(())
1675    }
1676}
1677
1678#[async_trait]
1679impl crate::execution_engine::Exchange for BybitWebSocketTrader {
1680    fn venue(&self) -> Venue {
1681        Venue::Bybit
1682    }
1683
1684    async fn place_order(
1685        &self,
1686        order: Order,
1687        report_sender: Sender<crate::execution_engine::ExecutionReport>,
1688    ) -> Result<()> {
1689        // Check if connected
1690        if !self.is_connected().await {
1691            bail!("WebSocket not connected to Bybit");
1692        }
1693
1694        // Clone order fields before moving order to convert_order
1695        let order_id = order.id;
1696        let symbol = order.symbol.clone();
1697        let venue = order.venue;
1698        let quantity = order.quantity;
1699
1700        // Convert Order to BybitOrderRequest
1701        let bybit_order = self.convert_order(order)?;
1702
1703        // Send immediate acknowledgment for order placement
1704        let ack_report = ExecutionReport {
1705            id: Uuid::new_v4().to_string().into(),
1706            order_id: order_id.to_string().into(),
1707            exchange_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
1708            system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
1709            instrument_id: InstrumentId::new(symbol, venue),
1710            status: OrderStatus::New,
1711            filled_quantity: Decimal::ZERO,
1712            remaining_quantity: quantity,
1713            execution_price: None,
1714            reject_reason: None,
1715            exchange_execution_id: None,
1716            is_final: false,
1717        };
1718
1719        if let Err(e) = report_sender.send_async(ack_report).await {
1720            warn!("Failed to send order placement acknowledgment: {e}");
1721        }
1722
1723        // Send order via WebSocket
1724        self.create_order(&bybit_order).await
1725    }
1726
1727    async fn cancel_order(
1728        &self,
1729        order_id: SmartString,
1730        report_sender: Sender<crate::execution_engine::ExecutionReport>,
1731    ) -> Result<()> {
1732        // Check if connected
1733        if !self.is_connected().await {
1734            bail!("WebSocket not connected to Bybit");
1735        }
1736
1737        // Send immediate acknowledgment for order cancellation request
1738        let cancel_ack_report = ExecutionReport {
1739            id: Uuid::new_v4().to_string().into(),
1740            order_id: order_id.clone(),
1741            exchange_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
1742            system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
1743            instrument_id: self
1744                .instrument_registry
1745                .lookup_by_order_id(&order_id)
1746                .unwrap_or_else(|| {
1747                    // Fallback: create instrument with normalized symbol
1748                    let normalized_symbol = self
1749                        .instrument_registry
1750                        .normalize_symbol(&order_id, Venue::Bybit);
1751                    InstrumentId::new(normalized_symbol, Venue::Bybit)
1752                }),
1753            status: OrderStatus::Pending,
1754            filled_quantity: Decimal::ZERO,
1755            remaining_quantity: Decimal::ZERO,
1756            execution_price: None,
1757            reject_reason: None,
1758            exchange_execution_id: None,
1759            is_final: false,
1760        };
1761
1762        if let Err(e) = report_sender.send_async(cancel_ack_report).await {
1763            warn!("Failed to send order cancellation acknowledgment: {e}");
1764        }
1765
1766        // Cancel via WebSocket
1767        // Try to find the category from tracked requests, fallback to Spot
1768        let category = self
1769            .find_order_category(&order_id)
1770            .await
1771            .unwrap_or(BybitCategory::Spot);
1772
1773        self.cancel_order(category, &order_id).await
1774    }
1775
1776    async fn modify_order(
1777        &self,
1778        order_id: SmartString,
1779        new_price: Option<Decimal>,
1780        new_quantity: Option<Decimal>,
1781        report_sender: Sender<crate::execution_engine::ExecutionReport>,
1782    ) -> Result<()> {
1783        // Check if connected
1784        if !self.is_connected().await {
1785            bail!("WebSocket not connected to Bybit");
1786        }
1787
1788        // Send immediate acknowledgment for order modification request
1789        let modify_ack_report = ExecutionReport {
1790            id: Uuid::new_v4().to_string().into(),
1791            order_id: order_id.clone(),
1792            exchange_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
1793            system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
1794            instrument_id: self
1795                .instrument_registry
1796                .lookup_by_order_id(&order_id)
1797                .unwrap_or_else(|| {
1798                    // Fallback: create instrument with normalized symbol
1799                    let normalized_symbol = self
1800                        .instrument_registry
1801                        .normalize_symbol(&order_id, Venue::Bybit);
1802                    InstrumentId::new(normalized_symbol, Venue::Bybit)
1803                }),
1804            status: OrderStatus::Pending,
1805            filled_quantity: Decimal::ZERO,
1806            remaining_quantity: new_quantity.unwrap_or(Decimal::ZERO),
1807            execution_price: new_price,
1808            reject_reason: None,
1809            exchange_execution_id: None,
1810            is_final: false,
1811        };
1812
1813        if let Err(e) = report_sender.send_async(modify_ack_report).await {
1814            warn!("Failed to send order modification acknowledgment: {e}");
1815        }
1816
1817        // Find order category and symbol
1818        let (category, symbol) = self
1819            .find_order_info(&order_id)
1820            .await
1821            .ok_or_else(|| anyhow::anyhow!("Order not found in tracking"))?;
1822
1823        // Create amendment request
1824        let amend_request = BybitAmendOrderRequest {
1825            category,
1826            symbol,
1827            order_id: Some(order_id.clone()),
1828            order_link_id: None,
1829            qty: new_quantity,
1830            price: new_price,
1831            take_profit: None,
1832            stop_loss: None,
1833            tp_trigger_by: None,
1834            sl_trigger_by: None,
1835            trigger_price: None,
1836            tp_limit_price: None,
1837            sl_limit_price: None,
1838        };
1839
1840        self.amend_order(&amend_request).await
1841    }
1842
1843    async fn cancel_all_orders(
1844        &self,
1845        instrument_id: Option<InstrumentId>,
1846        report_sender: Sender<crate::execution_engine::ExecutionReport>,
1847    ) -> Result<()> {
1848        // Check if connected
1849        if !self.is_connected().await {
1850            bail!("WebSocket not connected to Bybit");
1851        }
1852
1853        // Clone instrument_id for use in both places
1854        let instrument_id_for_report = instrument_id.clone();
1855
1856        // Send immediate acknowledgment for cancel all request
1857        // Note: Individual order cancellations will come through WebSocket stream
1858        let cancel_all_ack_report = ExecutionReport {
1859            id: Uuid::new_v4().to_string().into(),
1860            order_id: "CANCEL_ALL".into(),
1861            exchange_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
1862            system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
1863            instrument_id: instrument_id_for_report
1864                .unwrap_or_else(|| InstrumentId::new("ALL", rusty_model::venues::Venue::Bybit)),
1865            status: OrderStatus::New, // Indicating the cancel-all request was accepted
1866            filled_quantity: Decimal::ZERO,
1867            remaining_quantity: Decimal::ZERO,
1868            execution_price: None,
1869            reject_reason: None,
1870            exchange_execution_id: None,
1871            is_final: true, // Cancel-all is a one-time operation
1872        };
1873
1874        if let Err(e) = report_sender.send_async(cancel_all_ack_report).await {
1875            warn!("Failed to send cancel-all acknowledgment: {e}");
1876        }
1877
1878        // Create cancel all request
1879        let cancel_request = BybitCancelAllRequest {
1880            category: BybitCategory::Spot, // Default to spot, could be made configurable
1881            symbol: instrument_id.map(|id| id.symbol),
1882            base_coin: None,
1883            settle_coin: None,
1884        };
1885
1886        self.cancel_all_orders_internal(&cancel_request).await
1887    }
1888
1889    async fn get_order_status(&self, order_id: &str) -> Result<OrderStatus> {
1890        // Bybit WebSocket doesn't support order queries - must use REST API
1891        bail!("Order status query not supported via WebSocket - use REST API")
1892    }
1893
1894    async fn connect(
1895        &self,
1896        report_sender: Sender<crate::execution_engine::ExecutionReport>,
1897    ) -> Result<()> {
1898        // Use the proper report_sender channel instead of creating a dummy one
1899        // Call the inherent method directly to avoid recursion
1900        Self::connect(self, report_sender).await
1901    }
1902
1903    async fn disconnect(&self) -> Result<()> {
1904        self.disconnect().await
1905    }
1906
1907    async fn is_connected(&self) -> bool {
1908        self.is_connected.load(Ordering::Relaxed)
1909    }
1910
1911    async fn get_instruments(&self) -> Result<SmallVec<[InstrumentId; 32]>> {
1912        // Instrument discovery not supported via WebSocket
1913        bail!("Instrument discovery not supported via WebSocket - use REST API")
1914    }
1915
1916    async fn send_fix_message(&self, _message: Vec<u8>) -> Result<()> {
1917        anyhow::bail!("FIX protocol not supported on Bybit WebSocket")
1918    }
1919
1920    async fn receive_fix_message(&self) -> Result<Vec<u8>> {
1921        anyhow::bail!("FIX protocol not supported on Bybit WebSocket")
1922    }
1923}
1924
1925// Helper methods for Exchange trait implementation
1926impl BybitWebSocketTrader {
1927    /// Start cleanup task for old pending requests
1928    async fn start_cleanup_task(&self) {
1929        let pending_requests = self.pending_requests.clone();
1930        let clock = self.clock.clone();
1931
1932        let task = tokio::spawn(async move {
1933            let mut interval = tokio::time::interval(Duration::from_secs(60)); // Cleanup every minute
1934
1935            loop {
1936                interval.tick().await;
1937
1938                let now = clock.raw();
1939                let max_age_ns = 5 * 60 * 1_000_000_000; // 5 minutes in nanoseconds
1940
1941                let mut pending = pending_requests.write().await;
1942                let count_before = pending.len();
1943                pending.retain(|_, request| now.saturating_sub(request.timestamp) < max_age_ns);
1944                let count_after = pending.len();
1945
1946                if count_before > count_after {
1947                    debug!(
1948                        "Cleaned up {} old pending requests",
1949                        count_before - count_after
1950                    );
1951                }
1952            }
1953        });
1954
1955        *self.cleanup_task.write().await = Some(task);
1956    }
1957
1958    /// Convert generic Order to `BybitOrderRequest`
1959    fn convert_order(&self, order: Order) -> Result<BybitOrderRequest> {
1960        Ok(BybitOrderRequest {
1961            symbol: order.symbol.clone(),
1962            side: order.side,
1963            order_type: order.order_type,
1964            quantity: order.quantity,
1965            price: order.price,
1966            time_in_force: TimeInForce::GTC, // Default to GTC
1967            category: BybitCategory::Spot,   // Default to spot
1968            order_link_id: Some(order.id.to_string().into()),
1969            reduce_only: None,
1970            close_on_trigger: None,
1971            position_idx: None,
1972            // TP/SL fields - initialize as None
1973            take_profit: None,
1974            stop_loss: None,
1975            tp_trigger_by: None,
1976            sl_trigger_by: None,
1977            tp_limit_price: None,
1978            sl_limit_price: None,
1979        })
1980    }
1981}
1982
1983#[cfg(test)]
1984mod tests {
1985    use super::*;
1986    use crate::execution_engine::Exchange;
1987    use crate::instrument_registry::create_shared_registry;
1988    use rust_decimal::Decimal;
1989    use rusty_model::enums::OrderType;
1990    use rusty_model::trading_order::Order;
1991    use rusty_model::types::ClientId;
1992    use std::str::FromStr;
1993    use std::sync::Arc;
1994
1995    #[tokio::test]
1996    async fn test_exchange_trait_implementation() {
1997        // Create a mock auth
1998        let api_key = "test_api_key".into();
1999        let api_secret = "test_api_secret".into();
2000        let auth = Arc::new(BybitAuth::new(api_key, api_secret));
2001
2002        // Create trader instance (mainnet)
2003        let registry = create_shared_registry();
2004        let trader = BybitWebSocketTrader::new(auth, false, registry);
2005
2006        // Test venue method
2007        assert_eq!(trader.venue(), Venue::Bybit);
2008
2009        // Test is_connected when not connected
2010        assert!(!trader.is_connected().await);
2011
2012        // Create test order
2013        let test_order = Order::new(
2014            Venue::Bybit,
2015            "BTCUSDT",
2016            OrderSide::Buy,
2017            OrderType::Limit,
2018            Decimal::from_str("0.001").unwrap(),
2019            Some(Decimal::from_str("50000").unwrap()),
2020            ClientId::new("test_client"),
2021        );
2022
2023        // Test that place_order returns error when not connected
2024        let (report_tx, _report_rx) = flume::bounded(100);
2025        let result = trader.place_order(test_order, report_tx).await;
2026        assert!(result.is_err());
2027        assert!(result.unwrap_err().to_string().contains("not connected"));
2028    }
2029
2030    #[test]
2031    fn test_order_status_mapping() {
2032        assert_eq!(
2033            BybitWebSocketTrader::map_order_status("New"),
2034            OrderStatus::New
2035        );
2036        assert_eq!(
2037            BybitWebSocketTrader::map_order_status("PartiallyFilled"),
2038            OrderStatus::PartiallyFilled
2039        );
2040        assert_eq!(
2041            BybitWebSocketTrader::map_order_status("Filled"),
2042            OrderStatus::Filled
2043        );
2044        assert_eq!(
2045            BybitWebSocketTrader::map_order_status("Cancelled"),
2046            OrderStatus::Cancelled
2047        );
2048        assert_eq!(
2049            BybitWebSocketTrader::map_order_status("Rejected"),
2050            OrderStatus::Rejected
2051        );
2052        assert_eq!(
2053            BybitWebSocketTrader::map_order_status("Unknown"),
2054            OrderStatus::Unknown
2055        );
2056    }
2057
2058    #[test]
2059    fn test_order_side_mapping() {
2060        assert_eq!(BybitWebSocketTrader::map_order_side(OrderSide::Buy), "Buy");
2061        assert_eq!(
2062            BybitWebSocketTrader::map_order_side(OrderSide::Sell),
2063            "Sell"
2064        );
2065    }
2066
2067    #[test]
2068    fn test_order_type_mapping() {
2069        assert_eq!(
2070            BybitWebSocketTrader::map_order_type(OrderType::Market),
2071            "Market"
2072        );
2073        assert_eq!(
2074            BybitWebSocketTrader::map_order_type(OrderType::Limit),
2075            "Limit"
2076        );
2077    }
2078
2079    #[test]
2080    fn test_time_in_force_mapping() {
2081        assert_eq!(
2082            BybitWebSocketTrader::map_time_in_force(TimeInForce::GTC),
2083            "GTC"
2084        );
2085        assert_eq!(
2086            BybitWebSocketTrader::map_time_in_force(TimeInForce::IOC),
2087            "IOC"
2088        );
2089        assert_eq!(
2090            BybitWebSocketTrader::map_time_in_force(TimeInForce::FOK),
2091            "FOK"
2092        );
2093        assert_eq!(
2094            BybitWebSocketTrader::map_time_in_force(TimeInForce::GTX),
2095            "PostOnly"
2096        );
2097    }
2098
2099    #[test]
2100    fn test_category_conversion() {
2101        assert_eq!(BybitCategory::Spot.as_str(), "spot");
2102        assert_eq!(BybitCategory::Linear.as_str(), "linear");
2103        assert_eq!(BybitCategory::Inverse.as_str(), "inverse");
2104        assert_eq!(BybitCategory::Options.as_str(), "option");
2105    }
2106
2107    #[tokio::test]
2108    async fn test_request_tracking() {
2109        // Create a mock auth
2110        let api_key = "test_api_key".into();
2111        let api_secret = "test_api_secret".into();
2112        let auth = Arc::new(BybitAuth::new(api_key, api_secret));
2113
2114        // Create trader instance (mainnet)
2115        let registry = create_shared_registry();
2116        let trader = BybitWebSocketTrader::new(auth, false, registry);
2117
2118        // Test tracking a request
2119        let req_id: SmartString = "test_request_123".into();
2120        let instrument_id = InstrumentId::new("BTCUSDT", Venue::Bybit);
2121        let category = BybitCategory::Spot;
2122
2123        trader
2124            .track_request(
2125                req_id.clone(),
2126                instrument_id.clone(),
2127                category.clone(),
2128                RequestType::CreateOrder,
2129            )
2130            .await;
2131
2132        // Verify request was tracked
2133        let pending = trader.pending_requests.read().await;
2134        assert!(pending.contains_key(&req_id));
2135
2136        let request = pending.get(&req_id).unwrap();
2137        assert_eq!(request.instrument_id, instrument_id);
2138        assert_eq!(request.category, category);
2139        assert_eq!(request.request_type, RequestType::CreateOrder);
2140
2141        // Test finding order category
2142        drop(pending);
2143        let found_category = trader.find_order_category("some_order_id").await;
2144        assert_eq!(found_category, Some(BybitCategory::Spot));
2145    }
2146
2147    #[tokio::test]
2148    async fn test_request_cleanup() {
2149        // Create a mock auth
2150        let api_key = "test_api_key".into();
2151        let api_secret = "test_api_secret".into();
2152        let auth = Arc::new(BybitAuth::new(api_key, api_secret));
2153
2154        // Create trader instance (mainnet)
2155        let registry = create_shared_registry();
2156        let trader = BybitWebSocketTrader::new(auth, false, registry);
2157
2158        // Add a request
2159        let req_id: SmartString = "test_request_456".into();
2160        let instrument_id = InstrumentId::new("ETHUSDT", Venue::Bybit);
2161
2162        trader
2163            .track_request(
2164                req_id.clone(),
2165                instrument_id,
2166                BybitCategory::Linear,
2167                RequestType::CancelOrder,
2168            )
2169            .await;
2170
2171        // Verify request exists
2172        {
2173            let pending = trader.pending_requests.read().await;
2174            assert_eq!(pending.len(), 1);
2175        }
2176
2177        // Run cleanup (should not remove recent requests)
2178        trader.cleanup_old_requests().await;
2179
2180        // Verify request still exists
2181        {
2182            let pending = trader.pending_requests.read().await;
2183            assert_eq!(pending.len(), 1);
2184        }
2185    }
2186
2187    #[test]
2188    fn test_batch_order_request() {
2189        let orders = vec![
2190            BybitOrderRequest {
2191                symbol: "BTCUSDT".into(),
2192                side: OrderSide::Buy,
2193                order_type: OrderType::Limit,
2194                quantity: Decimal::from(1),
2195                price: Some(Decimal::from(50000)),
2196                time_in_force: TimeInForce::GTC,
2197                category: BybitCategory::Spot,
2198                order_link_id: Some("test_1".into()),
2199                reduce_only: None,
2200                close_on_trigger: None,
2201                position_idx: None,
2202                take_profit: None,
2203                stop_loss: None,
2204                tp_trigger_by: None,
2205                sl_trigger_by: None,
2206                tp_limit_price: None,
2207                sl_limit_price: None,
2208            },
2209            BybitOrderRequest {
2210                symbol: "ETHUSDT".into(),
2211                side: OrderSide::Sell,
2212                order_type: OrderType::Market,
2213                quantity: Decimal::from(2),
2214                price: None,
2215                time_in_force: TimeInForce::IOC,
2216                category: BybitCategory::Spot,
2217                order_link_id: Some("test_2".into()),
2218                reduce_only: Some(false),
2219                close_on_trigger: Some(false),
2220                position_idx: Some(0),
2221                take_profit: None,
2222                stop_loss: None,
2223                tp_trigger_by: None,
2224                sl_trigger_by: None,
2225                tp_limit_price: None,
2226                sl_limit_price: None,
2227            },
2228        ];
2229
2230        let batch = BybitBatchOrderRequest {
2231            category: BybitCategory::Spot,
2232            orders,
2233        };
2234
2235        assert_eq!(batch.orders.len(), 2);
2236        assert_eq!(batch.category, BybitCategory::Spot);
2237    }
2238}