1use rusty_common::collections::FxHashMap;
2use smartstring::alias::String;
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use super::data::{
7 orderbook::OrderbookResponse, subscription::SubscriptionRequest, trade::TradeResponse,
8};
9use super::types::{
10 BYBIT_API_URL, BYBIT_RATE_LIMITS, BYBIT_WS_URL_INVERSE, BYBIT_WS_URL_LINEAR,
11 BYBIT_WS_URL_OPTION, BYBIT_WS_URL_SPOT,
12};
13use anyhow::{Context, Result, anyhow};
14use async_trait::async_trait;
15use parking_lot::RwLock;
16use quanta::Clock;
17use reqwest::header::{HeaderMap, HeaderValue};
18use rusty_common::json::Value;
19use rusty_common::websocket::{Message, WebSocketClient, WebSocketConfig, WebSocketError};
20use rusty_model::{
21 data::{
22 book_snapshot::OrderBookSnapshot, market_trade::MarketTrade,
23 simd_orderbook::SharedSimdOrderBook,
24 },
25 instruments::{Instrument, InstrumentId},
26 venues::Venue,
27};
28use simd_json::prelude::{ValueAsArray, ValueAsObject, ValueAsScalar};
29use smallvec::SmallVec;
30use tokio::sync::{mpsc, watch};
31use tokio::task::JoinHandle;
32
33use crate::provider::prelude::*;
34
/// Bybit product category a provider instance is bound to.
///
/// The category selects the WebSocket endpoint (see `websocket_url`) and the
/// value sent as the `category` query parameter on REST calls (see `as_str`).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum BybitCategory {
    /// The `spot` category.
    Spot,
    /// The `linear` category.
    Linear,
    /// The `inverse` category.
    Inverse,
    /// The `option` category.
    Option,
}
47
48impl BybitCategory {
49 #[must_use]
51 pub const fn websocket_url(&self) -> &'static str {
52 match self {
53 Self::Spot => BYBIT_WS_URL_SPOT,
54 Self::Linear => BYBIT_WS_URL_LINEAR,
55 Self::Inverse => BYBIT_WS_URL_INVERSE,
56 Self::Option => BYBIT_WS_URL_OPTION,
57 }
58 }
59
60 #[must_use]
62 pub const fn as_str(&self) -> &'static str {
63 match self {
64 Self::Spot => "spot",
65 Self::Linear => "linear",
66 Self::Inverse => "inverse",
67 Self::Option => "option",
68 }
69 }
70}
71
72impl Default for BybitCategory {
73 fn default() -> Self {
74 Self::Spot
75 }
76}
77
78#[derive(Debug)]
84pub struct BybitProvider {
85 config: ConnectionConfig,
87
88 connection_status: Arc<RwLock<ConnectionState>>,
90
91 stats: Arc<RwLock<ConnectionStats>>,
93
94 clock: Clock,
96
97 subscriptions: Arc<RwLock<FxHashMap<String, watch::Sender<bool>>>>,
99
100 ws_handles: Arc<RwLock<FxHashMap<String, JoinHandle<()>>>>,
102
103 last_connection_attempt: Arc<RwLock<Instant>>,
105
106 http_client: reqwest::Client,
108
109 category: BybitCategory,
111}
112
113impl Default for BybitProvider {
114 fn default() -> Self {
115 Self::new()
116 }
117}
118
119impl BybitProvider {
120 #[inline]
122 #[must_use]
123 pub const fn category(&self) -> BybitCategory {
124 self.category
125 }
126
127 #[inline]
132 #[must_use]
133 pub fn new() -> Self {
134 Self::new_with_category(BybitCategory::Spot)
135 }
136
137 #[inline]
139 #[must_use]
140 pub fn new_with_category(category: BybitCategory) -> Self {
141 Self::with_config(category, None)
142 }
143
144 #[inline]
146 #[must_use]
147 pub fn new_spot() -> Self {
148 Self::new_with_category(BybitCategory::Spot)
149 }
150
151 #[inline]
153 #[must_use]
154 pub fn new_linear() -> Self {
155 Self::new_with_category(BybitCategory::Linear)
156 }
157
158 #[inline]
160 #[must_use]
161 pub fn new_inverse() -> Self {
162 Self::new_with_category(BybitCategory::Inverse)
163 }
164
165 #[inline]
167 #[must_use]
168 pub fn new_option() -> Self {
169 Self::new_with_category(BybitCategory::Option)
170 }
171
172 #[inline]
174 #[must_use]
175 pub fn with_config(category: BybitCategory, config: Option<ConnectionConfig>) -> Self {
176 let mut default_config = ConnectionConfig::default();
177
178 default_config.websocket_config.base_url = category.websocket_url().into();
180 default_config.websocket_config.ping_interval_milliseconds = 20000; default_config.websocket_config.use_compression = false;
182
183 default_config.rest_config.base_url = BYBIT_API_URL.into();
185 default_config.rest_config.timeout_milliseconds = 5000; let config = config.unwrap_or(default_config);
189 let clock = config.clock.clone();
190
191 let mut headers = HeaderMap::new();
193 headers.insert(
194 "User-Agent",
195 HeaderValue::from_str(&config.rest_config.user_agent)
196 .unwrap_or_else(|_| HeaderValue::from_static("RustyHFT/1.0")),
197 );
198
199 let http_client = reqwest::Client::builder()
200 .timeout(Duration::from_millis(
201 config.rest_config.timeout_milliseconds,
202 ))
203 .connect_timeout(Duration::from_millis(
204 config.rest_config.timeout_milliseconds / 2,
205 ))
206 .pool_max_idle_per_host(config.rest_config.connection_pool_size)
207 .pool_idle_timeout(Duration::from_millis(
208 config.rest_config.keep_alive_milliseconds,
209 ))
210 .default_headers(headers)
211 .build()
212 .unwrap_or_default();
213
214 Self {
215 config,
216 connection_status: Arc::new(RwLock::new(ConnectionState::Disconnected)),
217 stats: Arc::new(RwLock::new(ConnectionStats::default())),
218 clock,
219 subscriptions: Arc::new(RwLock::new(FxHashMap::default())),
220 ws_handles: Arc::new(RwLock::new(FxHashMap::default())),
221 last_connection_attempt: Arc::new(RwLock::new(Instant::now())),
222 http_client,
223 category,
224 }
225 }
226
227 #[inline]
229 fn update_receive_stats(
230 stats: Arc<RwLock<ConnectionStats>>,
231 message_size: usize,
232 local_time: u64,
233 ) {
234 let mut s = stats.write();
235 s.messages_received += 1;
236 s.bytes_received += message_size as u64;
237 s.last_message_time = local_time;
238 }
239
240 fn create_websocket_config(
242 url: String,
243 use_compression: bool,
244 ping_interval_milliseconds: u64,
245 timeout_milliseconds: u64,
246 ) -> WebSocketConfig {
247 WebSocketConfig::builder(rusty_common::types::Exchange::Bybit, url.to_string())
248 .ping_interval(Duration::from_millis(ping_interval_milliseconds))
249 .timeout(Duration::from_millis(timeout_milliseconds))
250 .compression(if use_compression {
251 rusty_common::websocket::CompressionConfig::default()
252 } else {
253 rusty_common::websocket::CompressionConfig::disabled()
254 })
255 .build()
256 }
257
258 #[inline]
260 async fn connect(&self) -> Result<()> {
261 if *self.connection_status.read() == ConnectionState::Connected {
262 return Ok(());
263 }
264
265 {
267 let now = Instant::now();
268 let last_attempt = *self.last_connection_attempt.read();
269 let backoff_ms = self
270 .config
271 .websocket_config
272 .reconnect
273 .backoff_initial_milliseconds;
274
275 if now.duration_since(last_attempt) < Duration::from_millis(backoff_ms) {
276 tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
278 }
279
280 *self.last_connection_attempt.write() = Instant::now();
282 }
283
284 *self.connection_status.write() = ConnectionState::Connecting;
285
286 *self.connection_status.write() = ConnectionState::Connected;
289 self.stats.write().connected_time = self.clock.raw();
290 Ok(())
291 }
292}
293
294#[async_trait]
295impl Provider for BybitProvider {
296 type TradeMessage = TradeResponse;
297 type DepthMessage = OrderbookResponse;
298 type InstrumentMessage = Value;
299
300 fn name(&self) -> &'static str {
301 "Bybit"
302 }
303
304 fn venue(&self) -> Venue {
305 Venue::Bybit
306 }
307
308 fn config(&self) -> &ConnectionConfig {
309 &self.config
310 }
311
312 async fn init(&mut self) -> Result<()> {
313 self.connect().await
314 }
315
316 async fn shutdown(&mut self) -> Result<()> {
317 let mut keys_to_remove = Vec::new();
319
320 for (key, _) in self.subscriptions.read().iter() {
321 keys_to_remove.push(key.clone());
322 }
323
324 for key in keys_to_remove {
326 if let Some(tx) = self.subscriptions.write().remove(&key) {
327 let _ = tx.send(true);
328 }
329
330 if let Some(handle) = self.ws_handles.write().remove(&key) {
332 handle.abort();
333 }
334 }
335
336 *self.connection_status.write() = ConnectionState::Disconnected;
338
339 Ok(())
340 }
341
342 #[inline]
343 async fn subscribe_trades(
344 &self,
345 symbols: SmallVec<[String; 8]>,
346 ) -> Result<mpsc::Receiver<Self::TradeMessage>> {
347 self.connect().await?;
349
350 let (tx, rx) = mpsc::channel(1024);
352
353 let subscription = SubscriptionRequest::trades(symbols.to_vec());
355 let connection_id = format!("trade-{}", symbols.join(","));
356
357 let handle = self
359 .create_trade_connection(symbols, tx, subscription)
360 .await?;
361
362 self.ws_handles.write().insert(connection_id.into(), handle);
364
365 Ok(rx)
366 }
367
368 #[inline]
369 async fn unsubscribe_trades(&self) -> Result<()> {
370 let mut keys_to_remove = Vec::new();
372
373 for (key, _) in self.subscriptions.read().iter() {
374 if key.starts_with("trade-") {
375 keys_to_remove.push(key.clone());
376 }
377 }
378
379 for key in keys_to_remove {
381 if let Some(tx) = self.subscriptions.write().remove(&key) {
382 let _ = tx.send(true);
383 }
384
385 if let Some(handle) = self.ws_handles.write().remove(&key) {
387 handle.abort();
388 }
389 }
390
391 Ok(())
392 }
393
394 #[inline]
395 async fn subscribe_orderbook(
396 &self,
397 symbols: SmallVec<[String; 8]>,
398 ) -> Result<mpsc::Receiver<Self::DepthMessage>> {
399 self.connect().await?;
401
402 let (tx, rx) = mpsc::channel(1024);
404
405 let subscription = SubscriptionRequest::orderbook(symbols.to_vec(), 50);
407 let connection_id = format!("orderbook-{}", symbols.join(","));
408
409 let handle = self
411 .create_orderbook_connection(symbols, tx, subscription)
412 .await?;
413
414 self.ws_handles.write().insert(connection_id.into(), handle);
416
417 Ok(rx)
418 }
419
420 #[inline]
421 async fn unsubscribe_orderbook(&self) -> Result<()> {
422 let mut keys_to_remove = Vec::new();
424
425 for (key, _) in self.subscriptions.read().iter() {
426 if key.starts_with("orderbook-") {
427 keys_to_remove.push(key.clone());
428 }
429 }
430
431 for key in keys_to_remove {
433 if let Some(tx) = self.subscriptions.write().remove(&key) {
434 let _ = tx.send(true);
435 }
436
437 if let Some(handle) = self.ws_handles.write().remove(&key) {
439 handle.abort();
440 }
441 }
442
443 Ok(())
444 }
445
446 #[inline]
447 async fn get_instruments(&self) -> Result<Vec<Box<dyn Instrument>>> {
448 let url = format!(
450 "{}/v5/market/instruments-info?category={}",
451 self.config.rest_config.base_url,
452 self.category.as_str()
453 );
454
455 let response = self
456 .http_client
457 .get(&url)
458 .send()
459 .await
460 .context("Failed to fetch Bybit instruments info")?;
461
462 if !response.status().is_success() {
464 return Err(anyhow!(
465 "Failed to fetch Bybit instruments info: HTTP {}",
466 response.status()
467 ));
468 }
469
470 let bytes = response
472 .bytes()
473 .await
474 .context("Failed to get response bytes")?;
475 let mut bytes_vec = bytes.to_vec();
476 let instruments_info: simd_json::OwnedValue = simd_json::from_slice(&mut bytes_vec)
477 .context("Failed to parse Bybit instruments info response")?;
478
479 let result = instruments_info["result"].as_object().ok_or_else(|| {
481 anyhow!("Invalid response format: 'result' field not found or not an object")
482 })?;
483
484 let list = result["list"].as_array().ok_or_else(|| {
485 anyhow!("Invalid response format: 'list' field not found or not an array")
486 })?;
487
488 let mut instruments = Vec::with_capacity(list.len());
490
491 for instrument_data in list {
492 let status = instrument_data["status"].as_str().unwrap_or("");
494 if status != "Trading" {
495 continue;
496 }
497
498 let symbol = instrument_data["symbol"].as_str().ok_or_else(|| {
499 anyhow!("Invalid instrument data: 'symbol' field not found or not a String")
500 })?;
501
502 let instrument_id = InstrumentId::new(symbol, Venue::Bybit);
504 let instrument = Box::new(BybitInstrument { id: instrument_id }) as Box<dyn Instrument>;
505 instruments.push(instrument);
506 }
507
508 Ok(instruments)
509 }
510
511 #[inline]
512 async fn get_historical_trades(
513 &self,
514 _symbol: &str,
515 _limit: Option<u32>,
516 ) -> Result<Vec<MarketTrade>> {
517 Err(anyhow!("Historical trades not implemented yet"))
520 }
521
522 #[inline]
523 async fn get_orderbook_snapshot(
524 &self,
525 symbol: &str,
526 depth: Option<u32>,
527 ) -> Result<OrderBookSnapshot> {
528 let limit = depth.unwrap_or(50);
529 let url = format!(
530 "{}/v5/market/orderbook?category={}&symbol={}&limit={}",
531 self.config.rest_config.base_url,
532 self.category.as_str(),
533 symbol,
534 limit
535 );
536
537 let response = self.http_client.get(&url).send().await?;
538
539 if !response.status().is_success() {
540 let error_text = response.text().await?;
541 return Err(anyhow!(
542 "Failed to fetch orderbook snapshot: {}",
543 error_text
544 ));
545 }
546
547 let bytes = response
549 .bytes()
550 .await
551 .map_err(|e| anyhow!("Failed to get response bytes: {}", e))?;
552 let mut bytes_vec = bytes.to_vec();
553 let snapshot_data: simd_json::OwnedValue = simd_json::from_slice(&mut bytes_vec)
554 .map_err(|e| anyhow!("Failed to parse JSON: {}", e))?;
555
556 let timestamp = self.clock.raw();
558 let instrument_id = InstrumentId::new(symbol, Venue::Bybit);
559
560 let result = snapshot_data["result"]
561 .as_object()
562 .ok_or_else(|| anyhow!("Invalid response format: 'result' field not found"))?;
563
564 let update_id = result["u"].as_u64().unwrap_or(0);
565 let mut order_book_snapshot =
566 OrderBookSnapshot::new_empty(instrument_id, timestamp, update_id);
567
568 if let Some(bids) = result["b"].as_array() {
570 for bid in bids {
571 if let Some(bid_array) = bid.as_array()
572 && bid_array.len() >= 2
573 && let (Some(price_str), Some(quantity_str)) =
574 (bid_array[0].as_str(), bid_array[1].as_str())
575 && let (Ok(price), Ok(quantity)) = (
576 price_str.parse::<rust_decimal::Decimal>(),
577 quantity_str.parse::<rust_decimal::Decimal>(),
578 )
579 {
580 order_book_snapshot.add_bid(price, quantity);
581 }
582 }
583 }
584
585 if let Some(asks) = result["a"].as_array() {
587 for ask in asks {
588 if let Some(ask_array) = ask.as_array()
589 && ask_array.len() >= 2
590 && let (Some(price_str), Some(quantity_str)) =
591 (ask_array[0].as_str(), ask_array[1].as_str())
592 && let (Ok(price), Ok(quantity)) = (
593 price_str.parse::<rust_decimal::Decimal>(),
594 quantity_str.parse::<rust_decimal::Decimal>(),
595 )
596 {
597 order_book_snapshot.add_ask(price, quantity);
598 }
599 }
600 }
601
602 Ok(order_book_snapshot)
603 }
604
605 #[inline]
606 async fn get_realtime_orderbook(&self, _symbol: &str) -> Result<SharedSimdOrderBook> {
607 Err(anyhow!("Real-time orderbook not implemented yet"))
609 }
610
611 #[inline]
612 async fn is_connected(&self) -> bool {
613 *self.connection_status.read() == ConnectionState::Connected
614 }
615
616 #[inline]
617 async fn connection_status(&self) -> ConnectionState {
618 *self.connection_status.read()
619 }
620
621 #[inline]
622 async fn get_stats(&self) -> ConnectionStats {
623 self.stats.read().clone()
624 }
625
626 #[inline]
627 async fn ping(&self) -> Result<u64> {
628 let start = self.clock.raw();
631
632 let response = self
633 .http_client
634 .get(format!(
635 "{}/v5/market/time",
636 self.config.rest_config.base_url
637 ))
638 .send()
639 .await?;
640
641 if response.status().is_success() {
642 Ok(self.clock.raw().saturating_sub(start))
643 } else {
644 Err(anyhow!("Ping failed with status: {}", response.status()))
645 }
646 }
647
648 #[inline]
649 async fn reset_connection(&self) -> Result<()> {
650 let mut keys_to_remove = Vec::new();
652
653 for (key, _) in self.subscriptions.read().iter() {
654 keys_to_remove.push(key.clone());
655 }
656
657 for key in &keys_to_remove {
659 if let Some(tx) = self.subscriptions.write().remove(key) {
660 let _ = tx.send(true);
661 }
662
663 if let Some(handle) = self.ws_handles.write().remove(key) {
665 handle.abort();
666 }
667 }
668
669 *self.connection_status.write() = ConnectionState::Disconnected;
671
672 *self.stats.write() = ConnectionStats::default();
674
675 self.connect().await
677 }
678
679 #[inline]
680 fn get_rate_limits(&self) -> Vec<RateLimit> {
681 BYBIT_RATE_LIMITS.to_vec()
682 }
683}
684
685#[derive(Debug, Clone)]
687pub struct BybitInstrument {
688 pub id: InstrumentId,
690}
691
692impl Instrument for BybitInstrument {
693 fn id(&self) -> InstrumentId {
694 self.id.clone()
695 }
696
697 fn symbol(&self) -> String {
698 self.id.symbol.clone()
699 }
700
701 fn venue(&self) -> Venue {
702 self.id.venue
703 }
704
705 fn as_any(&self) -> &dyn std::any::Any {
706 self
707 }
708
709 fn clone_box(&self) -> Box<dyn Instrument> {
710 Box::new(self.clone())
711 }
712}
713
714impl BybitProvider {
715 #[inline]
717 async fn create_trade_connection(
718 &self,
719 symbols: SmallVec<[String; 8]>,
720 tx: mpsc::Sender<TradeResponse>,
721 subscription: SubscriptionRequest,
722 ) -> Result<JoinHandle<()>> {
723 let connection_id = format!("trade-{}", symbols.join(","));
724
725 let (stop_tx, stop_rx) = watch::channel(false);
727 self.subscriptions
728 .write()
729 .insert(connection_id.clone().into(), stop_tx);
730
731 let url = self.config.websocket_config.base_url.clone();
733 let clock = self.clock.clone();
734 let timeout_milliseconds = self.config.websocket_config.timeout_milliseconds;
735 let ping_interval_milliseconds = self.config.websocket_config.ping_interval_milliseconds;
736 let stats = self.stats.clone();
737 let connection_status = self.connection_status.clone();
738 let use_compression = self.config.websocket_config.use_compression;
739
740 let handle = tokio::spawn(async move {
742 loop {
743 if *stop_rx.borrow() {
745 break;
746 }
747
748 let ws_config = Self::create_websocket_config(
750 url.clone(),
751 use_compression,
752 ping_interval_milliseconds,
753 timeout_milliseconds,
754 );
755
756 let mut client = WebSocketClient::new(ws_config);
758
759 *connection_status.write() = ConnectionState::Connecting;
761
762 let handler = BybitMessageHandler::new_trade_handler(
764 clock.clone(),
765 stats.clone(),
766 tx.clone(),
767 subscription.clone(),
768 );
769
770 if let Err(e) = client.run(handler).await {
772 log::error!("WebSocket client error: {e}");
773 *connection_status.write() = ConnectionState::Error;
774 }
775
776 if *stop_rx.borrow() {
778 break;
779 }
780
781 tokio::time::sleep(Duration::from_millis(1000)).await;
783 }
784 });
785
786 Ok(handle)
787 }
788
789 #[inline]
791 async fn create_orderbook_connection(
792 &self,
793 symbols: SmallVec<[String; 8]>,
794 tx: mpsc::Sender<OrderbookResponse>,
795 subscription: SubscriptionRequest,
796 ) -> Result<JoinHandle<()>> {
797 let connection_id = format!("orderbook-{}", symbols.join(","));
798
799 let (stop_tx, stop_rx) = watch::channel(false);
801 self.subscriptions
802 .write()
803 .insert(connection_id.clone().into(), stop_tx);
804
805 let url = self.config.websocket_config.base_url.clone();
807 let clock = self.clock.clone();
808 let timeout_milliseconds = self.config.websocket_config.timeout_milliseconds;
809 let ping_interval_milliseconds = self.config.websocket_config.ping_interval_milliseconds;
810 let stats = self.stats.clone();
811 let connection_status = self.connection_status.clone();
812 let use_compression = self.config.websocket_config.use_compression;
813
814 let handle = tokio::spawn(async move {
816 loop {
817 if *stop_rx.borrow() {
819 break;
820 }
821
822 let ws_config = Self::create_websocket_config(
824 url.clone(),
825 use_compression,
826 ping_interval_milliseconds,
827 timeout_milliseconds,
828 );
829
830 let mut client = WebSocketClient::new(ws_config);
832
833 *connection_status.write() = ConnectionState::Connecting;
835
836 let handler = BybitMessageHandler::new_orderbook_handler(
838 clock.clone(),
839 stats.clone(),
840 tx.clone(),
841 subscription.clone(),
842 );
843
844 if let Err(e) = client.run(handler).await {
846 log::error!("WebSocket client error: {e}");
847 *connection_status.write() = ConnectionState::Error;
848 }
849
850 if *stop_rx.borrow() {
852 break;
853 }
854
855 tokio::time::sleep(Duration::from_millis(1000)).await;
857 }
858 });
859
860 Ok(handle)
861 }
862}
863
864#[derive(Debug)]
866struct BybitMessageHandler {
867 handler_type: HandlerType,
868 clock: Clock,
869 stats: Arc<RwLock<ConnectionStats>>,
870 trade_tx: Option<mpsc::Sender<TradeResponse>>,
871 orderbook_tx: Option<mpsc::Sender<OrderbookResponse>>,
872 subscription: SubscriptionRequest,
873 sender: Option<mpsc::UnboundedSender<Message>>,
874}
875
/// Kind of stream a `BybitMessageHandler` forwards.
#[derive(Debug)]
enum HandlerType {
    /// Public trade messages.
    Trade,
    /// Order book messages.
    Orderbook,
}
881
882impl BybitMessageHandler {
883 const fn new_trade_handler(
884 clock: Clock,
885 stats: Arc<RwLock<ConnectionStats>>,
886 tx: mpsc::Sender<TradeResponse>,
887 subscription: SubscriptionRequest,
888 ) -> Self {
889 Self {
890 handler_type: HandlerType::Trade,
891 clock,
892 stats,
893 trade_tx: Some(tx),
894 orderbook_tx: None,
895 subscription,
896 sender: None,
897 }
898 }
899
900 const fn new_orderbook_handler(
901 clock: Clock,
902 stats: Arc<RwLock<ConnectionStats>>,
903 tx: mpsc::Sender<OrderbookResponse>,
904 subscription: SubscriptionRequest,
905 ) -> Self {
906 Self {
907 handler_type: HandlerType::Orderbook,
908 clock,
909 stats,
910 trade_tx: None,
911 orderbook_tx: Some(tx),
912 subscription,
913 sender: None,
914 }
915 }
916}
917
918#[async_trait]
919impl rusty_common::websocket::MessageHandler for BybitMessageHandler {
920 async fn on_message(&mut self, message: Message) -> std::result::Result<(), WebSocketError> {
921 let local_time = self.clock.raw();
922
923 if let Some(text) = message.as_text() {
925 BybitProvider::update_receive_stats(self.stats.clone(), text.len(), local_time);
926
927 let should_process = match self.handler_type {
931 HandlerType::Trade => text.contains("\"topic\":\"publicTrade"),
932 HandlerType::Orderbook => text.contains("\"topic\":\"orderbook"),
933 };
934
935 if should_process {
936 const STACK_BUFFER_SIZE: usize = 8192;
939
940 if text.len() <= STACK_BUFFER_SIZE {
941 let mut stack_buffer = [0u8; STACK_BUFFER_SIZE];
943 let bytes = text.as_bytes();
944 stack_buffer[..bytes.len()].copy_from_slice(bytes);
945 let buffer_slice = &mut stack_buffer[..bytes.len()];
946
947 match self.handler_type {
948 HandlerType::Trade => {
949 if let Ok(trade_response) =
950 simd_json::from_slice::<TradeResponse>(buffer_slice)
951 && let Some(ref tx) = self.trade_tx
952 && let Err(e) = tx.send(trade_response).await
953 {
954 log::warn!("Failed to send trade message: {e}");
955 }
956 }
957 HandlerType::Orderbook => {
958 if let Ok(orderbook_response) =
959 simd_json::from_slice::<OrderbookResponse>(buffer_slice)
960 && let Some(ref tx) = self.orderbook_tx
961 && let Err(e) = tx.send(orderbook_response).await
962 {
963 log::warn!("Failed to send orderbook message: {e}");
964 }
965 }
966 }
967 } else {
968 let mut message_bytes = text.as_bytes().to_vec();
970
971 match self.handler_type {
972 HandlerType::Trade => {
973 if let Ok(trade_response) =
974 simd_json::from_slice::<TradeResponse>(&mut message_bytes)
975 && let Some(ref tx) = self.trade_tx
976 && let Err(e) = tx.send(trade_response).await
977 {
978 log::warn!("Failed to send trade message: {e}");
979 }
980 }
981 HandlerType::Orderbook => {
982 if let Ok(orderbook_response) =
983 simd_json::from_slice::<OrderbookResponse>(&mut message_bytes)
984 && let Some(ref tx) = self.orderbook_tx
985 && let Err(e) = tx.send(orderbook_response).await
986 {
987 log::warn!("Failed to send orderbook message: {e}");
988 }
989 }
990 }
991 }
992 }
993 }
994
995 Ok(())
996 }
997
998 async fn on_connected(&mut self) -> std::result::Result<(), WebSocketError> {
999 let subscription_json = simd_json::to_string(&self.subscription).map_err(|e| {
1001 WebSocketError::MessageProcessingError(format!("Failed to serialize subscription: {e}"))
1002 })?;
1003
1004 log::info!("Sending subscription: {subscription_json}");
1005
1006 let message = Message::Text(subscription_json.into());
1008 self.send_message(message)?;
1009
1010 Ok(())
1011 }
1012
1013 async fn on_disconnected(&mut self) -> std::result::Result<(), WebSocketError> {
1014 log::info!("Disconnected from Bybit WebSocket");
1015 Ok(())
1016 }
1017
1018 async fn on_error(&mut self, error: WebSocketError) -> std::result::Result<(), WebSocketError> {
1019 log::error!("Bybit WebSocket error: {error}");
1020 Ok(())
1021 }
1022
1023 fn set_sender(&mut self, sender: mpsc::UnboundedSender<Message>) {
1024 self.sender = Some(sender);
1025 }
1026
1027 fn send_message(&self, message: Message) -> std::result::Result<(), WebSocketError> {
1028 match &self.sender {
1029 Some(sender) => {
1030 sender
1031 .send(message)
1032 .map_err(|_| WebSocketError::NotConnected)?;
1033 Ok(())
1034 }
1035 None => Err(WebSocketError::NotConnected),
1036 }
1037 }
1038}