1use rusty_common::collections::FxHashMap;
7use smartstring::alias::String;
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10
11use super::data::{MarketTradeResponse, orderbook::OrderbookResponse};
12use anyhow::{Context, Result, anyhow};
13use async_trait::async_trait;
14use parking_lot::RwLock;
15use quanta::Clock;
16use reqwest::header::{HeaderMap, HeaderValue};
17use rusty_common::json::Value;
18use rusty_common::websocket::{Message, WebSocketClient, WebSocketConfig, WebSocketError};
19use rusty_model::{
20 data::{
21 book_snapshot::OrderBookSnapshot, market_trade::MarketTrade,
22 simd_orderbook::SharedSimdOrderBook,
23 },
24 instruments::{Instrument, InstrumentId},
25 venues::Venue,
26};
27use simd_json::prelude::{ValueAsArray, ValueAsScalar, ValueObjectAccess};
28use smallvec::SmallVec;
29use tokio::sync::{mpsc, watch};
30use tokio::task::JoinHandle;
31
32use crate::provider::prelude::*;
33
/// Default REST base URL; `with_config` installs it when no config is supplied.
const BYBIT_FUTURES_API_URL: &str = "https://api.bybit.com";
/// Default WebSocket URL (v5 public linear stream); `with_config` installs it when no config is supplied.
const BYBIT_FUTURES_WS_URL: &str = "wss://stream.bybit.com/v5/public/linear";
/// Testnet REST base URL. Underscore-prefixed; not referenced by the code visible in this file.
const _BYBIT_FUTURES_TESTNET_API_URL: &str = "https://api-testnet.bybit.com";
/// Testnet WebSocket URL. Underscore-prefixed; not referenced by the code visible in this file.
const _BYBIT_FUTURES_TESTNET_WS_URL: &str = "wss://stream-testnet.bybit.com/v5/public/linear";
39
/// Rate limits handed out by `get_rate_limits`: 120 `REQUEST_WEIGHT` and 100 `ORDERS`,
/// each per one `MINUTE` interval.
///
/// NOTE(review): these numbers are hard-coded here; confirm they match the limits Bybit
/// currently publishes.
const BYBIT_FUTURES_RATE_LIMITS: &[RateLimit] = &[
    RateLimit {
        limit_type: "REQUEST_WEIGHT",
        interval: "MINUTE",
        interval_num: 1,
        limit: 120,
    },
    RateLimit {
        limit_type: "ORDERS",
        interval: "MINUTE",
        interval_num: 1,
        limit: 100,
    },
];
55
/// An instrument on Bybit futures, described only by its [`InstrumentId`].
#[derive(Debug, Clone)]
pub struct BybitFuturesInstrument {
    /// Identifier carrying the instrument's symbol and venue.
    pub id: InstrumentId,
}
62
impl Instrument for BybitFuturesInstrument {
    /// Returns a clone of the instrument identifier.
    fn id(&self) -> InstrumentId {
        self.id.clone()
    }

    /// Returns a clone of the symbol stored in the identifier.
    fn symbol(&self) -> String {
        self.id.symbol.clone()
    }

    /// Returns the venue stored in the identifier.
    fn venue(&self) -> Venue {
        self.id.venue
    }

    /// Exposes `self` as `&dyn Any` so callers can downcast to the concrete type.
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    /// Returns a boxed clone of this instrument as a trait object.
    fn clone_box(&self) -> Box<dyn Instrument> {
        Box::new(self.clone())
    }
}
84
/// WebSocket request body made of an operation name and its arguments.
///
/// It is serialized to JSON in the message handler's `on_connected`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct SubscriptionMessage {
    /// Operation name; the constructors in this file always use `"subscribe"`.
    op: String,
    /// Topic strings, e.g. `publicTrade.<symbol>` or `orderbook.50.<symbol>`.
    args: Vec<String>,
}
91
92impl SubscriptionMessage {
93 fn trades(symbols: Vec<String>) -> Self {
95 let args = symbols
96 .into_iter()
97 .map(|s| format!("publicTrade.{s}").into())
98 .collect();
99
100 Self {
101 op: "subscribe".into(),
102 args,
103 }
104 }
105
106 fn orderbook(symbols: Vec<String>) -> Self {
108 let args = symbols
109 .into_iter()
110 .map(|s| format!("orderbook.50.{s}").into())
111 .collect();
112
113 Self {
114 op: "subscribe".into(),
115 args,
116 }
117 }
118}
119
/// Market-data provider for Bybit futures (linear category) over REST and WebSocket.
#[derive(Debug)]
pub struct BybitFuturesProvider {
    /// Connection settings (WebSocket and REST); returned by `config()`.
    config: ConnectionConfig,

