rusty_ems/exchanges/bybit/
websocket_trading.rs

1use anyhow::{Result, anyhow, bail};
2use async_trait::async_trait;
3use flume::Sender;
4use rust_decimal::Decimal;
5use simd_json::prelude::*;
6use simd_json::{json, value::owned::Value as JsonValue};
7use std::collections::VecDeque;
8use std::sync::Arc;
9use std::sync::atomic::{AtomicBool, Ordering};
10use std::time::Duration;
11
12use futures::{SinkExt, StreamExt};
13use log::{debug, error, info, warn};
14use parking_lot::RwLock;
15use quanta::Clock;
16use rusty_common::collections::FxHashMap;
17use rusty_common::websocket::Message;
18use rusty_common::websocket::connector::{WebSocketSink, WebSocketStream};
19use rusty_common::{SmartString, time};
20use rusty_model::{
21    enums::{OrderSide, OrderStatus, OrderType, TimeInForce},
22    instruments::InstrumentId,
23    trading_order::Order,
24    venues::Venue,
25};
26use smallvec::SmallVec;
27use tokio::sync::RwLock as AsyncRwLock;
28use tokio::task::JoinHandle;
29use tokio::time::interval;
30use uuid::Uuid;
31
32// Removed unused import
33use crate::exchanges::bybit_rest::{BybitAccountInfo, BybitAccountType, BybitRestClient};
34use crate::execution_engine::{Exchange, ExecutionReport};
35use rusty_common::auth::exchanges::bybit::BybitAuth;
36
37// Bybit WebSocket Trading Implementation
38//
39// Provides WebSocket trading functionality for Bybit exchange with support for:
40// - V5 unified WebSocket API
41// - Order placement, amendment, and cancellation via WebSocket
42// - Batch order operations
43// - TP/SL order management
44// - Real-time order status updates
45// - Automatic reconnection with exponential backoff
46// - Connection health monitoring
47//
48// # Features
49//
50// - **Unified Trading**: Support for spot, linear, inverse, and options
// - **Batch Operations**: Up to 20 orders per batch request (10 for spot)
52// - **Order Amendment**: Modify price and quantity of open orders
53// - **Account Types**: Handles both UNIFIED and CONTRACT accounts
54// - **Rate Limiting**: Built-in rate limit handling (3000 reqs/sec per IP)
55// - **Automatic Reconnection**: Configurable backoff strategy
56
/// Mainnet endpoint of the V5 trade (order entry) WebSocket.
const BYBIT_WS_TRADE_URL: &str = "wss://stream.bybit.com/v5/trade";
/// Testnet endpoint of the V5 trade (order entry) WebSocket.
const BYBIT_WS_TRADE_TESTNET_URL: &str = "wss://stream-testnet.bybit.com/v5/trade";
/// Mainnet endpoint of the V5 private (account updates) WebSocket.
const BYBIT_WS_PRIVATE_URL: &str = "wss://stream.bybit.com/v5/private";
/// Testnet endpoint of the V5 private (account updates) WebSocket.
const BYBIT_WS_PRIVATE_TESTNET_URL: &str = "wss://stream-testnet.bybit.com/v5/private";

// Connection management constants.

/// Seconds between keep-alive pings.
const PING_INTERVAL_SECONDS: u64 = 30;
/// Seconds to wait for a pong before the connection is considered dead.
const PONG_TIMEOUT_SECONDS: u64 = 10;
/// Largest accepted message size in bytes (10 MB).
const MAX_MESSAGE_SIZE: usize = 10 * 1024 * 1024;
/// Upper bound on the number of tracked in-flight requests.
const MAX_PENDING_REQUESTS: usize = 10000;
/// Upper bound on reconnection attempts.
const MAX_RECONNECTION_ATTEMPTS: u32 = 10;

// Rate limiting constants.

/// Requests allowed per rate-limit window (IP rate limit).
const MAX_REQUESTS_PER_SECOND: u32 = 3000;
/// Length of the rate-limit window in milliseconds (1 second).
const RATE_LIMIT_WINDOW_MS: u64 = 1_000;
73
/// Product categories of the Bybit unified API.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BybitCategory {
    /// Spot markets
    Spot,
    /// Linear futures and perpetuals
    Linear,
    /// Inverse futures and perpetuals
    Inverse,
    /// Options
    Option,
}

impl BybitCategory {
    /// Look up the category whose wire name is exactly `s`.
    ///
    /// Matching is case-sensitive; any other input yields `None`.
    #[must_use]
    pub fn parse_category(s: &str) -> Option<Self> {
        [Self::Spot, Self::Linear, Self::Inverse, Self::Option]
            .into_iter()
            .find(|category| category.as_str() == s)
    }

    /// Lower-case wire name of the category.
    #[must_use]
    pub const fn as_str(&self) -> &'static str {
        match *self {
            Self::Spot => "spot",
            Self::Linear => "linear",
            Self::Inverse => "inverse",
            Self::Option => "option",
        }
    }

    /// Largest number of orders allowed in one batch request for this category.
    #[must_use]
    pub const fn max_batch_size(&self) -> usize {
        match *self {
            Self::Spot => 10,
            Self::Linear | Self::Inverse | Self::Option => 20,
        }
    }
}
122
123/// Instrument information cache entry
124#[derive(Debug, Clone)]
125pub struct InstrumentInfo {
126    /// Symbol identifier
127    pub symbol: SmartString,
128    /// Trading category (spot, linear, inverse, option)
129    pub category: BybitCategory,
130    /// Trading status (Trading, Settled, etc.)
131    pub status: SmartString,
132    /// Base currency or coin
133    pub base_coin: SmartString,
134    /// Quote currency or coin
135    pub quote_coin: SmartString,
136    /// Contract type for derivatives (LinearPerpetual, etc.)
137    pub contract_type: Option<SmartString>,
138    /// Timestamp when cached (nanoseconds)
139    pub cached_at: u64, // Timestamp when cached
140}
141
142impl InstrumentInfo {
143    /// Create new instrument info
144    #[must_use]
145    pub fn new(
146        symbol: SmartString,
147        category: BybitCategory,
148        status: SmartString,
149        base_coin: SmartString,
150        quote_coin: SmartString,
151        contract_type: Option<SmartString>,
152    ) -> Self {
153        Self {
154            symbol,
155            category,
156            status,
157            base_coin,
158            quote_coin,
159            contract_type,
160            cached_at: rusty_common::time::get_epoch_timestamp_ns(),
161        }
162    }
163
164    /// Check if cached entry is expired (older than 1 hour)
165    #[must_use]
166    pub fn is_expired(&self) -> bool {
167        let now = rusty_common::time::get_epoch_timestamp_ns();
168        let one_hour_ns = 3600 * 1_000_000_000;
169        now.saturating_sub(self.cached_at) > one_hour_ns
170    }
171}
172
// Batch order limits per category.

/// Maximum orders per batch request for spot.
const MAX_BATCH_SPOT: usize = 10;
/// Maximum orders per batch request for linear contracts.
const MAX_BATCH_LINEAR: usize = 20;
/// Maximum orders per batch request for inverse contracts.
const MAX_BATCH_INVERSE: usize = 20;
/// Maximum orders per batch request for options.
const MAX_BATCH_OPTION: usize = 20;

// Order categories - backward compatibility constants.

/// Wire name of the spot category.
const CATEGORY_SPOT: &str = "spot";
/// Wire name of the linear category.
const CATEGORY_LINEAR: &str = "linear";
/// Wire name of the inverse category.
const CATEGORY_INVERSE: &str = "inverse";
/// Wire name of the option category.
const CATEGORY_OPTION: &str = "option";
184
/// Snapshot of connection liveness and traffic counters.
#[derive(Debug, Clone)]
pub struct ConnectionHealth {
    /// Whether the connection is considered healthy
    pub is_healthy: bool,
    /// Clock reading taken when the last ping was sent; 0 until the first ping.
    /// The writers in this file store `Clock::raw()` values here.
    pub last_ping_sent: u64,
    /// Clock reading taken when the last pong was received; 0 until the first pong.
    /// The writers in this file store `Clock::raw()` values here.
    pub last_pong_received: u64,
    /// Pings sent that have not yet been matched by a pong
    pub pending_pings: u32,
    /// Number of reconnection attempts made
    pub reconnection_attempts: u32,
    /// Total messages sent
    pub messages_sent: u64,
    /// Total messages received
    pub messages_received: u64,
}

