rusty_feeder/exchange/bybit/futures/
provider.rs

1//! Bybit Futures market provider implementation
2//!
3//! High-performance provider for Bybit Futures markets with WebSocket streaming
4//! and REST API support. Optimized for HFT applications with nanosecond precision.
5
6use rusty_common::collections::FxHashMap;
7use smartstring::alias::String;
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10
11use super::data::{MarketTradeResponse, orderbook::OrderbookResponse};
12use anyhow::{Context, Result, anyhow};
13use async_trait::async_trait;
14use parking_lot::RwLock;
15use quanta::Clock;
16use reqwest::header::{HeaderMap, HeaderValue};
17use rusty_common::json::Value;
18use rusty_common::websocket::{Message, WebSocketClient, WebSocketConfig, WebSocketError};
19use rusty_model::{
20    data::{
21        book_snapshot::OrderBookSnapshot, market_trade::MarketTrade,
22        simd_orderbook::SharedSimdOrderBook,
23    },
24    instruments::{Instrument, InstrumentId},
25    venues::Venue,
26};
27use simd_json::prelude::{ValueAsArray, ValueAsScalar, ValueObjectAccess};
28use smallvec::SmallVec;
29use tokio::sync::{mpsc, watch};
30use tokio::task::JoinHandle;
31
32use crate::provider::prelude::*;
33
34// Constants for Bybit Futures API
35const BYBIT_FUTURES_API_URL: &str = "https://api.bybit.com";
36const BYBIT_FUTURES_WS_URL: &str = "wss://stream.bybit.com/v5/public/linear";
37const _BYBIT_FUTURES_TESTNET_API_URL: &str = "https://api-testnet.bybit.com";
38const _BYBIT_FUTURES_TESTNET_WS_URL: &str = "wss://stream-testnet.bybit.com/v5/public/linear";
39
40// Rate limits for Bybit Futures API
41const BYBIT_FUTURES_RATE_LIMITS: &[RateLimit] = &[
42    RateLimit {
43        limit_type: "REQUEST_WEIGHT",
44        interval: "MINUTE",
45        interval_num: 1,
46        limit: 120,
47    },
48    RateLimit {
49        limit_type: "ORDERS",
50        interval: "MINUTE",
51        interval_num: 1,
52        limit: 100,
53    },
54];
55
56/// Bybit Futures instrument implementation
57#[derive(Debug, Clone)]
58pub struct BybitFuturesInstrument {
59    /// Unique instrument identifier
60    pub id: InstrumentId,
61}
62
63impl Instrument for BybitFuturesInstrument {
64    fn id(&self) -> InstrumentId {
65        self.id.clone()
66    }
67
68    fn symbol(&self) -> String {
69        self.id.symbol.clone()
70    }
71
72    fn venue(&self) -> Venue {
73        self.id.venue
74    }
75
76    fn as_any(&self) -> &dyn std::any::Any {
77        self
78    }
79
80    fn clone_box(&self) -> Box<dyn Instrument> {
81        Box::new(self.clone())
82    }
83}
84
85/// Subscription message for Bybit Futures WebSocket
86#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
87struct SubscriptionMessage {
88    op: String,
89    args: Vec<String>,
90}
91
92impl SubscriptionMessage {
93    /// Create subscription for trades
94    fn trades(symbols: Vec<String>) -> Self {
95        let args = symbols
96            .into_iter()
97            .map(|s| format!("publicTrade.{s}").into())
98            .collect();
99
100        Self {
101            op: "subscribe".into(),
102            args,
103        }
104    }
105
106    /// Create subscription for orderbook
107    fn orderbook(symbols: Vec<String>) -> Self {
108        let args = symbols
109            .into_iter()
110            .map(|s| format!("orderbook.50.{s}").into())
111            .collect();
112
113        Self {
114            op: "subscribe".into(),
115            args,
116        }
117    }
118}
119
120/// Bybit Futures market data provider
121///
122/// High-performance provider for Bybit Futures markets with WebSocket streaming
123/// and REST API support. Optimized for HFT applications with nanosecond precision.
124#[derive(Debug)]
125pub struct BybitFuturesProvider {
126    /// Connection configuration
127    config: ConnectionConfig,
128
129    /// WebSocket connection status
130    connection_status: Arc<RwLock<ConnectionState>>,
131
132    /// Connection statistics
133    stats: Arc<RwLock<ConnectionStats>>,
134
135    /// Shared clock for time synchronization
136    clock: Clock,
137
138    /// Active subscriptions
139    subscriptions: Arc<RwLock<FxHashMap<String, watch::Sender<bool>>>>,
140
141    /// Task handles for WebSocket connections
142    ws_handles: Arc<RwLock<FxHashMap<String, JoinHandle<()>>>>,
143
144    /// Last connection attempt timestamp
145    last_connection_attempt: Arc<RwLock<Instant>>,
146
147    /// HTTP client for REST API requests
148    http_client: reqwest::Client,
149}
150
151impl Default for BybitFuturesProvider {
152    fn default() -> Self {
153        Self::new()
154    }
155}
156
157impl BybitFuturesProvider {
158    /// Create a new Bybit Futures provider with default configuration
159    #[inline]
160    #[must_use]
161    pub fn new() -> Self {
162        Self::with_config(None)
163    }
164
165    /// Create a new Bybit Futures provider with custom configuration
166    #[inline]
167    #[must_use]
168    pub fn with_config(config: Option<ConnectionConfig>) -> Self {
169        let mut default_config = ConnectionConfig::default();
170
171        // Set default WebSocket configuration
172        default_config.websocket_config.base_url = BYBIT_FUTURES_WS_URL.into();
173        default_config.websocket_config.ping_interval_milliseconds = 20000; // 20 seconds
174        default_config.websocket_config.use_compression = false;
175
176        // Set default REST API configuration
177        default_config.rest_config.base_url = BYBIT_FUTURES_API_URL.into();
178        default_config.rest_config.timeout_milliseconds = 5000; // 5 seconds
179
180        // Use custom config if provided
181        let config = config.unwrap_or(default_config);
182        let clock = config.clock.clone();
183
184        // Create HTTP client with appropriate settings
185        let mut headers = HeaderMap::new();
186        headers.insert(
187            "User-Agent",
188            HeaderValue::from_str(&config.rest_config.user_agent)
189                .unwrap_or_else(|_| HeaderValue::from_static("RustyHFT/1.0")),
190        );
191
192        let http_client = reqwest::Client::builder()
193            .timeout(Duration::from_millis(
194                config.rest_config.timeout_milliseconds,
195            ))
196            .connect_timeout(Duration::from_millis(
197                config.rest_config.timeout_milliseconds / 2,
198            ))
199            .pool_max_idle_per_host(config.rest_config.connection_pool_size)
200            .pool_idle_timeout(Duration::from_millis(
201                config.rest_config.keep_alive_milliseconds,
202            ))
203            .default_headers(headers)
204            .build()
205            .unwrap_or_default();
206
207        Self {
208            config,
209            connection_status: Arc::new(RwLock::new(ConnectionState::Disconnected)),
210            stats: Arc::new(RwLock::new(ConnectionStats::default())),
211            clock,
212            subscriptions: Arc::new(RwLock::new(FxHashMap::default())),
213            ws_handles: Arc::new(RwLock::new(FxHashMap::default())),
214            last_connection_attempt: Arc::new(RwLock::new(Instant::now())),
215            http_client,
216        }
217    }
218
219    /// Helper function to update connection statistics upon message reception
220    #[inline]
221    fn update_receive_stats(
222        stats: Arc<RwLock<ConnectionStats>>,
223        message_size: usize,
224        local_time: u64,
225    ) {
226        let mut s = stats.write();
227        s.messages_received += 1;
228        s.bytes_received += message_size as u64;
229        s.last_message_time = local_time;
230    }
231
232    /// Helper function to create WebSocket request with proper headers
233    fn create_websocket_config(
234        url: String,
235        use_compression: bool,
236        ping_interval_milliseconds: u64,
237        timeout_milliseconds: u64,
238    ) -> WebSocketConfig {
239        WebSocketConfig::builder(rusty_common::types::Exchange::Bybit, url.to_string())
240            .ping_interval(Duration::from_millis(ping_interval_milliseconds))
241            .timeout(Duration::from_millis(timeout_milliseconds))
242            .compression(if use_compression {
243                rusty_common::websocket::CompressionConfig::default()
244            } else {
245                rusty_common::websocket::CompressionConfig::disabled()
246            })
247            .build()
248    }
249
250    /// Connect to the WebSocket API
251    #[inline]
252    async fn connect(&self) -> Result<()> {
253        if *self.connection_status.read() == ConnectionState::Connected {
254            return Ok(());
255        }
256
257        // Check if we need to respect reconnection backoff
258        {
259            let now = Instant::now();
260            let last_attempt = *self.last_connection_attempt.read();
261            let backoff_ms = self
262                .config
263                .websocket_config
264                .reconnect
265                .backoff_initial_milliseconds;
266
267            if now.duration_since(last_attempt) < Duration::from_millis(backoff_ms) {
268                // Wait until backoff period is over
269                tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
270            }
271
272            // Update last attempt time
273            *self.last_connection_attempt.write() = Instant::now();
274        }
275
276        *self.connection_status.write() = ConnectionState::Connecting;
277
278        // Note: Connection is now handled by individual trade/orderbook connections
279        // using WebSocketClient pattern. This method is kept for compatibility.
280        *self.connection_status.write() = ConnectionState::Connected;
281        self.stats.write().connected_time = self.clock.raw();
282        Ok(())
283    }
284
285    /// Create a WebSocket connection for trade data
286    #[inline]
287    async fn create_trade_connection(
288        &self,
289        symbols: SmallVec<[String; 8]>,
290        tx: mpsc::Sender<MarketTradeResponse>,
291        subscription: SubscriptionMessage,
292    ) -> Result<JoinHandle<()>> {
293        let connection_id = format!("trade-{}", symbols.join(","));
294
295        // Create stop signal
296        let (stop_tx, stop_rx) = watch::channel(false);
297        self.subscriptions
298            .write()
299            .insert(connection_id.clone().into(), stop_tx);
300
301        // Clone parameters for the task
302        let url = self.config.websocket_config.base_url.clone();
303        let clock = self.clock.clone();
304        let timeout_milliseconds = self.config.websocket_config.timeout_milliseconds;
305        let ping_interval_milliseconds = self.config.websocket_config.ping_interval_milliseconds;
306        let stats = self.stats.clone();
307        let connection_status = self.connection_status.clone();
308        let use_compression = self.config.websocket_config.use_compression;
309
310        // Spawn a task for the WebSocket connection
311        let handle = tokio::spawn(async move {
312            loop {
313                // Check for stop signal
314                if *stop_rx.borrow() {
315                    break;
316                }
317
318                // Create WebSocket configuration
319                let ws_config = Self::create_websocket_config(
320                    url.clone(),
321                    use_compression,
322                    ping_interval_milliseconds,
323                    timeout_milliseconds,
324                );
325
326                // Create WebSocket client
327                let mut client = WebSocketClient::new(ws_config);
328
329                // Update connection status
330                *connection_status.write() = ConnectionState::Connecting;
331
332                // Create a simple message handler
333                let handler = BybitFuturesMessageHandler::new_trade_handler(
334                    clock.clone(),
335                    stats.clone(),
336                    tx.clone(),
337                    subscription.clone(),
338                );
339
340                // Run the WebSocket client
341                if let Err(e) = client.run(handler).await {
342                    log::error!("WebSocket client error: {e}");
343                    *connection_status.write() = ConnectionState::Error;
344                }
345
346                // Check for stop signal before reconnecting
347                if *stop_rx.borrow() {
348                    break;
349                }
350
351                // Reconnect after a delay
352                tokio::time::sleep(Duration::from_millis(1000)).await;
353            }
354        });
355
356        Ok(handle)
357    }
358
359    /// Create a WebSocket connection for orderbook data
360    #[inline]
361    async fn create_orderbook_connection(
362        &self,
363        symbols: SmallVec<[String; 8]>,
364        tx: mpsc::Sender<OrderbookResponse>,
365        subscription: SubscriptionMessage,
366    ) -> Result<JoinHandle<()>> {
367        let connection_id = format!("orderbook-{}", symbols.join(","));
368
369        // Create stop signal
370        let (stop_tx, stop_rx) = watch::channel(false);
371        self.subscriptions
372            .write()
373            .insert(connection_id.clone().into(), stop_tx);
374
375        // Clone parameters for the task
376        let url = self.config.websocket_config.base_url.clone();
377        let clock = self.clock.clone();
378        let timeout_milliseconds = self.config.websocket_config.timeout_milliseconds;
379        let ping_interval_milliseconds = self.config.websocket_config.ping_interval_milliseconds;
380        let stats = self.stats.clone();
381        let connection_status = self.connection_status.clone();
382        let use_compression = self.config.websocket_config.use_compression;
383
384        // Spawn a task for the WebSocket connection
385        let handle = tokio::spawn(async move {
386            loop {
387                // Check for stop signal
388                if *stop_rx.borrow() {
389                    break;
390                }
391
392                // Create WebSocket configuration
393                let ws_config = Self::create_websocket_config(
394                    url.clone(),
395                    use_compression,
396                    ping_interval_milliseconds,
397                    timeout_milliseconds,
398                );
399
400                // Create WebSocket client
401                let mut client = WebSocketClient::new(ws_config);
402
403                // Update connection status
404                *connection_status.write() = ConnectionState::Connecting;
405
406                // Create a simple message handler
407                let handler = BybitFuturesMessageHandler::new_orderbook_handler(
408                    clock.clone(),
409                    stats.clone(),
410                    tx.clone(),
411                    subscription.clone(),
412                );
413
414                // Run the WebSocket client
415                if let Err(e) = client.run(handler).await {
416                    log::error!("WebSocket client error: {e}");
417                    *connection_status.write() = ConnectionState::Error;
418                }
419
420                // Check for stop signal before reconnecting
421                if *stop_rx.borrow() {
422                    break;
423                }
424
425                // Reconnect after a delay
426                tokio::time::sleep(Duration::from_millis(1000)).await;
427            }
428        });
429
430        Ok(handle)
431    }
432
433    /// Fetch orderbook snapshot for a symbol
434    async fn fetch_orderbook_snapshot(
435        &self,
436        symbol: &str,
437        depth: Option<u32>,
438    ) -> Result<OrderBookSnapshot> {
439        let limit = depth.unwrap_or(50);
440        let url = format!(
441            "{}/v5/market/orderbook?category=linear&symbol={}&limit={}",
442            self.config.rest_config.base_url, symbol, limit
443        );
444
445        let response = self.http_client.get(&url).send().await?;
446
447        if !response.status().is_success() {
448            let error_text = response.text().await?;
449            return Err(anyhow!(
450                "Failed to fetch orderbook snapshot: {}",
451                error_text
452            ));
453        }
454
455        // Parse the response using simd_json
456        let bytes = response
457            .bytes()
458            .await
459            .context("Failed to get response bytes")?;
460        let mut bytes_vec = bytes.to_vec();
461        let response_json: simd_json::OwnedValue =
462            simd_json::from_slice(&mut bytes_vec).context("Failed to parse JSON")?;
463
464        // Extract data
465        let result = response_json
466            .get("result")
467            .ok_or_else(|| anyhow!("Invalid response format"))?;
468
469        let update_id = result.get("u").and_then(|v| v.as_u64()).unwrap_or(0);
470
471        let bids = result
472            .get("b")
473            .and_then(|b| b.as_array())
474            .ok_or_else(|| anyhow!("Missing bids"))?;
475
476        let asks = result
477            .get("a")
478            .and_then(|a| a.as_array())
479            .ok_or_else(|| anyhow!("Missing asks"))?;
480
481        // Create OrderBookSnapshot with current timestamp (nanoseconds since epoch)
482        let timestamp = self.clock.raw();
483        let instrument_id = InstrumentId::new(symbol, Venue::Bybit);
484
485        let mut order_book_snapshot =
486            OrderBookSnapshot::new_empty(instrument_id, timestamp, update_id);
487
488        // Parse bids
489        for bid in bids {
490            if let Some(bid_array) = bid.as_array()
491                && bid_array.len() >= 2
492                && let (Some(price_str), Some(quantity_str)) =
493                    (bid_array[0].as_str(), bid_array[1].as_str())
494                && let (Ok(price), Ok(quantity)) = (
495                    price_str.parse::<rust_decimal::Decimal>(),
496                    quantity_str.parse::<rust_decimal::Decimal>(),
497                )
498            {
499                order_book_snapshot.add_bid(price, quantity);
500            }
501        }
502
503        // Parse asks
504        for ask in asks {
505            if let Some(ask_array) = ask.as_array()
506                && ask_array.len() >= 2
507                && let (Some(price_str), Some(quantity_str)) =
508                    (ask_array[0].as_str(), ask_array[1].as_str())
509                && let (Ok(price), Ok(quantity)) = (
510                    price_str.parse::<rust_decimal::Decimal>(),
511                    quantity_str.parse::<rust_decimal::Decimal>(),
512                )
513            {
514                order_book_snapshot.add_ask(price, quantity);
515            }
516        }
517
518        Ok(order_book_snapshot)
519    }
520}
521
522#[async_trait]
523impl Provider for BybitFuturesProvider {
524    type TradeMessage = MarketTradeResponse;
525    type DepthMessage = OrderbookResponse;
526    type InstrumentMessage = Value;
527
528    fn name(&self) -> &'static str {
529        "Bybit Futures"
530    }
531
532    fn venue(&self) -> Venue {
533        Venue::Bybit
534    }
535
536    fn config(&self) -> &ConnectionConfig {
537        &self.config
538    }
539
540    async fn init(&mut self) -> Result<()> {
541        self.connect().await
542    }
543
544    async fn subscribe_trades(
545        &self,
546        symbols: SmallVec<[String; 8]>,
547    ) -> Result<mpsc::Receiver<Self::TradeMessage>> {
548        // Ensure connection is established
549        self.connect().await?;
550
551        // Create channel for trade messages
552        let (tx, rx) = mpsc::channel(1024);
553
554        // Create subscription message
555        let subscription = SubscriptionMessage::trades(symbols.to_vec());
556        let connection_id = format!("trade-{}", symbols.join(","));
557
558        // Create WebSocket connection and handle
559        let handle = self
560            .create_trade_connection(symbols, tx, subscription)
561            .await?;
562
563        // Save task handle
564        self.ws_handles.write().insert(connection_id.into(), handle);
565
566        Ok(rx)
567    }
568
569    async fn unsubscribe_trades(&self) -> Result<()> {
570        // Find all trade subscriptions
571        let mut keys_to_remove = Vec::new();
572
573        for (key, _) in self.subscriptions.read().iter() {
574            if key.starts_with("trade-") {
575                keys_to_remove.push(key.clone());
576            }
577        }
578
579        // Send stop signal for each subscription
580        for key in keys_to_remove {
581            if let Some(tx) = self.subscriptions.write().remove(&key) {
582                let _ = tx.send(true);
583            }
584
585            // Abort task
586            if let Some(handle) = self.ws_handles.write().remove(&key) {
587                handle.abort();
588            }
589        }
590
591        Ok(())
592    }
593
594    async fn subscribe_orderbook(
595        &self,
596        symbols: SmallVec<[String; 8]>,
597    ) -> Result<mpsc::Receiver<Self::DepthMessage>> {
598        // Ensure connection is established
599        self.connect().await?;
600
601        // Create channel for orderbook messages
602        let (tx, rx) = mpsc::channel(1024);
603
604        // Create subscription message for level2 updates
605        let subscription = SubscriptionMessage::orderbook(symbols.to_vec());
606        let connection_id = format!("orderbook-{}", symbols.join(","));
607
608        // Create WebSocket connection and handle
609        let handle = self
610            .create_orderbook_connection(symbols, tx, subscription)
611            .await?;
612
613        // Save task handle
614        self.ws_handles.write().insert(connection_id.into(), handle);
615
616        Ok(rx)
617    }
618
619    async fn unsubscribe_orderbook(&self) -> Result<()> {
620        // Find all orderbook subscriptions
621        let mut keys_to_remove = Vec::new();
622
623        for (key, _) in self.subscriptions.read().iter() {
624            if key.starts_with("orderbook-") {
625                keys_to_remove.push(key.clone());
626            }
627        }
628
629        // Send stop signal for each subscription
630        for key in keys_to_remove {
631            if let Some(tx) = self.subscriptions.write().remove(&key) {
632                let _ = tx.send(true);
633            }
634
635            // Abort task
636            if let Some(handle) = self.ws_handles.write().remove(&key) {
637                handle.abort();
638            }
639        }
640
641        Ok(())
642    }
643
644    #[inline]
645    async fn get_instruments(&self) -> Result<Vec<Box<dyn Instrument>>> {
646        // Fetch instruments from Bybit API
647        let url = format!(
648            "{}/v5/market/instruments-info?category=linear",
649            self.config.rest_config.base_url
650        );
651
652        let response = self
653            .http_client
654            .get(&url)
655            .send()
656            .await
657            .context("Failed to fetch Bybit instruments info")?;
658
659        // Check if the request was successful
660        if !response.status().is_success() {
661            return Err(anyhow!(
662                "Failed to fetch Bybit instruments info: HTTP {}",
663                response.status()
664            ));
665        }
666
667        // Parse the response using simd_json
668        let bytes = response
669            .bytes()
670            .await
671            .context("Failed to get response bytes")?;
672        let mut bytes_vec = bytes.to_vec();
673        let instruments_info: simd_json::OwnedValue = simd_json::from_slice(&mut bytes_vec)
674            .context("Failed to parse Bybit instruments info response")?;
675
676        // Extract the instruments from the response
677        let result = instruments_info
678            .get("result")
679            .ok_or_else(|| anyhow!("Invalid response format: missing result"))?;
680
681        let list = result
682            .get("list")
683            .and_then(|l| l.as_array())
684            .ok_or_else(|| anyhow!("Invalid response format: expected array of instruments"))?;
685
686        // Convert to instruments
687        let mut instruments = Vec::with_capacity(list.len());
688
689        for instrument_data in list {
690            // Skip instruments that are not trading
691            let status = instrument_data
692                .get("status")
693                .and_then(|s| s.as_str())
694                .unwrap_or("");
695            if status != "Trading" {
696                continue;
697            }
698
699            let symbol = instrument_data
700                .get("symbol")
701                .and_then(|s| s.as_str())
702                .ok_or_else(|| {
703                    anyhow!("Invalid instrument data: 'symbol' field not found or not a String")
704                })?;
705
706            // Create a basic instrument implementation
707            let instrument_id = InstrumentId::new(symbol, Venue::Bybit);
708            let instrument =
709                Box::new(BybitFuturesInstrument { id: instrument_id }) as Box<dyn Instrument>;
710            instruments.push(instrument);
711        }
712
713        Ok(instruments)
714    }
715
716    #[inline]
717    async fn get_historical_trades(
718        &self,
719        _symbol: &str,
720        _limit: Option<u32>,
721    ) -> Result<Vec<MarketTrade>> {
722        // Fetch historical trades from Bybit API
723        // Not fully implemented yet
724        Err(anyhow!("Historical trades not implemented yet"))
725    }
726
727    async fn get_orderbook_snapshot(
728        &self,
729        symbol: &str,
730        depth: Option<u32>,
731    ) -> Result<OrderBookSnapshot> {
732        self.fetch_orderbook_snapshot(symbol, depth).await
733    }
734
735    #[inline]
736    async fn get_realtime_orderbook(&self, _symbol: &str) -> Result<SharedSimdOrderBook> {
737        // Not yet implemented
738        Err(anyhow!("Real-time orderbook not implemented yet"))
739    }
740
741    #[inline]
742    async fn is_connected(&self) -> bool {
743        *self.connection_status.read() == ConnectionState::Connected
744    }
745
746    #[inline]
747    async fn connection_status(&self) -> ConnectionState {
748        *self.connection_status.read()
749    }
750
751    #[inline]
752    async fn get_stats(&self) -> ConnectionStats {
753        self.stats.read().clone()
754    }
755
756    #[inline]
757    async fn ping(&self) -> Result<u64> {
758        // For WebSocketClient pattern, ping/pong is handled automatically
759        // Return a simple network latency test via HTTP
760        let start = self.clock.raw();
761
762        let response = self
763            .http_client
764            .get(format!(
765                "{}/v5/market/time",
766                self.config.rest_config.base_url
767            ))
768            .send()
769            .await?;
770
771        if response.status().is_success() {
772            Ok(self.clock.raw().saturating_sub(start))
773        } else {
774            Err(anyhow!("Ping failed with status: {}", response.status()))
775        }
776    }
777
778    #[inline]
779    async fn reset_connection(&self) -> Result<()> {
780        // First, unsubscribe all active subscriptions
781        let mut keys_to_remove = Vec::new();
782
783        for (key, _) in self.subscriptions.read().iter() {
784            keys_to_remove.push(key.clone());
785        }
786
787        // Send stop signal for each subscription
788        for key in &keys_to_remove {
789            if let Some(tx) = self.subscriptions.write().remove(key) {
790                let _ = tx.send(true);
791            }
792
793            // Abort task
794            if let Some(handle) = self.ws_handles.write().remove(key) {
795                handle.abort();
796            }
797        }
798
799        // Set status to disconnected
800        *self.connection_status.write() = ConnectionState::Disconnected;
801
802        // Reset stats
803        *self.stats.write() = ConnectionStats::default();
804
805        // Connect again
806        self.connect().await
807    }
808
809    fn get_rate_limits(&self) -> Vec<RateLimit> {
810        BYBIT_FUTURES_RATE_LIMITS.to_vec()
811    }
812
813    async fn shutdown(&mut self) -> Result<()> {
814        // Unsubscribe from all active subscriptions
815        let mut keys_to_remove = Vec::new();
816
817        for (key, _) in self.subscriptions.read().iter() {
818            keys_to_remove.push(key.clone());
819        }
820
821        // Send stop signal for each subscription
822        for key in keys_to_remove {
823            if let Some(tx) = self.subscriptions.write().remove(&key) {
824                let _ = tx.send(true);
825            }
826
827            // Abort task
828            if let Some(handle) = self.ws_handles.write().remove(&key) {
829                handle.abort();
830            }
831        }
832
833        // Set connection status to disconnected
834        *self.connection_status.write() = ConnectionState::Disconnected;
835
836        Ok(())
837    }
838}
839
840/// Simple message handler for Bybit Futures WebSocket messages
841#[derive(Debug)]
842struct BybitFuturesMessageHandler {
843    handler_type: HandlerType,
844    clock: Clock,
845    stats: Arc<RwLock<ConnectionStats>>,
846    trade_tx: Option<mpsc::Sender<MarketTradeResponse>>,
847    orderbook_tx: Option<mpsc::Sender<OrderbookResponse>>,
848    subscription: SubscriptionMessage,
849}
850
851#[derive(Debug)]
852enum HandlerType {
853    Trade,
854    Orderbook,
855}
856
857impl BybitFuturesMessageHandler {
858    const fn new_trade_handler(
859        clock: Clock,
860        stats: Arc<RwLock<ConnectionStats>>,
861        tx: mpsc::Sender<MarketTradeResponse>,
862        subscription: SubscriptionMessage,
863    ) -> Self {
864        Self {
865            handler_type: HandlerType::Trade,
866            clock,
867            stats,
868            trade_tx: Some(tx),
869            orderbook_tx: None,
870            subscription,
871        }
872    }
873
874    const fn new_orderbook_handler(
875        clock: Clock,
876        stats: Arc<RwLock<ConnectionStats>>,
877        tx: mpsc::Sender<OrderbookResponse>,
878        subscription: SubscriptionMessage,
879    ) -> Self {
880        Self {
881            handler_type: HandlerType::Orderbook,
882            clock,
883            stats,
884            trade_tx: None,
885            orderbook_tx: Some(tx),
886            subscription,
887        }
888    }
889}
890
891#[async_trait]
892impl rusty_common::websocket::MessageHandler for BybitFuturesMessageHandler {
893    async fn on_message(&mut self, message: Message) -> std::result::Result<(), WebSocketError> {
894        let local_time = self.clock.raw();
895
896        // Only process text messages
897        if let Some(text) = message.as_text() {
898            BybitFuturesProvider::update_receive_stats(self.stats.clone(), text.len(), local_time);
899
900            // Parse the message as JSON first
901            let mut message_bytes = text.as_bytes().to_vec();
902            let json_value: simd_json::OwnedValue = simd_json::from_slice(&mut message_bytes)
903                .map_err(|e| {
904                    WebSocketError::MessageProcessingError(format!("Failed to parse JSON: {e}"))
905                })?;
906
907            // Check if this is a data message or a response
908            if let Some(topic) = json_value["topic"].as_str() {
909                match self.handler_type {
910                    HandlerType::Trade if topic.starts_with("publicTrade.") => {
911                        if let Ok(trade_response) =
912                            simd_json::from_slice::<MarketTradeResponse>(&mut message_bytes)
913                            && let Some(ref tx) = self.trade_tx
914                            && let Err(e) = tx.send(trade_response).await
915                        {
916                            log::warn!("Failed to send trade message: {e}");
917                        }
918                    }
919                    HandlerType::Orderbook if topic.starts_with("orderbook.") => {
920                        if let Ok(orderbook_response) =
921                            simd_json::from_slice::<OrderbookResponse>(&mut message_bytes)
922                            && let Some(ref tx) = self.orderbook_tx
923                            && let Err(e) = tx.send(orderbook_response).await
924                        {
925                            log::warn!("Failed to send orderbook message: {e}");
926                        }
927                    }
928                    _ => {
929                        // Handle other message types (subscriptions, heartbeat, error)
930                        if let Some(op) = json_value["op"].as_str() {
931                            match op {
932                                "subscribe" => {
933                                    log::info!("Bybit Futures subscription confirmed: {text}");
934                                }
935                                "pong" => {
936                                    log::trace!("Bybit Futures pong received");
937                                }
938                                _ => {
939                                    log::debug!("Unhandled Bybit Futures message op: {op}");
940                                }
941                            }
942                        }
943                    }
944                }
945            }
946        }
947
948        Ok(())
949    }
950
951    async fn on_connected(&mut self) -> std::result::Result<(), WebSocketError> {
952        // Send subscription message after connection
953        let subscription_json = simd_json::to_string(&self.subscription).map_err(|e| {
954            WebSocketError::MessageProcessingError(format!("Failed to serialize subscription: {e}"))
955        })?;
956
957        log::info!("Sending Bybit Futures subscription: {subscription_json}");
958
959        // Note: The actual sending would be handled by the WebSocketClient
960        // This is just for logging purposes
961        Ok(())
962    }
963
964    async fn on_disconnected(&mut self) -> std::result::Result<(), WebSocketError> {
965        log::info!("Disconnected from Bybit Futures WebSocket");
966        Ok(())
967    }
968
969    async fn on_error(&mut self, error: WebSocketError) -> std::result::Result<(), WebSocketError> {
970        log::error!("Bybit Futures WebSocket error: {error}");
971        Ok(())
972    }
973}