    /// Current connection state, shared with the spawned WebSocket tasks.
    connection_status: Arc<RwLock<ConnectionState>>,

    /// Connection statistics (message/byte counters and timestamps), shared with the
    /// message handlers.
    stats: Arc<RwLock<ConnectionStats>>,

    /// Clock cloned from the config; its `raw()` value is used for all timestamps here.
    clock: Clock,

    /// Stop-signal senders keyed by connection id (`trade-…` / `orderbook-…`).
    subscriptions: Arc<RwLock<FxHashMap<String, watch::Sender<bool>>>>,

    /// Handles of the spawned WebSocket tasks, keyed by the same connection ids.
    ws_handles: Arc<RwLock<FxHashMap<String, JoinHandle<()>>>>,

    /// Time of the most recent `connect` attempt, used for the reconnect backoff.
    last_connection_attempt: Arc<RwLock<Instant>>,

    /// HTTP client used for REST requests.
    http_client: reqwest::Client,
}
150
impl Default for BybitFuturesProvider {
    /// Same as [`BybitFuturesProvider::new`].
    fn default() -> Self {
        Self::new()
    }
}
156
157impl BybitFuturesProvider {
    /// Creates a provider with the built-in Bybit futures defaults
    /// (equivalent to `with_config(None)`).
    #[inline]
    #[must_use]
    pub fn new() -> Self {
        Self::with_config(None)
    }
164
165 #[inline]
167 #[must_use]
168 pub fn with_config(config: Option<ConnectionConfig>) -> Self {
169 let mut default_config = ConnectionConfig::default();
170
171 default_config.websocket_config.base_url = BYBIT_FUTURES_WS_URL.into();
173 default_config.websocket_config.ping_interval_milliseconds = 20000; default_config.websocket_config.use_compression = false;
175
176 default_config.rest_config.base_url = BYBIT_FUTURES_API_URL.into();
178 default_config.rest_config.timeout_milliseconds = 5000; let config = config.unwrap_or(default_config);
182 let clock = config.clock.clone();
183
184 let mut headers = HeaderMap::new();
186 headers.insert(
187 "User-Agent",
188 HeaderValue::from_str(&config.rest_config.user_agent)
189 .unwrap_or_else(|_| HeaderValue::from_static("RustyHFT/1.0")),
190 );
191
192 let http_client = reqwest::Client::builder()
193 .timeout(Duration::from_millis(
194 config.rest_config.timeout_milliseconds,
195 ))
196 .connect_timeout(Duration::from_millis(
197 config.rest_config.timeout_milliseconds / 2,
198 ))
199 .pool_max_idle_per_host(config.rest_config.connection_pool_size)
200 .pool_idle_timeout(Duration::from_millis(
201 config.rest_config.keep_alive_milliseconds,
202 ))
203 .default_headers(headers)
204 .build()
205 .unwrap_or_default();
206
207 Self {
208 config,
209 connection_status: Arc::new(RwLock::new(ConnectionState::Disconnected)),
210 stats: Arc::new(RwLock::new(ConnectionStats::default())),
211 clock,
212 subscriptions: Arc::new(RwLock::new(FxHashMap::default())),
213 ws_handles: Arc::new(RwLock::new(FxHashMap::default())),
214 last_connection_attempt: Arc::new(RwLock::new(Instant::now())),
215 http_client,
216 }
217 }
218
219 #[inline]
221 fn update_receive_stats(
222 stats: Arc<RwLock<ConnectionStats>>,
223 message_size: usize,
224 local_time: u64,
225 ) {
226 let mut s = stats.write();
227 s.messages_received += 1;
228 s.bytes_received += message_size as u64;
229 s.last_message_time = local_time;
230 }
231
232 fn create_websocket_config(
234 url: String,
235 use_compression: bool,
236 ping_interval_milliseconds: u64,
237 timeout_milliseconds: u64,
238 ) -> WebSocketConfig {
239 WebSocketConfig::builder(rusty_common::types::Exchange::Bybit, url.to_string())
240 .ping_interval(Duration::from_millis(ping_interval_milliseconds))
241 .timeout(Duration::from_millis(timeout_milliseconds))
242 .compression(if use_compression {
243 rusty_common::websocket::CompressionConfig::default()
244 } else {
245 rusty_common::websocket::CompressionConfig::disabled()
246 })
247 .build()
248 }
249
250 #[inline]
252 async fn connect(&self) -> Result<()> {
253 if *self.connection_status.read() == ConnectionState::Connected {
254 return Ok(());
255 }
256
257 {
259 let now = Instant::now();
260 let last_attempt = *self.last_connection_attempt.read();
261 let backoff_ms = self
262 .config
263 .websocket_config
264 .reconnect
265 .backoff_initial_milliseconds;
266
267 if now.duration_since(last_attempt) < Duration::from_millis(backoff_ms) {
268 tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
270 }
271
272 *self.last_connection_attempt.write() = Instant::now();
274 }
275
276 *self.connection_status.write() = ConnectionState::Connecting;
277
278 *self.connection_status.write() = ConnectionState::Connected;
281 self.stats.write().connected_time = self.clock.raw();
282 Ok(())
283 }
284
285 #[inline]
287 async fn create_trade_connection(
288 &self,
289 symbols: SmallVec<[String; 8]>,
290 tx: mpsc::Sender<MarketTradeResponse>,
291 subscription: SubscriptionMessage,
292 ) -> Result<JoinHandle<()>> {
293 let connection_id = format!("trade-{}", symbols.join(","));
294
295 let (stop_tx, stop_rx) = watch::channel(false);
297 self.subscriptions
298 .write()
299 .insert(connection_id.clone().into(), stop_tx);
300
301 let url = self.config.websocket_config.base_url.clone();
303 let clock = self.clock.clone();
304 let timeout_milliseconds = self.config.websocket_config.timeout_milliseconds;
305 let ping_interval_milliseconds = self.config.websocket_config.ping_interval_milliseconds;
306 let stats = self.stats.clone();
307 let connection_status = self.connection_status.clone();
308 let use_compression = self.config.websocket_config.use_compression;
309
310 let handle = tokio::spawn(async move {
312 loop {
313 if *stop_rx.borrow() {
315 break;
316 }
317
318 let ws_config = Self::create_websocket_config(
320 url.clone(),
321 use_compression,
322 ping_interval_milliseconds,
323 timeout_milliseconds,
324 );
325
326 let mut client = WebSocketClient::new(ws_config);
328
329 *connection_status.write() = ConnectionState::Connecting;
331
332 let handler = BybitFuturesMessageHandler::new_trade_handler(
334 clock.clone(),
335 stats.clone(),
336 tx.clone(),
337 subscription.clone(),
338 );
339
340 if let Err(e) = client.run(handler).await {
342 log::error!("WebSocket client error: {e}");
343 *connection_status.write() = ConnectionState::Error;
344 }
345
346 if *stop_rx.borrow() {
348 break;
349 }
350
351 tokio::time::sleep(Duration::from_millis(1000)).await;
353 }
354 });
355
356 Ok(handle)
357 }
358
359 #[inline]
361 async fn create_orderbook_connection(
362 &self,
363 symbols: SmallVec<[String; 8]>,
364 tx: mpsc::Sender<OrderbookResponse>,
365 subscription: SubscriptionMessage,
366 ) -> Result<JoinHandle<()>> {
367 let connection_id = format!("orderbook-{}", symbols.join(","));
368
369 let (stop_tx, stop_rx) = watch::channel(false);
371 self.subscriptions
372 .write()
373 .insert(connection_id.clone().into(), stop_tx);
374
375 let url = self.config.websocket_config.base_url.clone();
377 let clock = self.clock.clone();
378 let timeout_milliseconds = self.config.websocket_config.timeout_milliseconds;
379 let ping_interval_milliseconds = self.config.websocket_config.ping_interval_milliseconds;
380 let stats = self.stats.clone();
381 let connection_status = self.connection_status.clone();
382 let use_compression = self.config.websocket_config.use_compression;
383
384 let handle = tokio::spawn(async move {
386 loop {
387 if *stop_rx.borrow() {
389 break;
390 }
391
392 let ws_config = Self::create_websocket_config(
394 url.clone(),
395 use_compression,
396 ping_interval_milliseconds,
397 timeout_milliseconds,
398 );
399
400 let mut client = WebSocketClient::new(ws_config);
402
403 *connection_status.write() = ConnectionState::Connecting;
405
406 let handler = BybitFuturesMessageHandler::new_orderbook_handler(
408 clock.clone(),
409 stats.clone(),
410 tx.clone(),
411 subscription.clone(),
412 );
413
414 if let Err(e) = client.run(handler).await {
416 log::error!("WebSocket client error: {e}");
417 *connection_status.write() = ConnectionState::Error;
418 }
419
420 if *stop_rx.borrow() {
422 break;
423 }
424
425 tokio::time::sleep(Duration::from_millis(1000)).await;
427 }
428 });
429
430 Ok(handle)
431 }
432
433 async fn fetch_orderbook_snapshot(
435 &self,
436 symbol: &str,
437 depth: Option<u32>,
438 ) -> Result<OrderBookSnapshot> {
439 let limit = depth.unwrap_or(50);
440 let url = format!(
441 "{}/v5/market/orderbook?category=linear&symbol={}&limit={}",
442 self.config.rest_config.base_url, symbol, limit
443 );
444
445 let response = self.http_client.get(&url).send().await?;
446
447 if !response.status().is_success() {
448 let error_text = response.text().await?;
449 return Err(anyhow!(
450 "Failed to fetch orderbook snapshot: {}",
451 error_text
452 ));
453 }
454
455 let bytes = response
457 .bytes()
458 .await
459 .context("Failed to get response bytes")?;
460 let mut bytes_vec = bytes.to_vec();
461 let response_json: simd_json::OwnedValue =
462 simd_json::from_slice(&mut bytes_vec).context("Failed to parse JSON")?;
463
464 let result = response_json
466 .get("result")
467 .ok_or_else(|| anyhow!("Invalid response format"))?;
468
469 let update_id = result.get("u").and_then(|v| v.as_u64()).unwrap_or(0);
470
471 let bids = result
472 .get("b")
473 .and_then(|b| b.as_array())
474 .ok_or_else(|| anyhow!("Missing bids"))?;
475
476 let asks = result
477 .get("a")
478 .and_then(|a| a.as_array())
479 .ok_or_else(|| anyhow!("Missing asks"))?;
480
481 let timestamp = self.clock.raw();
483 let instrument_id = InstrumentId::new(symbol, Venue::Bybit);
484
485 let mut order_book_snapshot =
486 OrderBookSnapshot::new_empty(instrument_id, timestamp, update_id);
487
488 for bid in bids {
490 if let Some(bid_array) = bid.as_array()
491 && bid_array.len() >= 2
492 && let (Some(price_str), Some(quantity_str)) =
493 (bid_array[0].as_str(), bid_array[1].as_str())
494 && let (Ok(price), Ok(quantity)) = (
495 price_str.parse::<rust_decimal::Decimal>(),
496 quantity_str.parse::<rust_decimal::Decimal>(),
497 )
498 {
499 order_book_snapshot.add_bid(price, quantity);
500 }
501 }
502
503 for ask in asks {
505 if let Some(ask_array) = ask.as_array()
506 && ask_array.len() >= 2
507 && let (Some(price_str), Some(quantity_str)) =
508 (ask_array[0].as_str(), ask_array[1].as_str())
509 && let (Ok(price), Ok(quantity)) = (
510 price_str.parse::<rust_decimal::Decimal>(),
511 quantity_str.parse::<rust_decimal::Decimal>(),
512 )
513 {
514 order_book_snapshot.add_ask(price, quantity);
515 }
516 }
517
518 Ok(order_book_snapshot)
519 }
520}
521
522#[async_trait]
523impl Provider for BybitFuturesProvider {
    /// Message type delivered on the channel returned by `subscribe_trades`.
    type TradeMessage = MarketTradeResponse;
    /// Message type delivered on the channel returned by `subscribe_orderbook`.
    type DepthMessage = OrderbookResponse;
    /// Instrument message type; not produced by any code visible in this file.
    type InstrumentMessage = Value;
527
    /// Human-readable provider name.
    fn name(&self) -> &'static str {
        "Bybit Futures"
    }

    /// Venue served by this provider.
    fn venue(&self) -> Venue {
        Venue::Bybit
    }

    /// Connection configuration this provider was built with.
    fn config(&self) -> &ConnectionConfig {
        &self.config
    }