impl ConnectionHealth {
    /// Initial state: not healthy, every timestamp and counter at zero.
    const fn new() -> Self {
        Self {
            messages_received: 0,
            messages_sent: 0,
            reconnection_attempts: 0,
            pending_pings: 0,
            last_pong_received: 0,
            last_ping_sent: 0,
            is_healthy: false,
        }
    }
}
217
218/// Rate limiter for request throttling
219struct RateLimiter {
220    request_times: VecDeque<u64>,
221    clock: Clock,
222}
223
224impl RateLimiter {
225    const fn new(clock: Clock) -> Self {
226        Self {
227            request_times: VecDeque::new(),
228            clock,
229        }
230    }
231
232    fn cleanup_expired(&mut self) {
233        let now = self.clock.raw() / 1_000_000; // Convert to milliseconds
234        let window_start = now.saturating_sub(RATE_LIMIT_WINDOW_MS);
235
236        while let Some(&front) = self.request_times.front() {
237            if front < window_start {
238                self.request_times.pop_front();
239            } else {
240                break;
241            }
242        }
243    }
244
245    fn can_send_request(&mut self) -> bool {
246        self.cleanup_expired();
247        self.request_times.len() < MAX_REQUESTS_PER_SECOND as usize
248    }
249
250    fn record_request(&mut self) {
251        let now = self.clock.raw() / 1_000_000;
252        self.request_times.push_back(now);
253    }
254}
255
256/// Pending request tracking
257struct PendingRequest {
258    op_type: SmartString,
259    timestamp: u64,
260    report_sender: Sender<ExecutionReport>,
261}
262
263/// Bybit WebSocket trading client
264///
265/// Provides high-performance WebSocket trading functionality for Bybit exchange
266/// with support for V5 unified API, batch operations, and connection health monitoring.
267pub struct BybitWebsocketTrading {
268    /// Authentication handler
269    auth: Arc<BybitAuth>,
270
271    /// Optional REST client for account info and validation (V5 features)
272    rest_client: Option<Arc<BybitRestClient>>,
273
274    /// WebSocket connections
275    trade_sink: Arc<AsyncRwLock<Option<WebSocketSink>>>,
276    trade_stream: Arc<AsyncRwLock<Option<WebSocketStream>>>,
277    private_sink: Arc<AsyncRwLock<Option<WebSocketSink>>>,
278    private_stream: Arc<AsyncRwLock<Option<WebSocketStream>>>,
279
280    /// Configuration
281    account_type: SmartString,
282    testnet: bool,
283
284    /// V5 Account info cache
285    account_info: Arc<RwLock<Option<BybitAccountInfo>>>,
286
287    /// Connection state
288    is_connected: Arc<AtomicBool>,
289    is_authenticated: Arc<AtomicBool>,
290
291    /// Monitoring
292    clock: Clock,
293    health: Arc<RwLock<ConnectionHealth>>,
294    rate_limiter: Arc<RwLock<RateLimiter>>,
295
296    /// Request tracking
297    pending_requests: Arc<RwLock<FxHashMap<SmartString, PendingRequest>>>,
298
299    /// Order tracking: maps `order_id` -> symbol
300    order_symbol_map: Arc<RwLock<FxHashMap<SmartString, SmartString>>>,
301
302    /// Instrument info cache: maps symbol -> instrument info
303    instrument_cache: Arc<RwLock<FxHashMap<SmartString, InstrumentInfo>>>,
304
305    /// Task handles
306    ping_handle: Arc<RwLock<Option<JoinHandle<()>>>>,
307    message_handle: Arc<RwLock<Option<JoinHandle<()>>>>,
308
309    /// Execution report channel
310    report_sender: Arc<RwLock<Option<Sender<ExecutionReport>>>>,
311}
312
313impl BybitWebsocketTrading {
314    /// Create a new Bybit WebSocket trading instance
315    pub fn new(auth: Arc<BybitAuth>, account_type: SmartString, testnet: bool) -> Self {
316        let clock = Clock::new();
317
318        Self {
319            auth,
320            rest_client: None,
321            trade_sink: Arc::new(AsyncRwLock::new(None)),
322            trade_stream: Arc::new(AsyncRwLock::new(None)),
323            private_sink: Arc::new(AsyncRwLock::new(None)),
324            private_stream: Arc::new(AsyncRwLock::new(None)),
325            account_type,
326            testnet,
327            account_info: Arc::new(RwLock::new(None)),
328            is_connected: Arc::new(AtomicBool::new(false)),
329            is_authenticated: Arc::new(AtomicBool::new(false)),
330            clock: clock.clone(),
331            health: Arc::new(RwLock::new(ConnectionHealth::new())),
332            rate_limiter: Arc::new(RwLock::new(RateLimiter::new(clock))),
333            pending_requests: Arc::new(RwLock::new(FxHashMap::default())),
334            order_symbol_map: Arc::new(RwLock::new(FxHashMap::default())),
335            instrument_cache: Arc::new(RwLock::new(FxHashMap::default())),
336            ping_handle: Arc::new(RwLock::new(None)),
337            message_handle: Arc::new(RwLock::new(None)),
338            report_sender: Arc::new(RwLock::new(None)),
339        }
340    }
341
342    /// Get connection health status
343    #[must_use]
344    pub fn get_connection_health(&self) -> ConnectionHealth {
345        self.health.read().clone()
346    }
347
348    /// Set REST client for V5 account features (optional)
349    pub fn set_rest_client(&mut self, rest_client: Arc<BybitRestClient>) {
350        self.rest_client = Some(rest_client);
351    }
352
353    /// Get and cache account information for V5 compliance
354    pub async fn get_account_info(&self) -> Result<BybitAccountInfo> {
355        // Check cache first
356        if let Some(cached_info) = self.account_info.read().as_ref() {
357            return Ok(cached_info.clone());
358        }
359
360        // Fetch from REST API if available
361        let Some(rest_client) = &self.rest_client else {
362            bail!("REST client not available - cannot fetch account info");
363        };
364
365        let account_info = rest_client
366            .get_account_info()
367            .await
368            .map_err(|e| anyhow!("Failed to get account info: {e}"))?;
369
370        // Cache the result
371        *self.account_info.write() = Some(account_info.clone());
372
373        Ok(account_info)
374    }
375
376    /// Check if account supports V5 unified features
377    pub async fn supports_unified_features(&self) -> Result<bool> {
378        let account_info = self.get_account_info().await?;
379        Ok(account_info.supports_unified_features())
380    }
381
382    /// Check if account supports UTA 2.0 advanced features
383    pub async fn supports_uta2_features(&self) -> Result<bool> {
384        let account_info = self.get_account_info().await?;
385        Ok(account_info.supports_uta2_features())
386    }
387
388    /// Validate order parameters for account type (if REST client available)
389    async fn validate_order_for_account(&self, order: &Order, category: &str) -> Result<()> {
390        // Skip validation if REST client not available (graceful degradation)
391        if self.rest_client.is_none() {
392            debug!("REST client not available - skipping V5 account validation");
393            return Ok(());
394        }
395
396        let account_info = match self.get_account_info().await {
397            Ok(info) => info,
398            Err(e) => {
399                warn!("Failed to get account info for validation: {e} - proceeding with order");
400                return Ok(());
401            }
402        };
403
404        let account_type = account_info.get_account_type();
405
406        if let Some(account_type) = account_type {
407            // UTA 2.0: Inverse futures only support one-way mode
408            if account_type.supports_uta2_features() && category == "inverse" {
409                // For WebSocket, we assume one-way mode (positionIdx = 0)
410                // This is automatically handled in order creation
411                debug!("UTA 2.0 account: Using one-way mode for inverse futures");
412            }
413
414            // Classic account: No unified margin features
415            if account_type == BybitAccountType::Classic {
416                // WebSocket doesn't directly handle margin params, but we can log info
417                if category == "spot" {
418                    debug!("Classic account spot trading via WebSocket");
419                }
420            }
421
422            debug!("V5 Account validation passed for account type: {account_type:?}");
423        }
424
425        Ok(())
426    }
427
428    /// Map Bybit V5 order status strings to internal OrderStatus enum
429    /// Comprehensive mapping covering all V5 status values
430    fn map_v5_order_status(status_str: &str) -> OrderStatus {
431        match status_str {
432            // V5 Standard Statuses
433            "New" => OrderStatus::New,
434            "PartiallyFilled" => OrderStatus::PartiallyFilled,
435            "Filled" => OrderStatus::Filled,
436            "Cancelled" => OrderStatus::Cancelled,
437            "Rejected" => OrderStatus::Rejected,
438
439            // V5 Additional Statuses
440            "PendingCancel" => OrderStatus::PendingCancel,
441            "Untriggered" => OrderStatus::Pending, // Conditional orders waiting to trigger
442            "Triggered" => OrderStatus::New,       // Conditional order just triggered
443            "Deactivated" => OrderStatus::Cancelled, // Conditional order deactivated
444            "Active" => OrderStatus::Open,         // Order is actively working
445
446            // V5 Spot-specific Statuses
447            "PENDING" => OrderStatus::Pending, // Order pending execution
448            "OPEN" => OrderStatus::Open,       // Order is open
449            "PARTIALLY_FILLED" => OrderStatus::PartiallyFilled, // Alternative format
450            "ORDER_FILLED" => OrderStatus::Filled, // Alternative format
451            "CANCELED" => OrderStatus::Cancelled, // Alternative spelling
452            "ORDER_FAILED" => OrderStatus::Rejected, // Order failed
453
454            // V5 Futures/Perps-specific Statuses
455            "Created" => OrderStatus::New,     // Order just created
456            "Expired" => OrderStatus::Expired, // Order expired (GTD orders)
457
458            // V5 Error/Unknown Statuses
459            "Unknown" => OrderStatus::Unknown,
460            "" => OrderStatus::Unknown, // Empty status
461
462            // Fallback for any unmapped statuses
463            _ => {
464                warn!("Unknown Bybit V5 order status: '{status_str}' - mapping to Unknown");
465                OrderStatus::Unknown
466            }
467        }
468    }
469
470    /// Map internal OrderStatus to Bybit V5 execution type for reporting
471    const fn map_order_status_to_exec_type(status: OrderStatus) -> &'static str {
472        match status {
473            OrderStatus::New => "New",
474            OrderStatus::Open => "New", // Open orders reported as New in execution
475            OrderStatus::PartiallyFilled => "Trade",
476            OrderStatus::Filled => "Trade",
477            OrderStatus::Cancelled => "Canceled",
478            OrderStatus::Rejected => "Rejected",
479            OrderStatus::Expired => "Expired",
480            OrderStatus::Pending => "PendingNew",
481            OrderStatus::PendingCancel => "PendingCancel",
482            OrderStatus::Unknown => "Unknown",
483        }
484    }
485
486    /// Send WebSocket ping
487    pub async fn send_ping(&self) -> Result<()> {
488        let req_id = Uuid::new_v4().to_string();
489        let ping_msg = json!({
490            "req_id": req_id,
491            "op": "ping"
492        })
493        .to_string();
494
495        if let Some(sink) = &mut *self.trade_sink.write().await {
496            sink.send(Message::text(ping_msg).to_frame_view()).await?;
497
498            let mut health = self.health.write();
499            health.last_ping_sent = self.clock.raw();
500            health.pending_pings += 1;
501            health.messages_sent += 1;
502        } else {
503            bail!("WebSocket trade connection not established");
504        }
505
506        Ok(())
507    }
508
509    /// Authenticate WebSocket connections
510    async fn authenticate(&self) -> Result<()> {
511        let expires = time::get_timestamp_ms() + 5000;
512        let signature = self.auth.generate_ws_signature(expires)?;
513
514        let auth_msg = json!({
515            "op": "auth",
516            "args": [
517                self.auth.api_key(),
518                expires,
519                signature
520            ]
521        })
522        .to_string();
523
524        // Authenticate trade connection
525        if let Some(sink) = &mut *self.trade_sink.write().await {
526            sink.send(Message::text(auth_msg.clone()).to_frame_view())
527                .await?;
528            debug!("Sent auth message to trade connection");
529        }
530
531        // Authenticate private connection and subscribe to order updates
532        if let Some(sink) = &mut *self.private_sink.write().await {
533            sink.send(Message::text(auth_msg).to_frame_view()).await?;
534            debug!("Sent auth message to private connection");
535
536            // Subscribe to order stream
537            let subscribe_msg = json!({
538                "op": "subscribe",
539                "args": ["order"]
540            })
541            .to_string();
542
543            sink.send(Message::text(subscribe_msg).to_frame_view())
544                .await?;
545            debug!("Subscribed to order stream");
546        }
547
548        Ok(())
549    }
550
551    /// Connect and start message processing
552    async fn connect_internal(&self, report_sender: Sender<ExecutionReport>) -> Result<()> {
553        // Store the report sender
554        *self.report_sender.write() = Some(report_sender.clone());
555
556        let trade_url = if self.testnet {
557            BYBIT_WS_TRADE_TESTNET_URL
558        } else {
559            BYBIT_WS_TRADE_URL
560        };
561
562        let private_url = if self.testnet {
563            BYBIT_WS_PRIVATE_TESTNET_URL
564        } else {
565            BYBIT_WS_PRIVATE_URL
566        };
567
568        // Connect to trade WebSocket using yawc directly
569        use yawc::WebSocket;
570        let trade_parsed_url = trade_url
571            .parse()
572            .map_err(|e| anyhow::anyhow!("Invalid trade URL: {}", e))?;
573        let trade_ws = WebSocket::connect(trade_parsed_url)
574            .await
575            .map_err(|e| anyhow::anyhow!("Trade connection failed: {}", e))?;
576        let (trade_sink, trade_stream) = trade_ws.split();
577        *self.trade_sink.write().await = Some(trade_sink);
578        *self.trade_stream.write().await = Some(trade_stream);
579
580        // Connect to private WebSocket
581        let private_parsed_url = private_url
582            .parse()
583            .map_err(|e| anyhow::anyhow!("Invalid private URL: {}", e))?;
584        let private_ws = WebSocket::connect(private_parsed_url)
585            .await
586            .map_err(|e| anyhow::anyhow!("Private connection failed: {}", e))?;
587        let (private_sink, private_stream) = private_ws.split();
588        *self.private_sink.write().await = Some(private_sink);
589        *self.private_stream.write().await = Some(private_stream);
590
591        self.is_connected.store(true, Ordering::Relaxed);
592
593        // Authenticate
594        self.authenticate().await?;
595
596        // Start ping task
597        self.start_ping_task();
598
599        // Start message processing
600        self.start_message_processing();
601
602        info!("Bybit WebSocket trading connected and authenticated");
603        Ok(())
604    }
605
606    /// Start ping task to keep connection alive
607    fn start_ping_task(&self) {
608        let trade_sink = self.trade_sink.clone();
609        let health = self.health.clone();
610        let clock = self.clock.clone();
611        let is_connected = self.is_connected.clone();
612
613        let handle = tokio::spawn(async move {
614            let mut interval = interval(Duration::from_secs(PING_INTERVAL_SECONDS));
615
616            loop {
617                interval.tick().await;
618
619                if !is_connected.load(Ordering::Relaxed) {
620                    break;
621                }
622
623                // Check if we need to send ping
624                let should_send = {
625                    let h = health.read();
626                    let now = clock.raw();
627                    let time_since_last_ping = (now - h.last_ping_sent) / 1_000_000_000; // Convert to seconds
628
629                    time_since_last_ping >= PING_INTERVAL_SECONDS && h.pending_pings < 3
630                };
631
632                if should_send {
633                    let req_id = Uuid::new_v4().to_string();
634                    let ping_msg = json!({
635                        "req_id": req_id,
636                        "op": "ping"
637                    })
638                    .to_string();
639
640                    if let Some(sink) = &mut *trade_sink.write().await {
641                        if let Err(e) = sink.send(Message::text(ping_msg).to_frame_view()).await {
642                            error!("Failed to send ping: {e}");
643                            is_connected.store(false, Ordering::Relaxed);
644                            break;
645                        }
646
647                        let mut h = health.write();
648                        h.last_ping_sent = clock.raw();
649                        h.pending_pings += 1;
650                        h.messages_sent += 1;
651                    }
652                }
653
654                // Check for pong timeout
655                {
656                    let h = health.read();
657                    let now = clock.raw();
658                    let time_since_last_pong = (now - h.last_pong_received) / 1_000_000_000;
659
660                    if h.pending_pings > 0 && time_since_last_pong > PONG_TIMEOUT_SECONDS {
661                        warn!("Pong timeout detected, connection may be dead");
662                        is_connected.store(false, Ordering::Relaxed);
663                        break;
664                    }
665                }
666            }
667
668            debug!("Ping task terminated");
669        });
670
671        *self.ping_handle.write() = Some(handle);
672    }
673
674    /// Start processing incoming messages
675    fn start_message_processing(&self) {
676        let trade_stream = self.trade_stream.clone();
677        let private_stream = self.private_stream.clone();
678        let health = self.health.clone();
679        let clock = self.clock.clone();
680        let is_authenticated = self.is_authenticated.clone();
681        let is_connected = self.is_connected.clone();
682        let pending_requests = self.pending_requests.clone();
683        let report_sender = self.report_sender.clone();
684        let order_symbol_map = self.order_symbol_map.clone();
685
686        let handle = tokio::spawn(async move {
687            loop {
688                if !is_connected.load(Ordering::Relaxed) {
689                    break;
690                }
691
692                // Process trade stream messages
693                if let Some(stream) = &mut *trade_stream.write().await {
694                    if let Some(frame) = stream.next().await {
695                        let msg = Message::from_frame_view(frame);
696                        match msg {
697                            Message::Text(text) => {
698                                health.write().messages_received += 1;
699
700                                if let Err(e) = Self::process_trade_message(
701                                    &text,
702                                    &health,
703                                    &clock,
704                                    &is_authenticated,
705                                    &pending_requests,
706                                    &report_sender,
707                                    &order_symbol_map,
708                                )
709                                .await
710                                {
711                                    error!("Failed to process trade message: {e}");
712                                }
713                            }
714                            Message::Binary(_) => {
715                                warn!("Received unexpected binary message");
716                            }
717                            _ => {}
718                        }
719                    } else {
720                        warn!("Trade stream closed");
721                        is_connected.store(false, Ordering::Relaxed);
722                        break;
723                    }
724                }
725
726                // Process private stream messages
727                if let Some(stream) = &mut *private_stream.write().await {
728                    match stream.next().await {
729                        Some(frame) => {
730                            let msg = Message::from_frame_view(frame);
731                            match msg {
732                                Message::Text(text) => {
733                                    health.write().messages_received += 1;
734
735                                    if let Err(e) =
736                                        Self::process_private_message(&text, &report_sender).await
737                                    {
738                                        error!("Failed to process private message: {e}");
739                                    }
740                                }
741                                Message::Binary(_) => {
742                                    warn!("Received unexpected binary message on private stream");
743                                }
744                                _ => {}
745                            }
746                        }
747                        None => {
748                            warn!("Private stream closed");
749                        }
750                    }
751                }
752            }
753
754            debug!("Message processing task terminated");
755        });
756
757        *self.message_handle.write() = Some(handle);
758    }
759
760    /// Process messages from trade stream
761    async fn process_trade_message(
762        text: &str,
763        health: &Arc<RwLock<ConnectionHealth>>,
764        clock: &Clock,
765        is_authenticated: &Arc<AtomicBool>,
766        pending_requests: &Arc<RwLock<FxHashMap<SmartString, PendingRequest>>>,
767        report_sender: &Arc<RwLock<Option<Sender<ExecutionReport>>>>,
768        order_symbol_map: &Arc<RwLock<FxHashMap<SmartString, SmartString>>>,
769    ) -> Result<()> {
770        let mut text_bytes = text.as_bytes().to_vec();
771        let msg = simd_json::from_slice::<JsonValue>(&mut text_bytes)?;
772
773        if let Some(op) = msg.get("op").and_then(|v| v.as_str()) {
774            match op {
775                "auth" => {
776                    let ret_code = msg
777                        .get("retCode")
778                        .and_then(simd_json::prelude::ValueAsScalar::as_i64)
779                        .unwrap_or(-1);
780
781                    if ret_code == 0 {
782                        is_authenticated.store(true, Ordering::Relaxed);
783                        info!("WebSocket authentication successful");
784                    } else {
785                        let ret_msg = msg
786                            .get("retMsg")
787                            .and_then(|v| v.as_str())
788                            .unwrap_or("Unknown error");
789                        error!("WebSocket authentication failed: {ret_msg}");
790                    }
791                }
792                "ping" => {
793                    if let Some(ret_msg) = msg.get("ret_msg").and_then(|v| v.as_str())
794                        && ret_msg == "pong"
795                    {
796                        let mut h = health.write();
797                        h.last_pong_received = clock.raw();
798                        h.pending_pings = h.pending_pings.saturating_sub(1);
799                        h.is_healthy = true;
800                    }
801                }
802                "order.create" | "order.amend" | "order.cancel" => {
803                    Self::process_order_response(
804                        msg.clone(),
805                        op,
806                        pending_requests,
807                        report_sender,
808                        order_symbol_map,
809                    )
810                    .await?;
811                }
812                "order.create-batch" | "order.amend-batch" | "order.cancel-batch" => {
813                    Self::process_batch_order_response(
814                        msg.clone(),
815                        op,
816                        pending_requests,
817                        report_sender,
818                        order_symbol_map,
819                    )
820                    .await?;
821                }
822                "order.cancel-all" => {
823                    Self::process_cancel_all_response(msg, pending_requests, report_sender).await?;
824                }
825                _ => {
826                    debug!("Unhandled op type: {op}");
827                }
828            }
829        }
830
831        Ok(())
832    }
833
834    /// Process messages from private stream (order updates)
835    async fn process_private_message(
836        text: &str,
837        report_sender: &Arc<RwLock<Option<Sender<ExecutionReport>>>>,
838    ) -> Result<()> {
839        let mut text_bytes = text.as_bytes().to_vec();
840        let msg = simd_json::from_slice::<JsonValue>(&mut text_bytes)?;
841
842        if let Some(topic) = msg.get("topic").and_then(|v| v.as_str())
843            && (topic == "order" || topic.starts_with("order."))
844        {
845            Self::process_order_update(msg, report_sender).await?;
846        }
847
848        Ok(())
849    }
850
851    /// Process order response from trade stream
852    async fn process_order_response(
853        msg: JsonValue,
854        op: &str,
855        pending_requests: &Arc<RwLock<FxHashMap<SmartString, PendingRequest>>>,
856        report_sender: &Arc<RwLock<Option<Sender<ExecutionReport>>>>,
857        order_symbol_map: &Arc<RwLock<FxHashMap<SmartString, SmartString>>>,
858    ) -> Result<()> {
859        let req_id: Option<SmartString> = msg
860            .get("reqId")
861            .and_then(|v| v.as_str())
862            .map(std::convert::Into::into);
863
864        let ret_code = msg
865            .get("retCode")
866            .and_then(simd_json::prelude::ValueAsScalar::as_i64)
867            .unwrap_or(-1);
868
869        if let Some(req_id) = req_id
870            && let Some(pending) = pending_requests.write().remove(&req_id)
871        {
872            if ret_code == 0 {
873                // Success - order accepted
874                if let Some(data) = msg.get("data") {
875                    let order_id = data.get("orderId").and_then(|v| v.as_str()).unwrap_or("");
876                    let client_order_id = data
877                        .get("orderLinkId")
878                        .and_then(|v| v.as_str())
879                        .unwrap_or("");
880
881                    // Update our order symbol mapping with the exchange order ID
882                    if !order_id.is_empty()
883                        && !client_order_id.is_empty()
884                        && let Some(symbol) = order_symbol_map.read().get(client_order_id)
885                    {
886                        order_symbol_map
887                            .write()
888                            .insert(order_id.into(), symbol.clone());
889                    }
890
891                    let report = ExecutionReport {
892                        id: Uuid::new_v4().to_string().into(),
893                        order_id: order_id.into(),
894                        exchange_timestamp: time::get_epoch_timestamp_ns(),
895                        system_timestamp: time::get_epoch_timestamp_ns(),
896                        instrument_id: InstrumentId::new(
897                            "UNKNOWN",
898                            rusty_model::venues::Venue::Bybit,
899                        ),
900                        status: OrderStatus::New,
901                        filled_quantity: Decimal::ZERO,
902                        remaining_quantity: Decimal::ZERO,
903                        execution_price: None,
904                        reject_reason: None,
905                        exchange_execution_id: None,
906                        is_final: false,
907                    };
908
909                    if let Err(e) = pending.report_sender.send(report) {
910                        error!("Failed to send execution report: {e}");
911                    }
912                }
913            } else {
914                // Error - order rejected
915                let ret_msg = msg
916                    .get("retMsg")
917                    .and_then(|v| v.as_str())
918                    .unwrap_or("Unknown error");
919
920                let report = ExecutionReport {
921                    id: Uuid::new_v4().to_string().into(),
922                    order_id: SmartString::default(),
923                    exchange_timestamp: time::get_epoch_timestamp_ns(),
924                    system_timestamp: time::get_epoch_timestamp_ns(),
925                    instrument_id: InstrumentId::new("UNKNOWN", rusty_model::venues::Venue::Bybit),
926                    status: OrderStatus::Rejected,
927                    filled_quantity: Decimal::ZERO,
928                    remaining_quantity: Decimal::ZERO,
929                    execution_price: None,
930                    reject_reason: Some(ret_msg.into()),
931                    exchange_execution_id: None,
932                    is_final: true,
933                };
934
935                if let Err(e) = pending.report_sender.send(report) {
936                    error!("Failed to send rejection report: {e}");
937                }
938            }
939        }
940
941        Ok(())
942    }
943
944    /// Process batch order response
945    async fn process_batch_order_response(
946        msg: JsonValue,
947        op: &str,
948        pending_requests: &Arc<RwLock<FxHashMap<SmartString, PendingRequest>>>,
949        report_sender: &Arc<RwLock<Option<Sender<ExecutionReport>>>>,
950        order_symbol_map: &Arc<RwLock<FxHashMap<SmartString, SmartString>>>,
951    ) -> Result<()> {
952        let req_id: Option<SmartString> = msg
953            .get("reqId")
954            .and_then(|v| v.as_str())
955            .map(std::convert::Into::into);
956
957        let ret_code = msg
958            .get("retCode")
959            .and_then(simd_json::prelude::ValueAsScalar::as_i64)
960            .unwrap_or(-1);
961
962        if let Some(req_id) = req_id
963            && let Some(pending) = pending_requests.write().remove(&req_id)
964        {
965            if ret_code == 0 {
966                // Process batch results
967                if let Some(data) = msg.get("data")
968                    && let Some(list) = data.get("list").and_then(|v| v.as_array())
969                {
970                    for item in list {
971                        let order_id = item.get("orderId").and_then(|v| v.as_str()).unwrap_or("");
972
973                        let order_link_id = item
974                            .get("orderLinkId")
975                            .and_then(|v| v.as_str())
976                            .unwrap_or("");
977
978                        let report = ExecutionReport {
979                            id: Uuid::new_v4().to_string().into(),
980                            order_id: order_id.into(),
981                            exchange_timestamp: time::get_epoch_timestamp_ns(),
982                            system_timestamp: time::get_epoch_timestamp_ns(),
983                            instrument_id: InstrumentId::new(
984                                "UNKNOWN",
985                                rusty_model::venues::Venue::Bybit,
986                            ),
987                            status: OrderStatus::New,
988                            filled_quantity: Decimal::ZERO,
989                            remaining_quantity: Decimal::ZERO,
990                            execution_price: None,
991                            reject_reason: None,
992                            exchange_execution_id: Some(order_link_id.into()),
993                            is_final: false,
994                        };
995
996                        if let Err(e) = pending.report_sender.send(report) {
997                            error!("Failed to send batch execution report: {e}");
998                        }
999                    }
1000                }
1001            } else {
1002                // Batch request failed
1003                let ret_msg = msg
1004                    .get("retMsg")
1005                    .and_then(|v| v.as_str())
1006                    .unwrap_or("Batch order failed");
1007
1008                error!("Batch order failed: {ret_msg}");
1009            }
1010        }
1011
1012        Ok(())
1013    }
1014
1015    /// Process cancel all response
1016    async fn process_cancel_all_response(
1017        msg: JsonValue,
1018        pending_requests: &Arc<RwLock<FxHashMap<SmartString, PendingRequest>>>,
1019        report_sender: &Arc<RwLock<Option<Sender<ExecutionReport>>>>,
1020    ) -> Result<()> {
1021        let req_id: Option<SmartString> = msg
1022            .get("reqId")
1023            .and_then(|v| v.as_str())
1024            .map(std::convert::Into::into);
1025
1026        let ret_code = msg
1027            .get("retCode")
1028            .and_then(simd_json::prelude::ValueAsScalar::as_i64)
1029            .unwrap_or(-1);
1030
1031        if let Some(req_id) = req_id
1032            && let Some(pending) = pending_requests.write().remove(&req_id)
1033        {
1034            if ret_code == 0 {
1035                // Cancel all successful
1036                let report = ExecutionReport {
1037                    id: Uuid::new_v4().to_string().into(),
1038                    order_id: req_id.clone(),
1039                    exchange_timestamp: time::get_epoch_timestamp_ns(),
1040                    system_timestamp: time::get_epoch_timestamp_ns(),
1041                    instrument_id: InstrumentId::new("UNKNOWN", rusty_model::venues::Venue::Bybit),
1042                    status: OrderStatus::New,
1043                    filled_quantity: Decimal::ZERO,
1044                    remaining_quantity: Decimal::ZERO,
1045                    execution_price: None,
1046                    reject_reason: None,
1047                    exchange_execution_id: None,
1048                    is_final: false,
1049                };
1050
1051                if let Err(e) = pending.report_sender.send(report) {
1052                    error!("Failed to send cancel all report: {e}");
1053                }
1054            } else {
1055                let ret_msg = msg
1056                    .get("retMsg")
1057                    .and_then(|v| v.as_str())
1058                    .unwrap_or("Cancel all failed");
1059
1060                error!("Cancel all failed: {ret_msg}");
1061            }
1062        }
1063
1064        Ok(())
1065    }
1066
1067    /// Process order update from private stream
1068    async fn process_order_update(
1069        msg: JsonValue,
1070        report_sender: &Arc<RwLock<Option<Sender<ExecutionReport>>>>,
1071    ) -> Result<()> {
1072        if let Some(data) = msg.get("data").and_then(|v| v.as_array()) {
1073            for order_data in data {
1074                let order_id = order_data
1075                    .get("orderId")
1076                    .and_then(|v| v.as_str())
1077                    .unwrap_or("");
1078
1079                let order_link_id = order_data
1080                    .get("orderLinkId")
1081                    .and_then(|v| v.as_str())
1082                    .unwrap_or("");
1083
1084                let symbol = order_data
1085                    .get("symbol")
1086                    .and_then(|v| v.as_str())
1087                    .unwrap_or("");
1088
1089                let side_str = order_data
1090                    .get("side")
1091                    .and_then(|v| v.as_str())
1092                    .unwrap_or("Buy");
1093
1094                let side = match side_str {
1095                    "Sell" => OrderSide::Sell,
1096                    _ => OrderSide::Buy,
1097                };
1098
1099                let order_status_str = order_data
1100                    .get("orderStatus")
1101                    .and_then(|v| v.as_str())
1102                    .unwrap_or("New");
1103
1104                let order_status = Self::map_v5_order_status(order_status_str);
1105
1106                let qty = order_data
1107                    .get("qty")
1108                    .and_then(|v| v.as_str())
1109                    .and_then(|s| Decimal::from_str_exact(s).ok())
1110                    .unwrap_or(Decimal::ZERO);
1111
1112                let price = order_data
1113                    .get("price")
1114                    .and_then(|v| v.as_str())
1115                    .and_then(|s| Decimal::from_str_exact(s).ok())
1116                    .unwrap_or(Decimal::ZERO);
1117
1118                let cum_exec_qty = order_data
1119                    .get("cumExecQty")
1120                    .and_then(|v| v.as_str())
1121                    .and_then(|s| Decimal::from_str_exact(s).ok())
1122                    .unwrap_or(Decimal::ZERO);
1123
1124                let avg_price = order_data
1125                    .get("avgPrice")
1126                    .and_then(|v| v.as_str())
1127                    .and_then(|s| Decimal::from_str_exact(s).ok())
1128                    .unwrap_or(Decimal::ZERO);
1129
1130                let leaves_qty = order_data
1131                    .get("leavesQty")
1132                    .and_then(|v| v.as_str())
1133                    .and_then(|s| Decimal::from_str_exact(s).ok())
1134                    .unwrap_or(qty - cum_exec_qty);
1135
1136                let exec_type = Self::map_order_status_to_exec_type(order_status);
1137
1138                let report = ExecutionReport {
1139                    id: Uuid::new_v4().to_string().into(),
1140                    order_id: order_id.into(),
1141                    exchange_timestamp: time::get_epoch_timestamp_ns(),
1142                    system_timestamp: time::get_epoch_timestamp_ns(),
1143                    instrument_id: InstrumentId::new("UNKNOWN", rusty_model::venues::Venue::Bybit),
1144                    status: order_status,
1145                    filled_quantity: Decimal::ZERO,
1146                    remaining_quantity: Decimal::ZERO,
1147                    execution_price: None,
1148                    reject_reason: None,
1149                    exchange_execution_id: Some(order_link_id.into()),
1150                    is_final: false,
1151                };
1152
1153                if let Some(sender) = &*report_sender.read()
1154                    && let Err(e) = sender.send(report)
1155                {
1156                    error!("Failed to send order update report: {e}");
1157                }
1158            }
1159        }
1160
1161        Ok(())
1162    }
1163
1164    /// Place an order via WebSocket
1165    async fn place_order_internal(&self, order: &Order) -> Result<SmartString> {
1166        // Check rate limit
1167        if !self.rate_limiter.write().can_send_request() {
1168            bail!("Rate limit exceeded");
1169        }
1170
1171        let req_id = Uuid::new_v4().to_string();
1172        let timestamp = time::get_timestamp_ms();
1173
1174        // Determine category based on symbol
1175        let category = self.get_category_from_symbol(&order.symbol);
1176
1177        // Build V5-compliant order args
1178        let mut order_args = json!({
1179            "symbol": order.symbol.as_str(),
1180            "side": match order.side {
1181                OrderSide::Buy => "Buy",
1182                OrderSide::Sell => "Sell",
1183            },
1184            "orderType": match order.order_type {
1185                OrderType::Market => "Market",
1186                OrderType::Limit => "Limit",
1187                OrderType::PostOnly => "Limit", // PostOnly is handled via timeInForce
1188                _ => "Limit",
1189            },
1190            "qty": order.quantity.to_string(),
1191            "category": category,
1192            "timeInForce": match order.time_in_force {
1193                TimeInForce::GTC => "GTC",
1194                TimeInForce::IOC => "IOC",
1195                TimeInForce::FOK => "FOK",
1196                TimeInForce::GTX => "PostOnly", // V5: GTX maps to PostOnly
1197                TimeInForce::GTD => {
1198                    warn!("GTD (Good Till Date) not supported by Bybit, using GTC instead for order {}", order.id);
1199                    "GTC"  // Use GTC as closest alternative - orders remain active until cancelled
1200                },
1201            }
1202        });
1203
1204        // V5: Add positionIdx for futures/perps (default to one-way mode)
1205        if category == "linear" || category == "inverse" {
1206            order_args["positionIdx"] = json!(0); // One-way mode
1207        }
1208
1209        // Add price for limit orders
1210        if order.order_type == OrderType::Limit
1211            && let Some(price) = order.price
1212        {
1213            order_args["price"] = json!(price.to_string());
1214        }
1215
1216        // Use our internal order ID as orderLinkId for tracking
1217        // This allows us to correlate responses with our orders
1218        order_args["orderLinkId"] = json!(order.id.to_string());
1219
1220        // Add stop loss if provided
1221        if let Some(stop_price) = order.stop_price
1222            && stop_price > Decimal::ZERO
1223        {
1224            order_args["stopLoss"] = json!(stop_price.to_string());
1225            order_args["slTriggerBy"] = json!("LastPrice");
1226        }
1227
1228        // Note: take_profit_price field doesn't exist in Order struct
1229        // This feature would need to be added to the Order model if needed
1230
1231        // Create the request
1232        let request = json!({
1233            "reqId": &req_id,
1234            "header": {
1235                "X-BAPI-TIMESTAMP": timestamp.to_string(),
1236                "X-BAPI-RECV-WINDOW": "5000"
1237            },
1238            "op": "order.create",
1239            "args": [order_args]
1240        })
1241        .to_string();
1242
1243        // Send the request
1244        if let Some(sink) = &mut *self.trade_sink.write().await {
1245            sink.send(Message::text(request).to_frame_view()).await?;
1246            self.rate_limiter.write().record_request();
1247        } else {
1248            bail!("WebSocket trade connection not established");
1249        }
1250
1251        Ok(req_id.into())
1252    }
1253
1254    /// Amend an order via WebSocket
1255    async fn amend_order_internal(
1256        &self,
1257        order_id: &str,
1258        symbol: &str,
1259        new_price: Option<Decimal>,
1260        new_quantity: Option<Decimal>,
1261    ) -> Result<SmartString> {
1262        // Check rate limit
1263        if !self.rate_limiter.write().can_send_request() {
1264            bail!("Rate limit exceeded");
1265        }
1266
1267        let req_id = Uuid::new_v4().to_string();
1268        let timestamp = time::get_timestamp_ms();
1269
1270        // Determine category
1271        let category = self.get_category_from_symbol(symbol);
1272
1273        // Build amend args
1274        let mut amend_args = json!({
1275            "orderId": order_id,
1276            "symbol": symbol,
1277            "category": category,
1278        });
1279
1280        // Add new price if provided
1281        if let Some(price) = new_price {
1282            amend_args["price"] = json!(price.to_string());
1283        }
1284
1285        // Add new quantity if provided
1286        if let Some(qty) = new_quantity {
1287            amend_args["qty"] = json!(qty.to_string());
1288        }
1289
1290        // Create the request
1291        let request = json!({
1292            "reqId": &req_id,
1293            "header": {
1294                "X-BAPI-TIMESTAMP": timestamp.to_string(),
1295                "X-BAPI-RECV-WINDOW": "5000"
1296            },
1297            "op": "order.amend",
1298            "args": [amend_args]
1299        })
1300        .to_string();
1301
1302        // Send the request
1303        if let Some(sink) = &mut *self.trade_sink.write().await {
1304            sink.send(Message::text(request).to_frame_view()).await?;
1305            self.rate_limiter.write().record_request();
1306        } else {
1307            bail!("WebSocket trade connection not established");
1308        }
1309
1310        Ok(req_id.into())
1311    }
1312
1313    /// Cancel an order via WebSocket
1314    async fn cancel_order_internal(&self, order_id: &str, symbol: &str) -> Result<SmartString> {
1315        // Check rate limit
1316        if !self.rate_limiter.write().can_send_request() {
1317            bail!("Rate limit exceeded");
1318        }
1319
1320        let req_id = Uuid::new_v4().to_string();
1321        let timestamp = time::get_timestamp_ms();
1322
1323        // Determine category
1324        let category = self.get_category_from_symbol(symbol);
1325
1326        // Build cancel args
1327        let cancel_args = json!({
1328            "orderId": order_id,
1329            "symbol": symbol,
1330            "category": category,
1331        });
1332
1333        // Create the request
1334        let request = json!({
1335            "reqId": &req_id,
1336            "header": {
1337                "X-BAPI-TIMESTAMP": timestamp.to_string(),
1338                "X-BAPI-RECV-WINDOW": "5000"
1339            },
1340            "op": "order.cancel",
1341            "args": [cancel_args]
1342        })
1343        .to_string();
1344
1345        // Send the request
1346        if let Some(sink) = &mut *self.trade_sink.write().await {
1347            sink.send(Message::text(request).to_frame_view()).await?;
1348            self.rate_limiter.write().record_request();
1349        } else {
1350            bail!("WebSocket trade connection not established");
1351        }
1352
1353        Ok(req_id.into())
1354    }
1355
1356    /// Cancel all orders via WebSocket
1357    async fn cancel_all_orders_internal(
1358        &self,
1359        category: &str,
1360        symbol: Option<&str>,
1361    ) -> Result<SmartString> {
1362        // Check rate limit
1363        if !self.rate_limiter.write().can_send_request() {
1364            bail!("Rate limit exceeded");
1365        }
1366
1367        let req_id = Uuid::new_v4().to_string();
1368        let timestamp = time::get_timestamp_ms();
1369
1370        // Build cancel all args
1371        let mut cancel_args = json!({
1372            "category": category,
1373        });
1374
1375        // Add symbol if provided
1376        if let Some(sym) = symbol {
1377            cancel_args["symbol"] = json!(sym);
1378        }
1379
1380        // Create the request
1381        let request = json!({
1382            "reqId": &req_id,
1383            "header": {
1384                "X-BAPI-TIMESTAMP": timestamp.to_string(),
1385                "X-BAPI-RECV-WINDOW": "5000"
1386            },
1387            "op": "order.cancel-all",
1388            "args": [cancel_args]
1389        })
1390        .to_string();
1391
1392        // Send the request
1393        if let Some(sink) = &mut *self.trade_sink.write().await {
1394            sink.send(Message::text(request).to_frame_view()).await?;
1395            self.rate_limiter.write().record_request();
1396        } else {
1397            bail!("WebSocket trade connection not established");
1398        }
1399
1400        Ok(req_id.into())
1401    }
1402
1403    /// Determine category from symbol using cached instrument info
1404    fn get_category_from_symbol(&self, symbol: &str) -> &'static str {
1405        // First, try to get from cache
1406        if let Some(info) = self.instrument_cache.read().get(symbol)
1407            && !info.is_expired()
1408        {
1409            return info.category.as_str();
1410        }
1411
1412        // If not in cache or expired, use fallback heuristic and potentially trigger cache refresh
1413        let fallback_category = self.get_category_from_symbol_heuristic(symbol);
1414
1415        // Trigger async cache refresh for this symbol (fire and forget)
1416        let symbol_owned = symbol.to_owned();
1417        let cache = self.instrument_cache.clone();
1418        let auth = self.auth.clone();
1419        let testnet = self.testnet;
1420        tokio::spawn(async move {
1421            if let Err(e) =
1422                Self::refresh_instrument_info_for_symbol(&cache, &auth, &symbol_owned, testnet)
1423                    .await
1424            {
1425                debug!("Failed to refresh instrument info for {symbol_owned}: {e}");
1426            }
1427        });
1428
1429        fallback_category
1430    }
1431
1432    /// Fallback heuristic-based category determination
1433    fn get_category_from_symbol_heuristic(&self, symbol: &str) -> &'static str {
1434        if symbol.ends_with("USDT") || symbol.ends_with("USDC") {
1435            if symbol.contains('-') {
1436                CATEGORY_OPTION
1437            } else {
1438                CATEGORY_LINEAR
1439            }
1440        } else if symbol.ends_with("USD") {
1441            CATEGORY_INVERSE
1442        } else {
1443            CATEGORY_SPOT
1444        }
1445    }
1446
1447    /// Refresh instrument info for a specific symbol
1448    async fn refresh_instrument_info_for_symbol(
1449        cache: &Arc<RwLock<FxHashMap<SmartString, InstrumentInfo>>>,
1450        auth: &BybitAuth,
1451        symbol: &str,
1452        testnet: bool,
1453    ) -> Result<()> {
1454        // Try each category until we find the symbol
1455        let categories = ["spot", "linear", "inverse", "option"];
1456
1457        for category in &categories {
1458            if let Ok(info) = Self::fetch_instrument_info(auth, symbol, category, testnet).await {
1459                cache.write().insert(symbol.into(), info);
1460                debug!("Cached instrument info for symbol: {symbol} in category: {category}");
1461                return Ok(());
1462            }
1463        }
1464
1465        // If not found in any category, cache with fallback heuristic
1466        let fallback_category = if symbol.ends_with("USDT") || symbol.ends_with("USDC") {
1467            if symbol.contains('-') {
1468                BybitCategory::Option
1469            } else {
1470                BybitCategory::Linear
1471            }
1472        } else if symbol.ends_with("USD") {
1473            BybitCategory::Inverse
1474        } else {
1475            BybitCategory::Spot
1476        };
1477
1478        let info = InstrumentInfo::new(
1479            symbol.into(),
1480            fallback_category,
1481            "Unknown".into(),
1482            "Unknown".into(),
1483            "Unknown".into(),
1484            None,
1485        );
1486
1487        cache.write().insert(symbol.into(), info);
1488        warn!(
1489            "Symbol {} not found via API, cached with heuristic category: {}",
1490            symbol,
1491            fallback_category.as_str()
1492        );
1493
1494        Ok(())
1495    }
1496
1497    /// Fetch instrument info from Bybit REST API
1498    async fn fetch_instrument_info(
1499        auth: &BybitAuth,
1500        symbol: &str,
1501        category: &str,
1502        testnet: bool,
1503    ) -> Result<InstrumentInfo> {
1504        let base_url = if testnet {
1505            "https://api-testnet.bybit.com"
1506        } else {
1507            "https://api.bybit.com"
1508        };
1509
1510        let url =
1511            format!("{base_url}/v5/market/instruments-info?category={category}&symbol={symbol}");
1512
1513        // Create HTTP client with timeout
1514        let client = reqwest::Client::builder()
1515            .timeout(std::time::Duration::from_secs(10))
1516            .build()?;
1517
1518        let response = client
1519            .get(&url)
1520            .header("Content-Type", "application/json")
1521            .send()
1522            .await?;
1523
1524        if !response.status().is_success() {
1525            bail!("HTTP request failed: {}", response.status());
1526        }
1527
1528        let text = response.text().await?;
1529        let mut text_bytes = text.as_bytes().to_vec();
1530        let data = simd_json::from_slice::<JsonValue>(&mut text_bytes)?;
1531
1532        // Parse response
1533        let ret_code = data
1534            .get("retCode")
1535            .and_then(simd_json::prelude::ValueAsScalar::as_i64)
1536            .unwrap_or(-1);
1537        if ret_code != 0 {
1538            let ret_msg = data
1539                .get("retMsg")
1540                .and_then(|v| v.as_str())
1541                .unwrap_or("Unknown error");
1542            bail!("API error: {}", ret_msg);
1543        }
1544
1545        let result = data
1546            .get("result")
1547            .ok_or_else(|| anyhow!("Missing result field"))?;
1548        let list = result
1549            .get("list")
1550            .and_then(|v| v.as_array())
1551            .ok_or_else(|| anyhow!("Missing or invalid list field"))?;
1552
1553        if list.is_empty() {
1554            bail!("Symbol not found in category {}", category);
1555        }
1556
1557        let instrument = &list[0];
1558
1559        let symbol_str: SmartString = instrument
1560            .get("symbol")
1561            .and_then(|v| v.as_str())
1562            .unwrap_or(symbol)
1563            .into();
1564
1565        let category_enum = BybitCategory::parse_category(category)
1566            .ok_or_else(|| anyhow!("Invalid category: {}", category))?;
1567
1568        let status: SmartString = instrument
1569            .get("status")
1570            .and_then(|v| v.as_str())
1571            .unwrap_or("Unknown")
1572            .into();
1573
1574        let base_coin: SmartString = instrument
1575            .get("baseCoin")
1576            .and_then(|v| v.as_str())
1577            .unwrap_or("Unknown")
1578            .into();
1579
1580        let quote_coin: SmartString = instrument
1581            .get("quoteCoin")
1582            .and_then(|v| v.as_str())
1583            .unwrap_or("Unknown")
1584            .into();
1585
1586        let contract_type: Option<SmartString> = instrument
1587            .get("contractType")
1588            .and_then(|v| v.as_str())
1589            .map(std::convert::Into::into);
1590
1591        Ok(InstrumentInfo::new(
1592            symbol_str,
1593            category_enum,
1594            status,
1595            base_coin,
1596            quote_coin,
1597            contract_type,
1598        ))
1599    }
1600
1601    /// Refresh instrument cache for all symbols in use
1602    pub async fn refresh_instrument_cache(&self) -> Result<()> {
1603        let symbols: Vec<SmartString> = self.order_symbol_map.read().values().cloned().collect();
1604
1605        if symbols.is_empty() {
1606            debug!("No symbols to refresh in instrument cache");
1607            return Ok(());
1608        }
1609
1610        info!("Refreshing instrument cache for {} symbols", symbols.len());
1611
1612        // Refresh in batches to avoid overwhelming the API
1613        for chunk in symbols.chunks(10) {
1614            let mut tasks = Vec::new();
1615
1616            for symbol in chunk {
1617                let cache = self.instrument_cache.clone();
1618                let auth = self.auth.clone();
1619                let symbol_owned = symbol.clone();
1620                let testnet = self.testnet;
1621
1622                let task = tokio::spawn(async move {
1623                    Self::refresh_instrument_info_for_symbol(&cache, &auth, &symbol_owned, testnet)
1624                        .await
1625                });
1626
1627                tasks.push(task);
1628            }
1629
1630            // Wait for this batch to complete
1631            for task in tasks {
1632                if let Err(e) = task.await {
1633                    error!("Failed to refresh instrument info: {e}");
1634                }
1635            }
1636
1637            // Rate limit: wait 100ms between batches
1638            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1639        }
1640
1641        info!("Instrument cache refresh completed");
1642        Ok(())
1643    }
1644
1645    /// Get cached instrument info for a symbol
1646    #[must_use]
1647    pub fn get_instrument_info(&self, symbol: &str) -> Option<InstrumentInfo> {
1648        self.instrument_cache.read().get(symbol).cloned()
1649    }
1650
1651    /// Clear expired entries from instrument cache
1652    pub fn cleanup_instrument_cache(&self) {
1653        let mut cache = self.instrument_cache.write();
1654        let initial_count = cache.len();
1655
1656        cache.retain(|_, info| !info.is_expired());
1657
1658        let removed_count = initial_count - cache.len();
1659        if removed_count > 0 {
1660            debug!("Removed {removed_count} expired entries from instrument cache");
1661        }
1662    }
1663
1664    /// Get maximum batch size for category (string version for backward compatibility)
1665    fn get_max_batch_size(&self, category: &str) -> usize {
1666        match category {
1667            CATEGORY_SPOT => MAX_BATCH_SPOT,
1668            CATEGORY_LINEAR => MAX_BATCH_LINEAR,
1669            CATEGORY_INVERSE => MAX_BATCH_INVERSE,
1670            CATEGORY_OPTION => MAX_BATCH_OPTION,
1671            _ => MAX_BATCH_SPOT,
1672        }
1673    }
1674
1675    /// Get maximum batch size for category (enum version)
1676    const fn get_max_batch_size_for_category(&self, category: BybitCategory) -> usize {
1677        category.max_batch_size()
1678    }
1679
1680    /// Get category enum from symbol
1681    fn get_category_enum_from_symbol(&self, symbol: &str) -> BybitCategory {
1682        // First, try to get from cache
1683        if let Some(info) = self.instrument_cache.read().get(symbol)
1684            && !info.is_expired()
1685        {
1686            return info.category;
1687        }
1688
1689        // Fallback to heuristic
1690        if symbol.ends_with("USDT") || symbol.ends_with("USDC") {
1691            if symbol.contains('-') {
1692                BybitCategory::Option
1693            } else {
1694                BybitCategory::Linear
1695            }
1696        } else if symbol.ends_with("USD") {
1697            BybitCategory::Inverse
1698        } else {
1699            BybitCategory::Spot
1700        }
1701    }
1702}
1703#[async_trait]
1704impl Exchange for BybitWebsocketTrading {
1705    fn venue(&self) -> Venue {
1706        Venue::Bybit
1707    }
1708
1709    async fn place_order(
1710        &self,
1711        order: Order,
1712        report_sender: Sender<ExecutionReport>,
1713    ) -> Result<()> {
1714        // V5 Compliance: Validate order for account type (if possible)
1715        let category = self.get_category_from_symbol(&order.symbol);
1716        if let Err(e) = self.validate_order_for_account(&order, category).await {
1717            warn!("V5 validation failed but continuing with order: {e}");
1718        }
1719
1720        // Store the pending request
1721        let req_id = self.place_order_internal(&order).await?;
1722
1723        // Track order symbol mapping using our internal order ID
1724        // We'll update this when we get the exchange order ID
1725        self.order_symbol_map
1726            .write()
1727            .insert(order.id.to_string().into(), order.symbol.clone());
1728
1729        let pending = PendingRequest {
1730            op_type: SmartString::from("order.create"),
1731            timestamp: self.clock.raw(),
1732            report_sender,
1733        };
1734
1735        self.pending_requests.write().insert(req_id, pending);
1736
1737        Ok(())
1738    }
1739
1740    async fn cancel_order(
1741        &self,
1742        order_id: SmartString,
1743        report_sender: Sender<ExecutionReport>,
1744    ) -> Result<()> {
1745        // Get the symbol from our order tracking
1746        let symbol = self
1747            .order_symbol_map
1748            .read()
1749            .get(&order_id)
1750            .cloned()
1751            .ok_or_else(|| anyhow!("Symbol not found for order_id: {}", order_id))?;
1752
1753        let req_id = self.cancel_order_internal(&order_id, &symbol).await?;
1754
1755        let pending = PendingRequest {
1756            op_type: SmartString::from("order.cancel"),
1757            timestamp: self.clock.raw(),
1758            report_sender,
1759        };
1760
1761        self.pending_requests.write().insert(req_id, pending);
1762
1763        Ok(())
1764    }
1765
1766    async fn modify_order(
1767        &self,
1768        order_id: SmartString,
1769        new_price: Option<Decimal>,
1770        new_quantity: Option<Decimal>,
1771        report_sender: Sender<ExecutionReport>,
1772    ) -> Result<()> {
1773        // Get the symbol from our order tracking
1774        let symbol = self
1775            .order_symbol_map
1776            .read()
1777            .get(&order_id)
1778            .cloned()
1779            .ok_or_else(|| anyhow!("Symbol not found for order_id: {}", order_id))?;
1780
1781        let req_id = self
1782            .amend_order_internal(&order_id, &symbol, new_price, new_quantity)
1783            .await?;
1784
1785        let pending = PendingRequest {
1786            op_type: SmartString::from("order.amend"),
1787            timestamp: self.clock.raw(),
1788            report_sender,
1789        };
1790
1791        self.pending_requests.write().insert(req_id, pending);
1792
1793        Ok(())
1794    }
1795
1796    async fn cancel_all_orders(
1797        &self,
1798        instrument_id: Option<InstrumentId>,
1799        report_sender: Sender<ExecutionReport>,
1800    ) -> Result<()> {
1801        let (category, symbol_str) = if let Some(id) = &instrument_id {
1802            let category = self.get_category_from_symbol(&id.symbol);
1803            (category, Some(id.symbol.as_str()))
1804        } else {
1805            // Cancel all orders across all categories
1806            (CATEGORY_LINEAR, None) // TODO: Handle multiple categories
1807        };
1808
1809        let req_id = self
1810            .cancel_all_orders_internal(category, symbol_str)
1811            .await?;
1812
1813        let pending = PendingRequest {
1814            op_type: SmartString::from("order.cancel-all"),
1815            timestamp: self.clock.raw(),
1816            report_sender,
1817        };
1818
1819        self.pending_requests.write().insert(req_id, pending);
1820
1821        Ok(())
1822    }
1823
1824    async fn get_order_status(&self, order_id: &str) -> Result<OrderStatus> {
1825        // WebSocket doesn't provide query functionality
1826        // You would need to track order status from the order stream
1827        bail!("Order status query not supported via WebSocket - track from order stream")
1828    }
1829
1830    async fn connect(&self, report_sender: Sender<ExecutionReport>) -> Result<()> {
1831        // Use the proper report_sender channel instead of creating a dummy one
1832        self.connect_internal(report_sender).await
1833    }
1834
1835    async fn disconnect(&self) -> Result<()> {
1836        self.is_connected.store(false, Ordering::Relaxed);
1837        self.is_authenticated.store(false, Ordering::Relaxed);
1838
1839        // Close WebSocket connections
1840        if let Some(sink) = &mut *self.trade_sink.write().await {
1841            let _ = sink.close().await;
1842        }
1843        *self.trade_sink.write().await = None;
1844        *self.trade_stream.write().await = None;
1845
1846        if let Some(sink) = &mut *self.private_sink.write().await {
1847            let _ = sink.close().await;
1848        }
1849        *self.private_sink.write().await = None;
1850        *self.private_stream.write().await = None;
1851
1852        // Cancel background tasks
1853        if let Some(handle) = self.ping_handle.write().take() {
1854            handle.abort();
1855        }
1856
1857        if let Some(handle) = self.message_handle.write().take() {
1858            handle.abort();
1859        }
1860
1861        // Clear pending requests
1862        self.pending_requests.write().clear();
1863
1864        info!("Bybit WebSocket trading disconnected");
1865        Ok(())
1866    }
1867
1868    async fn is_connected(&self) -> bool {
1869        self.is_connected.load(Ordering::Relaxed) && self.is_authenticated.load(Ordering::Relaxed)
1870    }
1871
1872    async fn get_instruments(&self) -> Result<SmallVec<[InstrumentId; 32]>> {
1873        // WebSocket doesn't provide instrument query functionality
1874        // You would need to use REST API or maintain a cached list
1875        bail!("Instrument query not supported via WebSocket")
1876    }
1877
1878    async fn send_fix_message(&self, message: Vec<u8>) -> Result<()> {
1879        // Bybit WebSocket doesn't support FIX protocol
1880        bail!("FIX protocol not supported on Bybit WebSocket")
1881    }
1882
1883    async fn receive_fix_message(&self) -> Result<Vec<u8>> {
1884        // Bybit WebSocket doesn't support FIX protocol
1885        bail!("FIX protocol not supported on Bybit WebSocket")
1886    }
1887}