rusty_feeder/exchange/bybit/
provider.rs

1use rusty_common::collections::FxHashMap;
2use smartstring::alias::String;
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use super::data::{
7    orderbook::OrderbookResponse, subscription::SubscriptionRequest, trade::TradeResponse,
8};
9use super::types::{
10    BYBIT_API_URL, BYBIT_RATE_LIMITS, BYBIT_WS_URL_INVERSE, BYBIT_WS_URL_LINEAR,
11    BYBIT_WS_URL_OPTION, BYBIT_WS_URL_SPOT,
12};
13use anyhow::{Context, Result, anyhow};
14use async_trait::async_trait;
15use parking_lot::RwLock;
16use quanta::Clock;
17use reqwest::header::{HeaderMap, HeaderValue};
18use rusty_common::json::Value;
19use rusty_common::websocket::{Message, WebSocketClient, WebSocketConfig, WebSocketError};
20use rusty_model::{
21    data::{
22        book_snapshot::OrderBookSnapshot, market_trade::MarketTrade,
23        simd_orderbook::SharedSimdOrderBook,
24    },
25    instruments::{Instrument, InstrumentId},
26    venues::Venue,
27};
28use simd_json::prelude::{ValueAsArray, ValueAsObject, ValueAsScalar};
29use smallvec::SmallVec;
30use tokio::sync::{mpsc, watch};
31use tokio::task::JoinHandle;
32
33use crate::provider::prelude::*;
34
35/// Bybit V5 product categories
36#[derive(Debug, Clone, Copy, PartialEq, Eq)]
37pub enum BybitCategory {
38    /// Spot trading
39    Spot,
40    /// Linear perpetuals and USDT futures
41    Linear,
42    /// Inverse contracts
43    Inverse,
44    /// Options
45    Option,
46}
47
48impl BybitCategory {
49    /// Get the appropriate WebSocket URL for this category
50    #[must_use]
51    pub const fn websocket_url(&self) -> &'static str {
52        match self {
53            Self::Spot => BYBIT_WS_URL_SPOT,
54            Self::Linear => BYBIT_WS_URL_LINEAR,
55            Self::Inverse => BYBIT_WS_URL_INVERSE,
56            Self::Option => BYBIT_WS_URL_OPTION,
57        }
58    }
59
60    /// Get the category string for V5 API calls
61    #[must_use]
62    pub const fn as_str(&self) -> &'static str {
63        match self {
64            Self::Spot => "spot",
65            Self::Linear => "linear",
66            Self::Inverse => "inverse",
67            Self::Option => "option",
68        }
69    }
70}
71
72impl Default for BybitCategory {
73    fn default() -> Self {
74        Self::Spot
75    }
76}
77
/// Bybit V5 provider implementation
///
/// High-performance provider for Bybit V5 markets with WebSocket streaming
/// and REST API support. Supports all V5 categories: spot, linear, inverse, option.
/// Optimized for HFT applications with nanosecond precision.
///
/// Each trade/orderbook subscription runs in its own spawned task. The task's
/// stop signal and `JoinHandle` are stored under the same connection id
/// (`trade-<symbols>` or `orderbook-<symbols>`) in `subscriptions` and
/// `ws_handles` respectively.
#[derive(Debug)]
pub struct BybitProvider {
    /// Connection configuration (WebSocket and REST settings)
    config: ConnectionConfig,

    /// WebSocket connection status, shared with the spawned streaming tasks
    connection_status: Arc<RwLock<ConnectionState>>,

    /// Connection statistics, shared with the message handlers
    stats: Arc<RwLock<ConnectionStats>>,

    /// Shared clock for time synchronization (cloned from `config.clock`)
    clock: Clock,

    /// Active subscriptions: connection id -> stop signal (`true` requests stop)
    subscriptions: Arc<RwLock<FxHashMap<String, watch::Sender<bool>>>>,

    /// Task handles for WebSocket connections, keyed by connection id
    ws_handles: Arc<RwLock<FxHashMap<String, JoinHandle<()>>>>,

    /// Last connection attempt timestamp, used for reconnection backoff
    last_connection_attempt: Arc<RwLock<Instant>>,

    /// HTTP client for REST API requests
    http_client: reqwest::Client,