539
    /// Initializes the provider by delegating to the inherent `connect` method.
    async fn init(&mut self) -> Result<()> {
        self.connect().await
    }
543
544 async fn subscribe_trades(
545 &self,
546 symbols: SmallVec<[String; 8]>,
547 ) -> Result<mpsc::Receiver<Self::TradeMessage>> {
548 self.connect().await?;
550
551 let (tx, rx) = mpsc::channel(1024);
553
554 let subscription = SubscriptionMessage::trades(symbols.to_vec());
556 let connection_id = format!("trade-{}", symbols.join(","));
557
558 let handle = self
560 .create_trade_connection(symbols, tx, subscription)
561 .await?;
562
563 self.ws_handles.write().insert(connection_id.into(), handle);
565
566 Ok(rx)
567 }
568
569 async fn unsubscribe_trades(&self) -> Result<()> {
570 let mut keys_to_remove = Vec::new();
572
573 for (key, _) in self.subscriptions.read().iter() {
574 if key.starts_with("trade-") {
575 keys_to_remove.push(key.clone());
576 }
577 }
578
579 for key in keys_to_remove {
581 if let Some(tx) = self.subscriptions.write().remove(&key) {
582 let _ = tx.send(true);
583 }
584
585 if let Some(handle) = self.ws_handles.write().remove(&key) {
587 handle.abort();
588 }
589 }
590
591 Ok(())
592 }
593
594 async fn subscribe_orderbook(
595 &self,
596 symbols: SmallVec<[String; 8]>,
597 ) -> Result<mpsc::Receiver<Self::DepthMessage>> {
598 self.connect().await?;
600
601 let (tx, rx) = mpsc::channel(1024);
603
604 let subscription = SubscriptionMessage::orderbook(symbols.to_vec());
606 let connection_id = format!("orderbook-{}", symbols.join(","));
607
608 let handle = self
610 .create_orderbook_connection(symbols, tx, subscription)
611 .await?;
612
613 self.ws_handles.write().insert(connection_id.into(), handle);
615
616 Ok(rx)
617 }
618
619 async fn unsubscribe_orderbook(&self) -> Result<()> {
620 let mut keys_to_remove = Vec::new();
622
623 for (key, _) in self.subscriptions.read().iter() {
624 if key.starts_with("orderbook-") {
625 keys_to_remove.push(key.clone());
626 }
627 }
628
629 for key in keys_to_remove {
631 if let Some(tx) = self.subscriptions.write().remove(&key) {
632 let _ = tx.send(true);
633 }
634
635 if let Some(handle) = self.ws_handles.write().remove(&key) {
637 handle.abort();
638 }
639 }
640
641 Ok(())
642 }
643
644 #[inline]
645 async fn get_instruments(&self) -> Result<Vec<Box<dyn Instrument>>> {
646 let url = format!(
648 "{}/v5/market/instruments-info?category=linear",
649 self.config.rest_config.base_url
650 );
651
652 let response = self
653 .http_client
654 .get(&url)
655 .send()
656 .await
657 .context("Failed to fetch Bybit instruments info")?;
658
659 if !response.status().is_success() {
661 return Err(anyhow!(
662 "Failed to fetch Bybit instruments info: HTTP {}",
663 response.status()
664 ));
665 }
666
667 let bytes = response
669 .bytes()
670 .await
671 .context("Failed to get response bytes")?;
672 let mut bytes_vec = bytes.to_vec();
673 let instruments_info: simd_json::OwnedValue = simd_json::from_slice(&mut bytes_vec)
674 .context("Failed to parse Bybit instruments info response")?;
675
676 let result = instruments_info
678 .get("result")
679 .ok_or_else(|| anyhow!("Invalid response format: missing result"))?;
680
681 let list = result
682 .get("list")
683 .and_then(|l| l.as_array())
684 .ok_or_else(|| anyhow!("Invalid response format: expected array of instruments"))?;
685
686 let mut instruments = Vec::with_capacity(list.len());
688
689 for instrument_data in list {
690 let status = instrument_data
692 .get("status")
693 .and_then(|s| s.as_str())
694 .unwrap_or("");
695 if status != "Trading" {
696 continue;
697 }
698
699 let symbol = instrument_data
700 .get("symbol")
701 .and_then(|s| s.as_str())
702 .ok_or_else(|| {
703 anyhow!("Invalid instrument data: 'symbol' field not found or not a String")
704 })?;
705
706 let instrument_id = InstrumentId::new(symbol, Venue::Bybit);
708 let instrument =
709 Box::new(BybitFuturesInstrument { id: instrument_id }) as Box<dyn Instrument>;
710 instruments.push(instrument);
711 }
712
713 Ok(instruments)
714 }
715
    /// Not implemented: always returns an error, ignoring both arguments.
    #[inline]
    async fn get_historical_trades(
        &self,
        _symbol: &str,
        _limit: Option<u32>,
    ) -> Result<Vec<MarketTrade>> {
        Err(anyhow!("Historical trades not implemented yet"))
    }
726
    /// Returns a REST order-book snapshot for `symbol` by delegating to
    /// `fetch_orderbook_snapshot`.
    async fn get_orderbook_snapshot(
        &self,
        symbol: &str,
        depth: Option<u32>,
    ) -> Result<OrderBookSnapshot> {
        self.fetch_orderbook_snapshot(symbol, depth).await
    }
734
    /// Not implemented: always returns an error, ignoring the symbol.
    #[inline]
    async fn get_realtime_orderbook(&self, _symbol: &str) -> Result<SharedSimdOrderBook> {
        Err(anyhow!("Real-time orderbook not implemented yet"))
    }
740
    /// Returns `true` when the stored connection state is `Connected`.
    #[inline]
    async fn is_connected(&self) -> bool {
        *self.connection_status.read() == ConnectionState::Connected
    }

    /// Returns a copy of the stored connection state.
    #[inline]
    async fn connection_status(&self) -> ConnectionState {
        *self.connection_status.read()
    }

    /// Returns a clone of the current connection statistics.
    #[inline]
    async fn get_stats(&self) -> ConnectionStats {
        self.stats.read().clone()
    }
755
756 #[inline]
757 async fn ping(&self) -> Result<u64> {
758 let start = self.clock.raw();
761
762 let response = self
763 .http_client
764 .get(format!(
765 "{}/v5/market/time",
766 self.config.rest_config.base_url
767 ))
768 .send()
769 .await?;
770
771 if response.status().is_success() {
772 Ok(self.clock.raw().saturating_sub(start))
773 } else {
774 Err(anyhow!("Ping failed with status: {}", response.status()))
775 }
776 }
777
778 #[inline]
779 async fn reset_connection(&self) -> Result<()> {
780 let mut keys_to_remove = Vec::new();
782
783 for (key, _) in self.subscriptions.read().iter() {
784 keys_to_remove.push(key.clone());
785 }
786
787 for key in &keys_to_remove {
789 if let Some(tx) = self.subscriptions.write().remove(key) {
790 let _ = tx.send(true);
791 }
792
793 if let Some(handle) = self.ws_handles.write().remove(key) {
795 handle.abort();
796 }
797 }
798
799 *self.connection_status.write() = ConnectionState::Disconnected;
801
802 *self.stats.write() = ConnectionStats::default();
804
805 self.connect().await
807 }
808
    /// Returns a copy of the hard-coded `BYBIT_FUTURES_RATE_LIMITS` table.
    fn get_rate_limits(&self) -> Vec<RateLimit> {
        BYBIT_FUTURES_RATE_LIMITS.to_vec()
    }