    /// V5 category (spot, linear, inverse, option)
    category: BybitCategory,
}
112
113impl Default for BybitProvider {
114    fn default() -> Self {
115        Self::new()
116    }
117}
118
119impl BybitProvider {
120    /// Get the current category
121    #[inline]
122    #[must_use]
123    pub const fn category(&self) -> BybitCategory {
124        self.category
125    }
126
127    /// Create a new Bybit provider with default configuration (Spot category)
128    ///
129    /// This method provides backward compatibility for code expecting a parameterless constructor.
130    /// For specific categories, use `new_with_category()` or the category-specific constructors.
131    #[inline]
132    #[must_use]
133    pub fn new() -> Self {
134        Self::new_with_category(BybitCategory::Spot)
135    }
136
137    /// Create a new Bybit provider with specified category and default configuration
138    #[inline]
139    #[must_use]
140    pub fn new_with_category(category: BybitCategory) -> Self {
141        Self::with_config(category, None)
142    }
143
144    /// Create a new Bybit spot provider with default configuration
145    #[inline]
146    #[must_use]
147    pub fn new_spot() -> Self {
148        Self::new_with_category(BybitCategory::Spot)
149    }
150
151    /// Create a new Bybit linear provider with default configuration
152    #[inline]
153    #[must_use]
154    pub fn new_linear() -> Self {
155        Self::new_with_category(BybitCategory::Linear)
156    }
157
158    /// Create a new Bybit inverse provider with default configuration
159    #[inline]
160    #[must_use]
161    pub fn new_inverse() -> Self {
162        Self::new_with_category(BybitCategory::Inverse)
163    }
164
165    /// Create a new Bybit option provider with default configuration
166    #[inline]
167    #[must_use]
168    pub fn new_option() -> Self {
169        Self::new_with_category(BybitCategory::Option)
170    }
171
172    /// Create a new Bybit provider with specified category and custom configuration
173    #[inline]
174    #[must_use]
175    pub fn with_config(category: BybitCategory, config: Option<ConnectionConfig>) -> Self {
176        let mut default_config = ConnectionConfig::default();
177
178        // Set default WebSocket configuration based on category
179        default_config.websocket_config.base_url = category.websocket_url().into();
180        default_config.websocket_config.ping_interval_milliseconds = 20000; // 20 seconds
181        default_config.websocket_config.use_compression = false;
182
183        // Set default REST API configuration
184        default_config.rest_config.base_url = BYBIT_API_URL.into();
185        default_config.rest_config.timeout_milliseconds = 5000; // 5 seconds
186
187        // Use custom config if provided
188        let config = config.unwrap_or(default_config);
189        let clock = config.clock.clone();
190
191        // Create HTTP client with appropriate settings
192        let mut headers = HeaderMap::new();
193        headers.insert(
194            "User-Agent",
195            HeaderValue::from_str(&config.rest_config.user_agent)
196                .unwrap_or_else(|_| HeaderValue::from_static("RustyHFT/1.0")),
197        );
198
199        let http_client = reqwest::Client::builder()
200            .timeout(Duration::from_millis(
201                config.rest_config.timeout_milliseconds,
202            ))
203            .connect_timeout(Duration::from_millis(
204                config.rest_config.timeout_milliseconds / 2,
205            ))
206            .pool_max_idle_per_host(config.rest_config.connection_pool_size)
207            .pool_idle_timeout(Duration::from_millis(
208                config.rest_config.keep_alive_milliseconds,
209            ))
210            .default_headers(headers)
211            .build()
212            .unwrap_or_default();
213
214        Self {
215            config,
216            connection_status: Arc::new(RwLock::new(ConnectionState::Disconnected)),
217            stats: Arc::new(RwLock::new(ConnectionStats::default())),
218            clock,
219            subscriptions: Arc::new(RwLock::new(FxHashMap::default())),
220            ws_handles: Arc::new(RwLock::new(FxHashMap::default())),
221            last_connection_attempt: Arc::new(RwLock::new(Instant::now())),
222            http_client,
223            category,
224        }
225    }
226
227    /// Helper function to update connection statistics upon message reception
228    #[inline]
229    fn update_receive_stats(
230        stats: Arc<RwLock<ConnectionStats>>,
231        message_size: usize,
232        local_time: u64,
233    ) {
234        let mut s = stats.write();
235        s.messages_received += 1;
236        s.bytes_received += message_size as u64;
237        s.last_message_time = local_time;
238    }
239
240    /// Helper function to create WebSocket request with proper headers
241    fn create_websocket_config(
242        url: String,
243        use_compression: bool,
244        ping_interval_milliseconds: u64,
245        timeout_milliseconds: u64,
246    ) -> WebSocketConfig {
247        WebSocketConfig::builder(rusty_common::types::Exchange::Bybit, url.to_string())
248            .ping_interval(Duration::from_millis(ping_interval_milliseconds))
249            .timeout(Duration::from_millis(timeout_milliseconds))
250            .compression(if use_compression {
251                rusty_common::websocket::CompressionConfig::default()
252            } else {
253                rusty_common::websocket::CompressionConfig::disabled()
254            })
255            .build()
256    }
257
258    /// Connect to the WebSocket API
259    #[inline]
260    async fn connect(&self) -> Result<()> {
261        if *self.connection_status.read() == ConnectionState::Connected {
262            return Ok(());
263        }
264
265        // Check if we need to respect reconnection backoff
266        {
267            let now = Instant::now();
268            let last_attempt = *self.last_connection_attempt.read();
269            let backoff_ms = self
270                .config
271                .websocket_config
272                .reconnect
273                .backoff_initial_milliseconds;
274
275            if now.duration_since(last_attempt) < Duration::from_millis(backoff_ms) {
276                // Wait until backoff period is over
277                tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
278            }
279
280            // Update last attempt time
281            *self.last_connection_attempt.write() = Instant::now();
282        }
283
284        *self.connection_status.write() = ConnectionState::Connecting;
285
286        // Note: Connection is now handled by individual trade/orderbook connections
287        // using WebSocketClient pattern. This method is kept for compatibility.
288        *self.connection_status.write() = ConnectionState::Connected;
289        self.stats.write().connected_time = self.clock.raw();
290        Ok(())
291    }
292}
293
294#[async_trait]
295impl Provider for BybitProvider {
296    type TradeMessage = TradeResponse;
297    type DepthMessage = OrderbookResponse;
298    type InstrumentMessage = Value;
299
    /// Human-readable provider name.
    fn name(&self) -> &'static str {
        "Bybit"
    }