812
813 async fn shutdown(&mut self) -> Result<()> {
814 let mut keys_to_remove = Vec::new();
816
817 for (key, _) in self.subscriptions.read().iter() {
818 keys_to_remove.push(key.clone());
819 }
820
821 for key in keys_to_remove {
823 if let Some(tx) = self.subscriptions.write().remove(&key) {
824 let _ = tx.send(true);
825 }
826
827 if let Some(handle) = self.ws_handles.write().remove(&key) {
829 handle.abort();
830 }
831 }
832
833 *self.connection_status.write() = ConnectionState::Disconnected;
835
836 Ok(())
837 }
838}
839
/// WebSocket message handler that forwards either trade or order-book messages to a channel.
#[derive(Debug)]
struct BybitFuturesMessageHandler {
    /// Which kind of topic this handler forwards.
    handler_type: HandlerType,
    /// Clock whose raw value timestamps each received message.
    clock: Clock,
    /// Shared connection statistics updated for every text message.
    stats: Arc<RwLock<ConnectionStats>>,
    /// Destination for trade messages; `Some` only for trade handlers.
    trade_tx: Option<mpsc::Sender<MarketTradeResponse>>,
    /// Destination for order-book messages; `Some` only for order-book handlers.
    orderbook_tx: Option<mpsc::Sender<OrderbookResponse>>,
    /// Subscription request serialized in `on_connected`.
    subscription: SubscriptionMessage,
}
850
/// Kind of stream a `BybitFuturesMessageHandler` forwards.
#[derive(Debug)]
enum HandlerType {
    /// Forwards messages whose topic starts with `publicTrade.`.
    Trade,
    /// Forwards messages whose topic starts with `orderbook.`.
    Orderbook,
}
856
impl BybitFuturesMessageHandler {
    /// Creates a handler of type `Trade` that forwards to `tx`; the order-book sender is
    /// left unset.
    const fn new_trade_handler(
        clock: Clock,
        stats: Arc<RwLock<ConnectionStats>>,
        tx: mpsc::Sender<MarketTradeResponse>,
        subscription: SubscriptionMessage,
    ) -> Self {
        Self {
            handler_type: HandlerType::Trade,
            clock,
            stats,
            trade_tx: Some(tx),
            orderbook_tx: None,
            subscription,
        }
    }

    /// Creates a handler of type `Orderbook` that forwards to `tx`; the trade sender is
    /// left unset.
    const fn new_orderbook_handler(
        clock: Clock,
        stats: Arc<RwLock<ConnectionStats>>,
        tx: mpsc::Sender<OrderbookResponse>,
        subscription: SubscriptionMessage,
    ) -> Self {
        Self {
            handler_type: HandlerType::Orderbook,
            clock,
            stats,
            trade_tx: None,
            orderbook_tx: Some(tx),
            subscription,
        }
    }
}
890
891#[async_trait]
892impl rusty_common::websocket::MessageHandler for BybitFuturesMessageHandler {
893 async fn on_message(&mut self, message: Message) -> std::result::Result<(), WebSocketError> {
894 let local_time = self.clock.raw();
895
896 if let Some(text) = message.as_text() {
898 BybitFuturesProvider::update_receive_stats(self.stats.clone(), text.len(), local_time);
899
900 let mut message_bytes = text.as_bytes().to_vec();
902 let json_value: simd_json::OwnedValue = simd_json::from_slice(&mut message_bytes)
903 .map_err(|e| {
904 WebSocketError::MessageProcessingError(format!("Failed to parse JSON: {e}"))
905 })?;
906
907 if let Some(topic) = json_value["topic"].as_str() {
909 match self.handler_type {
910 HandlerType::Trade if topic.starts_with("publicTrade.") => {
911 if let Ok(trade_response) =
912 simd_json::from_slice::<MarketTradeResponse>(&mut message_bytes)
913 && let Some(ref tx) = self.trade_tx
914 && let Err(e) = tx.send(trade_response).await
915 {
916 log::warn!("Failed to send trade message: {e}");
917 }
918 }
919 HandlerType::Orderbook if topic.starts_with("orderbook.") => {
920 if let Ok(orderbook_response) =
921 simd_json::from_slice::<OrderbookResponse>(&mut message_bytes)
922 && let Some(ref tx) = self.orderbook_tx
923 && let Err(e) = tx.send(orderbook_response).await
924 {
925 log::warn!("Failed to send orderbook message: {e}");
926 }
927 }
928 _ => {
929 if let Some(op) = json_value["op"].as_str() {
931 match op {
932 "subscribe" => {
933 log::info!("Bybit Futures subscription confirmed: {text}");
934 }
935 "pong" => {
936 log::trace!("Bybit Futures pong received");
937 }
938 _ => {
939 log::debug!("Unhandled Bybit Futures message op: {op}");
940 }
941 }
942 }
943 }
944 }
945 }
946 }
947
948 Ok(())
949 }
950
951 async fn on_connected(&mut self) -> std::result::Result<(), WebSocketError> {
952 let subscription_json = simd_json::to_string(&self.subscription).map_err(|e| {
954 WebSocketError::MessageProcessingError(format!("Failed to serialize subscription: {e}"))
955 })?;
956
957 log::info!("Sending Bybit Futures subscription: {subscription_json}");
958
959 Ok(())
962 }
963
    /// Logs the disconnect; no other action is taken here.
    async fn on_disconnected(&mut self) -> std::result::Result<(), WebSocketError> {
        log::info!("Disconnected from Bybit Futures WebSocket");
        Ok(())
    }

    /// Logs the error and reports success, so the error is not propagated from here.
    async fn on_error(&mut self, error: WebSocketError) -> std::result::Result<(), WebSocketError> {
        log::error!("Bybit Futures WebSocket error: {error}");
        Ok(())
    }
973}