303
    /// Venue served by this provider.
    fn venue(&self) -> Venue {
        Venue::Bybit
    }
307
    /// Connection configuration the provider was built with.
    fn config(&self) -> &ConnectionConfig {
        &self.config
    }
311
    /// Initialize the provider by running the internal `connect()` step.
    async fn init(&mut self) -> Result<()> {
        self.connect().await
    }
315
316    async fn shutdown(&mut self) -> Result<()> {
317        // Unsubscribe from all active subscriptions
318        let mut keys_to_remove = Vec::new();
319
320        for (key, _) in self.subscriptions.read().iter() {
321            keys_to_remove.push(key.clone());
322        }
323
324        // Send stop signal for each subscription
325        for key in keys_to_remove {
326            if let Some(tx) = self.subscriptions.write().remove(&key) {
327                let _ = tx.send(true);
328            }
329
330            // Abort task
331            if let Some(handle) = self.ws_handles.write().remove(&key) {
332                handle.abort();
333            }
334        }
335
336        // Set connection status to disconnected
337        *self.connection_status.write() = ConnectionState::Disconnected;
338
339        Ok(())
340    }
341
342    #[inline]
343    async fn subscribe_trades(
344        &self,
345        symbols: SmallVec<[String; 8]>,
346    ) -> Result<mpsc::Receiver<Self::TradeMessage>> {
347        // Ensure connection is established
348        self.connect().await?;
349
350        // Create channel for trade messages
351        let (tx, rx) = mpsc::channel(1024);
352
353        // Create subscription message
354        let subscription = SubscriptionRequest::trades(symbols.to_vec());
355        let connection_id = format!("trade-{}", symbols.join(","));
356
357        // Create WebSocket connection and handle
358        let handle = self
359            .create_trade_connection(symbols, tx, subscription)
360            .await?;
361
362        // Save task handle
363        self.ws_handles.write().insert(connection_id.into(), handle);
364
365        Ok(rx)
366    }
367
368    #[inline]
369    async fn unsubscribe_trades(&self) -> Result<()> {
370        // Find all trade subscriptions
371        let mut keys_to_remove = Vec::new();
372
373        for (key, _) in self.subscriptions.read().iter() {
374            if key.starts_with("trade-") {
375                keys_to_remove.push(key.clone());
376            }
377        }
378
379        // Send stop signal for each subscription
380        for key in keys_to_remove {
381            if let Some(tx) = self.subscriptions.write().remove(&key) {
382                let _ = tx.send(true);
383            }
384
385            // Abort task
386            if let Some(handle) = self.ws_handles.write().remove(&key) {
387                handle.abort();
388            }
389        }
390
391        Ok(())
392    }
393
394    #[inline]
395    async fn subscribe_orderbook(
396        &self,
397        symbols: SmallVec<[String; 8]>,
398    ) -> Result<mpsc::Receiver<Self::DepthMessage>> {
399        // Ensure connection is established
400        self.connect().await?;
401
402        // Create channel for orderbook messages
403        let (tx, rx) = mpsc::channel(1024);
404
405        // Create subscription message (depth 50 levels)
406        let subscription = SubscriptionRequest::orderbook(symbols.to_vec(), 50);
407        let connection_id = format!("orderbook-{}", symbols.join(","));
408
409        // Create WebSocket connection and handle
410        let handle = self
411            .create_orderbook_connection(symbols, tx, subscription)
412            .await?;
413
414        // Save task handle
415        self.ws_handles.write().insert(connection_id.into(), handle);
416
417        Ok(rx)
418    }
419
420    #[inline]
421    async fn unsubscribe_orderbook(&self) -> Result<()> {
422        // Find all orderbook subscriptions
423        let mut keys_to_remove = Vec::new();
424
425        for (key, _) in self.subscriptions.read().iter() {
426            if key.starts_with("orderbook-") {
427                keys_to_remove.push(key.clone());
428            }
429        }
430
431        // Send stop signal for each subscription
432        for key in keys_to_remove {
433            if let Some(tx) = self.subscriptions.write().remove(&key) {
434                let _ = tx.send(true);
435            }
436
437            // Abort task
438            if let Some(handle) = self.ws_handles.write().remove(&key) {
439                handle.abort();
440            }
441        }
442
443        Ok(())
444    }
445
446    #[inline]
447    async fn get_instruments(&self) -> Result<Vec<Box<dyn Instrument>>> {
448        // Fetch instruments from Bybit V5 API with category parameter
449        let url = format!(
450            "{}/v5/market/instruments-info?category={}",
451            self.config.rest_config.base_url,
452            self.category.as_str()
453        );
454
455        let response = self
456            .http_client
457            .get(&url)
458            .send()
459            .await
460            .context("Failed to fetch Bybit instruments info")?;
461
462        // Check if the request was successful
463        if !response.status().is_success() {
464            return Err(anyhow!(
465                "Failed to fetch Bybit instruments info: HTTP {}",
466                response.status()
467            ));
468        }
469
470        // Parse the response using simd_json
471        let bytes = response
472            .bytes()
473            .await
474            .context("Failed to get response bytes")?;
475        let mut bytes_vec = bytes.to_vec();
476        let instruments_info: simd_json::OwnedValue = simd_json::from_slice(&mut bytes_vec)
477            .context("Failed to parse Bybit instruments info response")?;
478
479        // Extract the instruments from the response
480        let result = instruments_info["result"].as_object().ok_or_else(|| {
481            anyhow!("Invalid response format: 'result' field not found or not an object")
482        })?;
483
484        let list = result["list"].as_array().ok_or_else(|| {
485            anyhow!("Invalid response format: 'list' field not found or not an array")
486        })?;
487
488        // Convert to instruments
489        let mut instruments = Vec::with_capacity(list.len());
490
491        for instrument_data in list {
492            // Skip instruments that are not trading
493            let status = instrument_data["status"].as_str().unwrap_or("");
494            if status != "Trading" {
495                continue;
496            }
497
498            let symbol = instrument_data["symbol"].as_str().ok_or_else(|| {
499                anyhow!("Invalid instrument data: 'symbol' field not found or not a String")
500            })?;
501
502            // Create a basic instrument implementation
503            let instrument_id = InstrumentId::new(symbol, Venue::Bybit);
504            let instrument = Box::new(BybitInstrument { id: instrument_id }) as Box<dyn Instrument>;
505            instruments.push(instrument);
506        }
507
508        Ok(instruments)
509    }
510
511    #[inline]
512    async fn get_historical_trades(
513        &self,
514        _symbol: &str,
515        _limit: Option<u32>,
516    ) -> Result<Vec<MarketTrade>> {
517        // Fetch historical trades from Bybit API
518        // Not fully implemented yet
519        Err(anyhow!("Historical trades not implemented yet"))
520    }
521
522    #[inline]
523    async fn get_orderbook_snapshot(
524        &self,
525        symbol: &str,
526        depth: Option<u32>,
527    ) -> Result<OrderBookSnapshot> {
528        let limit = depth.unwrap_or(50);
529        let url = format!(
530            "{}/v5/market/orderbook?category={}&symbol={}&limit={}",
531            self.config.rest_config.base_url,
532            self.category.as_str(),
533            symbol,
534            limit
535        );
536
537        let response = self.http_client.get(&url).send().await?;
538
539        if !response.status().is_success() {
540            let error_text = response.text().await?;
541            return Err(anyhow!(
542                "Failed to fetch orderbook snapshot: {}",
543                error_text
544            ));
545        }
546
547        // Parse the response using simd_json
548        let bytes = response
549            .bytes()
550            .await
551            .map_err(|e| anyhow!("Failed to get response bytes: {}", e))?;
552        let mut bytes_vec = bytes.to_vec();
553        let snapshot_data: simd_json::OwnedValue = simd_json::from_slice(&mut bytes_vec)
554            .map_err(|e| anyhow!("Failed to parse JSON: {}", e))?;
555
556        // Create OrderBookSnapshot with current timestamp (nanoseconds since epoch)
557        let timestamp = self.clock.raw();
558        let instrument_id = InstrumentId::new(symbol, Venue::Bybit);
559
560        let result = snapshot_data["result"]
561            .as_object()
562            .ok_or_else(|| anyhow!("Invalid response format: 'result' field not found"))?;
563
564        let update_id = result["u"].as_u64().unwrap_or(0);
565        let mut order_book_snapshot =
566            OrderBookSnapshot::new_empty(instrument_id, timestamp, update_id);
567
568        // Parse bids
569        if let Some(bids) = result["b"].as_array() {
570            for bid in bids {
571                if let Some(bid_array) = bid.as_array()
572                    && bid_array.len() >= 2
573                    && let (Some(price_str), Some(quantity_str)) =
574                        (bid_array[0].as_str(), bid_array[1].as_str())
575                    && let (Ok(price), Ok(quantity)) = (
576                        price_str.parse::<rust_decimal::Decimal>(),
577                        quantity_str.parse::<rust_decimal::Decimal>(),
578                    )
579                {
580                    order_book_snapshot.add_bid(price, quantity);
581                }
582            }
583        }
584
585        // Parse asks
586        if let Some(asks) = result["a"].as_array() {
587            for ask in asks {
588                if let Some(ask_array) = ask.as_array()
589                    && ask_array.len() >= 2
590                    && let (Some(price_str), Some(quantity_str)) =
591                        (ask_array[0].as_str(), ask_array[1].as_str())
592                    && let (Ok(price), Ok(quantity)) = (
593                        price_str.parse::<rust_decimal::Decimal>(),
594                        quantity_str.parse::<rust_decimal::Decimal>(),
595                    )
596                {
597                    order_book_snapshot.add_ask(price, quantity);
598                }
599            }
600        }
601
602        Ok(order_book_snapshot)
603    }
604
605    #[inline]
606    async fn get_realtime_orderbook(&self, _symbol: &str) -> Result<SharedSimdOrderBook> {
607        // Not yet implemented
608        Err(anyhow!("Real-time orderbook not implemented yet"))
609    }
610
611    #[inline]
612    async fn is_connected(&self) -> bool {
613        *self.connection_status.read() == ConnectionState::Connected
614    }
615
616    #[inline]
617    async fn connection_status(&self) -> ConnectionState {
618        *self.connection_status.read()
619    }
620
621    #[inline]
622    async fn get_stats(&self) -> ConnectionStats {
623        self.stats.read().clone()
624    }
625
626    #[inline]
627    async fn ping(&self) -> Result<u64> {
628        // For WebSocketClient pattern, ping/pong is handled automatically
629        // Return a simple network latency test via HTTP
630        let start = self.clock.raw();
631
632        let response = self
633            .http_client
634            .get(format!(
635                "{}/v5/market/time",
636                self.config.rest_config.base_url
637            ))
638            .send()
639            .await?;
640
641        if response.status().is_success() {
642            Ok(self.clock.raw().saturating_sub(start))
643        } else {
644            Err(anyhow!("Ping failed with status: {}", response.status()))
645        }
646    }
647
648    #[inline]
649    async fn reset_connection(&self) -> Result<()> {
650        // First, unsubscribe all active subscriptions
651        let mut keys_to_remove = Vec::new();
652
653        for (key, _) in self.subscriptions.read().iter() {
654            keys_to_remove.push(key.clone());
655        }
656
657        // Send stop signal for each subscription
658        for key in &keys_to_remove {
659            if let Some(tx) = self.subscriptions.write().remove(key) {
660                let _ = tx.send(true);
661            }
662
663            // Abort task
664            if let Some(handle) = self.ws_handles.write().remove(key) {
665                handle.abort();
666            }
667        }
668
669        // Set status to disconnected
670        *self.connection_status.write() = ConnectionState::Disconnected;
671
672        // Reset stats
673        *self.stats.write() = ConnectionStats::default();
674
675        // Connect again
676        self.connect().await
677    }
678
679    #[inline]
680    fn get_rate_limits(&self) -> Vec<RateLimit> {
681        BYBIT_RATE_LIMITS.to_vec()
682    }
683}
684
/// Simple instrument implementation for Bybit
///
/// Carries only the instrument identifier; symbol and venue are read from it.
#[derive(Debug, Clone)]
pub struct BybitInstrument {
    /// Unique instrument identifier (symbol + venue)
    pub id: InstrumentId,
}
691
692impl Instrument for BybitInstrument {
693    fn id(&self) -> InstrumentId {
694        self.id.clone()
695    }
696
697    fn symbol(&self) -> String {
698        self.id.symbol.clone()
699    }
700
701    fn venue(&self) -> Venue {
702        self.id.venue
703    }
704
705    fn as_any(&self) -> &dyn std::any::Any {
706        self
707    }
708
709    fn clone_box(&self) -> Box<dyn Instrument> {
710        Box::new(self.clone())
711    }
712}
713
714impl BybitProvider {
715    /// Create a WebSocket connection for trade data
716    #[inline]
717    async fn create_trade_connection(
718        &self,
719        symbols: SmallVec<[String; 8]>,
720        tx: mpsc::Sender<TradeResponse>,
721        subscription: SubscriptionRequest,
722    ) -> Result<JoinHandle<()>> {
723        let connection_id = format!("trade-{}", symbols.join(","));
724
725        // Create stop signal
726        let (stop_tx, stop_rx) = watch::channel(false);
727        self.subscriptions
728            .write()
729            .insert(connection_id.clone().into(), stop_tx);
730
731        // Clone parameters for the task
732        let url = self.config.websocket_config.base_url.clone();
733        let clock = self.clock.clone();
734        let timeout_milliseconds = self.config.websocket_config.timeout_milliseconds;
735        let ping_interval_milliseconds = self.config.websocket_config.ping_interval_milliseconds;
736        let stats = self.stats.clone();
737        let connection_status = self.connection_status.clone();
738        let use_compression = self.config.websocket_config.use_compression;
739
740        // Spawn a task for the WebSocket connection
741        let handle = tokio::spawn(async move {
742            loop {
743                // Check for stop signal
744                if *stop_rx.borrow() {
745                    break;
746                }
747
748                // Create WebSocket configuration
749                let ws_config = Self::create_websocket_config(
750                    url.clone(),
751                    use_compression,
752                    ping_interval_milliseconds,
753                    timeout_milliseconds,
754                );
755
756                // Create WebSocket client
757                let mut client = WebSocketClient::new(ws_config);
758
759                // Update connection status
760                *connection_status.write() = ConnectionState::Connecting;
761
762                // Create a simple message handler
763                let handler = BybitMessageHandler::new_trade_handler(
764                    clock.clone(),
765                    stats.clone(),
766                    tx.clone(),
767                    subscription.clone(),
768                );
769
770                // Run the WebSocket client
771                if let Err(e) = client.run(handler).await {
772                    log::error!("WebSocket client error: {e}");
773                    *connection_status.write() = ConnectionState::Error;
774                }
775
776                // Check for stop signal before reconnecting
777                if *stop_rx.borrow() {
778                    break;
779                }
780
781                // Reconnect after a delay
782                tokio::time::sleep(Duration::from_millis(1000)).await;
783            }
784        });
785
786        Ok(handle)
787    }
788
789    /// Create a WebSocket connection for orderbook data
790    #[inline]
791    async fn create_orderbook_connection(
792        &self,
793        symbols: SmallVec<[String; 8]>,
794        tx: mpsc::Sender<OrderbookResponse>,
795        subscription: SubscriptionRequest,
796    ) -> Result<JoinHandle<()>> {
797        let connection_id = format!("orderbook-{}", symbols.join(","));
798
799        // Create stop signal
800        let (stop_tx, stop_rx) = watch::channel(false);
801        self.subscriptions
802            .write()
803            .insert(connection_id.clone().into(), stop_tx);
804
805        // Clone parameters for the task
806        let url = self.config.websocket_config.base_url.clone();
807        let clock = self.clock.clone();
808        let timeout_milliseconds = self.config.websocket_config.timeout_milliseconds;
809        let ping_interval_milliseconds = self.config.websocket_config.ping_interval_milliseconds;
810        let stats = self.stats.clone();
811        let connection_status = self.connection_status.clone();
812        let use_compression = self.config.websocket_config.use_compression;
813
814        // Spawn a task for the WebSocket connection
815        let handle = tokio::spawn(async move {
816            loop {
817                // Check for stop signal
818                if *stop_rx.borrow() {
819                    break;
820                }
821
822                // Create WebSocket configuration
823                let ws_config = Self::create_websocket_config(
824                    url.clone(),
825                    use_compression,
826                    ping_interval_milliseconds,
827                    timeout_milliseconds,
828                );
829
830                // Create WebSocket client
831                let mut client = WebSocketClient::new(ws_config);
832
833                // Update connection status
834                *connection_status.write() = ConnectionState::Connecting;
835
836                // Create a simple message handler
837                let handler = BybitMessageHandler::new_orderbook_handler(
838                    clock.clone(),
839                    stats.clone(),
840                    tx.clone(),
841                    subscription.clone(),
842                );
843
844                // Run the WebSocket client
845                if let Err(e) = client.run(handler).await {
846                    log::error!("WebSocket client error: {e}");
847                    *connection_status.write() = ConnectionState::Error;
848                }
849
850                // Check for stop signal before reconnecting
851                if *stop_rx.borrow() {
852                    break;
853                }
854
855                // Reconnect after a delay
856                tokio::time::sleep(Duration::from_millis(1000)).await;
857            }
858        });
859
860        Ok(handle)
861    }
862}
863
/// Simple message handler for Bybit WebSocket messages
///
/// One handler serves either trades or orderbooks, as selected by
/// `handler_type`; only the matching output channel is populated by the
/// constructors.
#[derive(Debug)]
struct BybitMessageHandler {
    /// Which kind of topic this handler processes
    handler_type: HandlerType,
    /// Clock used to timestamp incoming messages
    clock: Clock,
    /// Shared connection statistics updated for every text message
    stats: Arc<RwLock<ConnectionStats>>,
    /// Output channel for parsed trades (`Some` for trade handlers)
    trade_tx: Option<mpsc::Sender<TradeResponse>>,
    /// Output channel for parsed orderbooks (`Some` for orderbook handlers)
    orderbook_tx: Option<mpsc::Sender<OrderbookResponse>>,
    /// Subscription request associated with this connection
    subscription: SubscriptionRequest,
    /// Outgoing message channel; starts as `None`.
    /// NOTE(review): presumably set once the connection is established; confirm.
    sender: Option<mpsc::UnboundedSender<Message>>,
}
875
/// Kind of market data a `BybitMessageHandler` processes.
#[derive(Debug)]
enum HandlerType {
    /// Messages whose topic starts with `publicTrade`
    Trade,
    /// Messages whose topic starts with `orderbook`
    Orderbook,
}
881
882impl BybitMessageHandler {
883    const fn new_trade_handler(
884        clock: Clock,
885        stats: Arc<RwLock<ConnectionStats>>,
886        tx: mpsc::Sender<TradeResponse>,
887        subscription: SubscriptionRequest,
888    ) -> Self {
889        Self {
890            handler_type: HandlerType::Trade,
891            clock,
892            stats,
893            trade_tx: Some(tx),
894            orderbook_tx: None,
895            subscription,
896            sender: None,
897        }
898    }
899
900    const fn new_orderbook_handler(
901        clock: Clock,
902        stats: Arc<RwLock<ConnectionStats>>,
903        tx: mpsc::Sender<OrderbookResponse>,
904        subscription: SubscriptionRequest,
905    ) -> Self {
906        Self {
907            handler_type: HandlerType::Orderbook,
908            clock,
909            stats,
910            trade_tx: None,
911            orderbook_tx: Some(tx),
912            subscription,
913            sender: None,
914        }
915    }
916}
917
918#[async_trait]
919impl rusty_common::websocket::MessageHandler for BybitMessageHandler {
920    async fn on_message(&mut self, message: Message) -> std::result::Result<(), WebSocketError> {
921        let local_time = self.clock.raw();
922
923        // Only process text messages
924        if let Some(text) = message.as_text() {
925            BybitProvider::update_receive_stats(self.stats.clone(), text.len(), local_time);
926
927            // PERFORMANCE: Avoid heap allocation in hot path
928            // First, do a quick check for the topic without full parsing
929            // This avoids parsing messages we don't care about
930            let should_process = match self.handler_type {
931                HandlerType::Trade => text.contains("\"topic\":\"publicTrade"),
932                HandlerType::Orderbook => text.contains("\"topic\":\"orderbook"),
933            };
934
935            if should_process {
936                // Use a stack-allocated buffer for small messages
937                // Falls back to heap allocation for larger messages
938                const STACK_BUFFER_SIZE: usize = 8192;
939
940                if text.len() <= STACK_BUFFER_SIZE {
941                    // Stack allocation path for small messages (most common case)
942                    let mut stack_buffer = [0u8; STACK_BUFFER_SIZE];
943                    let bytes = text.as_bytes();
944                    stack_buffer[..bytes.len()].copy_from_slice(bytes);
945                    let buffer_slice = &mut stack_buffer[..bytes.len()];
946
947                    match self.handler_type {
948                        HandlerType::Trade => {
949                            if let Ok(trade_response) =
950                                simd_json::from_slice::<TradeResponse>(buffer_slice)
951                                && let Some(ref tx) = self.trade_tx
952                                && let Err(e) = tx.send(trade_response).await
953                            {
954                                log::warn!("Failed to send trade message: {e}");
955                            }
956                        }
957                        HandlerType::Orderbook => {
958                            if let Ok(orderbook_response) =
959                                simd_json::from_slice::<OrderbookResponse>(buffer_slice)
960                                && let Some(ref tx) = self.orderbook_tx
961                                && let Err(e) = tx.send(orderbook_response).await
962                            {
963                                log::warn!("Failed to send orderbook message: {e}");
964                            }
965                        }
966                    }
967                } else {
968                    // Heap allocation path for large messages (rare case)
969                    let mut message_bytes = text.as_bytes().to_vec();
970
971                    match self.handler_type {
972                        HandlerType::Trade => {
973                            if let Ok(trade_response) =
974                                simd_json::from_slice::<TradeResponse>(&mut message_bytes)
975                                && let Some(ref tx) = self.trade_tx
976                                && let Err(e) = tx.send(trade_response).await
977                            {
978                                log::warn!("Failed to send trade message: {e}");
979                            }
980                        }
981                        HandlerType::Orderbook => {
982                            if let Ok(orderbook_response) =
983                                simd_json::from_slice::<OrderbookResponse>(&mut message_bytes)
984                                && let Some(ref tx) = self.orderbook_tx
985                                && let Err(e) = tx.send(orderbook_response).await
986                            {
987                                log::warn!("Failed to send orderbook message: {e}");
988                            }
989                        }
990                    }
991                }
992            }
993        }
994
995        Ok(())
996    }
997
998    async fn on_connected(&mut self) -> std::result::Result<(), WebSocketError> {
999        // Send subscription message after connection
1000        let subscription_json = simd_json::to_string(&self.subscription).map_err(|e| {
1001            WebSocketError::MessageProcessingError(format!("Failed to serialize subscription: {e}"))
1002        })?;
1003
1004        log::info!("Sending subscription: {subscription_json}");
1005
1006        // Actually send the subscription message
1007        let message = Message::Text(subscription_json.into());
1008        self.send_message(message)?;
1009
1010        Ok(())
1011    }
1012
1013    async fn on_disconnected(&mut self) -> std::result::Result<(), WebSocketError> {
1014        log::info!("Disconnected from Bybit WebSocket");
1015        Ok(())
1016    }
1017
1018    async fn on_error(&mut self, error: WebSocketError) -> std::result::Result<(), WebSocketError> {
1019        log::error!("Bybit WebSocket error: {error}");
1020        Ok(())
1021    }
1022
1023    fn set_sender(&mut self, sender: mpsc::UnboundedSender<Message>) {
1024        self.sender = Some(sender);
1025    }
1026
1027    fn send_message(&self, message: Message) -> std::result::Result<(), WebSocketError> {
1028        match &self.sender {
1029            Some(sender) => {
1030                sender
1031                    .send(message)
1032                    .map_err(|_| WebSocketError::NotConnected)?;
1033                Ok(())
1034            }
1035            None => Err(WebSocketError::NotConnected),
1036        }
1037    }
1038}