rusty_feeder/exchange/binance/spot/
provider.rs

1//! Binance Spot market provider implementation
2//!
3//! This module implements the Provider trait for Binance Spot markets,
4//! enabling high-performance data acquisition and normalization.
5
6use rusty_common::collections::FxHashMap;
7use simd_json::prelude::{ValueAsArray, ValueAsScalar};
8use smartstring::alias::String;
9use std::fmt::Debug;
10use std::sync::Arc;
11use std::time::{Duration, Instant};
12
13use super::message_handler::BinanceSpotMessageHandler;
14use super::message_handler_zerocopy::BinanceSpotZeroCopyMessageHandler;
15use anyhow::{Context, Result, anyhow};
16use async_trait::async_trait;
17use parking_lot::RwLock;
18use quanta::Clock;
19use reqwest::header::{HeaderMap, HeaderValue};
20use rusty_common::json::Value;
21use rusty_common::websocket::{WebSocketClient, WebSocketConfig};
22use rusty_model::{
23    data::{
24        book_snapshot::OrderBookSnapshot, market_trade::MarketTrade,
25        simd_orderbook::SharedSimdOrderBook,
26    },
27    instruments::{Instrument, InstrumentId},
28    venues::Venue,
29};
30use smallvec::SmallVec;
31use tokio::sync::{mpsc, watch};
32use tokio::task::JoinHandle;
33
34use crate::provider::prelude::*;
35
36use super::data::{
37    orderbook::{OrderbookMessage, OrderbookSnapshot, ParsedOrderbookSnapshot},
38    subscription::{create_orderbook_subscription, create_trade_subscription},
39    trade::TradeMessage,
40};
41use super::types::{
42    BINANCE_SPOT_API_URL, BINANCE_SPOT_RATE_LIMITS, BINANCE_SPOT_WS_COMBINED_URL,
43    BINANCE_SPOT_WS_URL,
44};
45
46use crate::exchange::binance::common::constants::DEFAULT_PING_INTERVAL_MS;
47use crate::exchange::binance::instrument::BinanceInstrument;
48
49/// Binance Spot exchange provider implementation
50#[derive(Debug)]
51pub struct BinanceSpotProvider {
52    /// Connection configuration
53    config: ConnectionConfig,
54
55    /// WebSocket connection status
56    connection_status: Arc<RwLock<ConnectionState>>,
57
58    /// Connection statistics
59    stats: Arc<RwLock<ConnectionStats>>,
60
61    /// Shared clock for time synchronization
62    clock: Clock,
63
64    /// Active subscriptions
65    subscriptions: Arc<RwLock<FxHashMap<String, watch::Sender<bool>>>>,
66
67    /// Cached instruments
68    _instruments: Arc<RwLock<FxHashMap<String, Box<dyn Instrument>>>>,
69
70    /// Task handles for WebSocket connections
71    ws_handles: Arc<RwLock<FxHashMap<String, JoinHandle<()>>>>,
72
73    /// Last connection attempt timestamp
74    last_connection_attempt: Arc<RwLock<Instant>>,
75
76    /// HTTP client for REST API requests
77    http_client: reqwest::Client,
78}
79
80impl Default for BinanceSpotProvider {
81    fn default() -> Self {
82        Self::new()
83    }
84}
85
86impl BinanceSpotProvider {
87    /// Create a new Binance Spot provider with default configuration
88    #[inline]
89    #[must_use]
90    pub fn new() -> Self {
91        Self::with_config(None)
92    }
93
94    /// Create a new Binance Spot provider with custom configuration
95    #[inline]
96    #[must_use]
97    pub fn with_config(config: Option<ConnectionConfig>) -> Self {
98        let mut default_config = ConnectionConfig::default();
99
100        // Set default WebSocket configuration
101        default_config.websocket_config.base_url = BINANCE_SPOT_WS_URL.into();
102        default_config.websocket_config.ping_interval_milliseconds = DEFAULT_PING_INTERVAL_MS; // 30 seconds
103        default_config.websocket_config.use_compression = false;
104
105        // Set default REST API configuration
106        default_config.rest_config.base_url = BINANCE_SPOT_API_URL.into();
107        default_config.rest_config.timeout_milliseconds = 5000; // 5 seconds
108
109        // Use custom config if provided
110        let config = config.unwrap_or(default_config);
111        let clock = config.clock.clone();
112
113        // Create HTTP client with appropriate settings
114        let mut headers = HeaderMap::new();
115        headers.insert(
116            "User-Agent",
117            HeaderValue::from_str(&config.rest_config.user_agent)
118                .unwrap_or_else(|_| HeaderValue::from_static("RustyHFT/1.0")),
119        );
120
121        let http_client = reqwest::Client::builder()
122            .timeout(Duration::from_millis(
123                config.rest_config.timeout_milliseconds,
124            ))
125            .connect_timeout(Duration::from_millis(
126                config.rest_config.timeout_milliseconds / 2,
127            ))
128            .pool_max_idle_per_host(config.rest_config.connection_pool_size)
129            .pool_idle_timeout(Duration::from_millis(
130                config.rest_config.keep_alive_milliseconds,
131            ))
132            .default_headers(headers)
133            .build()
134            .unwrap_or_default();
135
136        Self {
137            config,
138            connection_status: Arc::new(RwLock::new(ConnectionState::Disconnected)),
139            stats: Arc::new(RwLock::new(ConnectionStats::default())),
140            clock,
141            subscriptions: Arc::new(RwLock::new(FxHashMap::default())),
142            _instruments: Arc::new(RwLock::new(FxHashMap::default())),
143            ws_handles: Arc::new(RwLock::new(FxHashMap::default())),
144            last_connection_attempt: Arc::new(RwLock::new(Instant::now())),
145            http_client,
146        }
147    }
148
149    /// Helper function to update connection statistics upon message reception
150    #[inline]
151    fn update_receive_stats(
152        stats: Arc<RwLock<ConnectionStats>>,
153        message_size: usize,
154        local_time: u64,
155    ) {
156        let mut s = stats.write();
157        s.messages_received += 1;
158        s.bytes_received += message_size as u64;
159        s.last_message_time = local_time;
160    }
161
162    /// Helper function to create WebSocket request with proper headers
163    fn create_websocket_config(
164        url: String,
165        use_compression: bool,
166        ping_interval_milliseconds: u64,
167        timeout_milliseconds: u64,
168    ) -> WebSocketConfig {
169        // Binance does NOT support WebSocket compression extensions
170        // Adding compression headers can cause connection issues
171        // Their streams are already optimized and binary messages are handled efficiently
172        if use_compression {
173            log::debug!(
174                "Binance WebSocket does not support compression extensions, ignoring compression setting"
175            );
176        }
177
178        WebSocketConfig::builder(rusty_common::types::Exchange::Binance, url.to_string())
179            .ping_interval(Duration::from_millis(ping_interval_milliseconds))
180            .timeout(Duration::from_millis(timeout_milliseconds))
181            .compression(rusty_common::websocket::CompressionConfig::disabled())
182            .build()
183    }
184
185    /// Connect to the WebSocket API
186    #[inline]
187    async fn connect(&self) -> Result<()> {
188        if *self.connection_status.read() == ConnectionState::Connected {
189            return Ok(());
190        }
191
192        // Check if we need to respect reconnection backoff
193        {
194            let now = Instant::now();
195            let last_attempt = *self.last_connection_attempt.read();
196            let backoff_ms = self
197                .config
198                .websocket_config
199                .reconnect
200                .backoff_initial_milliseconds;
201
202            if now.duration_since(last_attempt) < Duration::from_millis(backoff_ms) {
203                // Wait until backoff period is over
204                tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
205            }
206
207            // Update last attempt time
208            *self.last_connection_attempt.write() = Instant::now();
209        }
210
211        *self.connection_status.write() = ConnectionState::Connecting;
212
213        // Note: Connection is now handled by individual trade/orderbook connections
214        // using WebSocketClient pattern. This method is kept for compatibility.
215        *self.connection_status.write() = ConnectionState::Connected;
216        self.stats.write().connected_time = self.clock.raw();
217        Ok(())
218    }
219
220    /// Create a WebSocket connection for trade data
221    #[inline]
222    async fn create_trade_connection(
223        &self,
224        symbols: SmallVec<[String; 8]>,
225        tx: mpsc::Sender<TradeMessage>,
226    ) -> Result<JoinHandle<()>> {
227        // For multiple symbols, use combined streams
228        let is_combined = symbols.len() > 1;
229
230        let url = if is_combined {
231            BINANCE_SPOT_WS_COMBINED_URL.to_string()
232        } else {
233            // For single symbol, use direct stream URL that doesn't require subscription
234            if symbols.len() == 1 {
235                let symbol_lower = symbols[0].to_lowercase();
236                format!("wss://stream.binance.com:9443/ws/{symbol_lower}@trade")
237            } else {
238                BINANCE_SPOT_WS_URL.to_string()
239            }
240        };
241
242        let connection_id = format!("trade-{}", symbols.join(","));
243
244        // Create subscription message
245        let subscription =
246            create_trade_subscription(&connection_id, symbols.clone(), None, None, false);
247
248        // Create stop signal
249        let (stop_tx, stop_rx) = watch::channel(false);
250        self.subscriptions
251            .write()
252            .insert(connection_id.clone().into(), stop_tx);
253
254        // Clone parameters for the task
255        let clock = self.clock.clone();
256        let timeout_milliseconds = self.config.websocket_config.timeout_milliseconds;
257        let ping_interval_milliseconds = self.config.websocket_config.ping_interval_milliseconds;
258        let stats = self.stats.clone();
259        let connection_status = self.connection_status.clone();
260        let use_compression = self.config.websocket_config.use_compression;
261        let use_zero_copy = self.config.websocket_config.use_zero_copy_parsing;
262
263        // Spawn a task for the WebSocket connection
264        let handle = tokio::spawn(async move {
265            loop {
266                // Check for stop signal
267                if *stop_rx.borrow() {
268                    break;
269                }
270
271                // Create WebSocket configuration
272                let ws_config = Self::create_websocket_config(
273                    url.clone().into(),
274                    use_compression,
275                    ping_interval_milliseconds,
276                    timeout_milliseconds,
277                );
278
279                // Create WebSocket client
280                let mut client = WebSocketClient::new(ws_config);
281
282                // Update connection status
283                *connection_status.write() = ConnectionState::Connecting;
284
285                // Run the WebSocket client with appropriate handler
286                let result = if use_zero_copy {
287                    let handler = BinanceSpotZeroCopyMessageHandler::new_trade_handler(
288                        clock.clone(),
289                        stats.clone(),
290                        tx.clone(),
291                        is_combined,
292                        None, // We'll handle subscription differently
293                    );
294                    client.run(handler).await
295                } else {
296                    let handler = BinanceSpotMessageHandler::new_trade_handler(
297                        clock.clone(),
298                        stats.clone(),
299                        tx.clone(),
300                        is_combined,
301                        None, // We'll handle subscription differently
302                    );
303                    client.run(handler).await
304                };
305
306                if let Err(e) = result {
307                    log::error!("WebSocket client error: {e}");
308                    *connection_status.write() = ConnectionState::Error;
309                }
310
311                // Check for stop signal before reconnecting
312                if *stop_rx.borrow() {
313                    break;
314                }
315
316                // Reconnect after a delay
317                tokio::time::sleep(Duration::from_millis(1000)).await;
318            }
319        });
320
321        Ok(handle)
322    }
323
324    /// Create a WebSocket connection for orderbook data
325    #[inline]
326    async fn create_orderbook_connection(
327        &self,
328        symbols: SmallVec<[String; 8]>,
329        tx: mpsc::Sender<OrderbookMessage>,
330    ) -> Result<JoinHandle<()>> {
331        // For multiple symbols, use combined streams
332        let is_combined = symbols.len() > 1;
333
334        let url = if is_combined {
335            BINANCE_SPOT_WS_COMBINED_URL.to_string()
336        } else {
337            BINANCE_SPOT_WS_URL.to_string()
338        };
339
340        let connection_id = format!("orderbook:{}", symbols.join(","));
341
342        // Create subscription message
343        let subscription = create_orderbook_subscription(
344            &connection_id,
345            symbols.clone(),
346            None,
347            None,
348            None,
349            None,
350            false,
351        );
352
353        // Create stop signal
354        let (stop_tx, stop_rx) = watch::channel(false);
355        self.subscriptions
356            .write()
357            .insert(connection_id.clone().into(), stop_tx);
358
359        // Clone parameters for the task
360        let clock = self.clock.clone();
361        let timeout_milliseconds = self.config.websocket_config.timeout_milliseconds;
362        let ping_interval_milliseconds = self.config.websocket_config.ping_interval_milliseconds;
363        let stats = self.stats.clone();
364        let connection_status = self.connection_status.clone();
365        let use_compression = self.config.websocket_config.use_compression;
366        let use_zero_copy = self.config.websocket_config.use_zero_copy_parsing;
367
368        // Spawn a task for the WebSocket connection
369        let handle = tokio::spawn(async move {
370            loop {
371                // Check for stop signal
372                if *stop_rx.borrow() {
373                    break;
374                }
375
376                // Create WebSocket configuration
377                let ws_config = Self::create_websocket_config(
378                    url.clone().into(),
379                    use_compression,
380                    ping_interval_milliseconds,
381                    timeout_milliseconds,
382                );
383
384                // Create WebSocket client
385                let mut client = WebSocketClient::new(ws_config);
386
387                // Update connection status
388                *connection_status.write() = ConnectionState::Connecting;
389
390                // Run the WebSocket client with appropriate handler
391                let result = if use_zero_copy {
392                    let handler = BinanceSpotZeroCopyMessageHandler::new_orderbook_handler(
393                        clock.clone(),
394                        stats.clone(),
395                        tx.clone(),
396                        is_combined,
397                        None, // We'll handle subscription differently
398                    );
399                    client.run(handler).await
400                } else {
401                    let handler = BinanceSpotMessageHandler::new_orderbook_handler(
402                        clock.clone(),
403                        stats.clone(),
404                        tx.clone(),
405                        is_combined,
406                        None, // We'll handle subscription differently
407                    );
408                    client.run(handler).await
409                };
410
411                if let Err(e) = result {
412                    log::error!("WebSocket client error: {e}");
413                    *connection_status.write() = ConnectionState::Error;
414                }
415
416                // Check for stop signal before reconnecting
417                if *stop_rx.borrow() {
418                    break;
419                }
420
421                // Reconnect after a delay
422                tokio::time::sleep(Duration::from_millis(1000)).await;
423            }
424        });
425
426        Ok(handle)
427    }
428
429    /// Fetch orderbook snapshot for a symbol
430    async fn fetch_orderbook_snapshot(
431        &self,
432        symbol: &str,
433        limit: Option<u32>,
434    ) -> Result<OrderbookSnapshot> {
435        let limit = limit.unwrap_or(100);
436        let url = format!(
437            "{}/api/v3/depth?symbol={}&limit={}",
438            self.config.rest_config.base_url, symbol, limit
439        );
440
441        let response = self.http_client.get(&url).send().await?;
442
443        if !response.status().is_success() {
444            let error_text = response.text().await?;
445            return Err(anyhow!(
446                "Failed to fetch orderbook snapshot: {}",
447                error_text
448            ));
449        }
450
451        // Parse the response using simd_json
452        let bytes = response
453            .bytes()
454            .await
455            .map_err(|e| anyhow!("Failed to get response bytes: {}", e))?;
456        let mut bytes_vec = bytes.to_vec();
457        let snapshot = simd_json::from_slice::<OrderbookSnapshot>(&mut bytes_vec)
458            .map_err(|e| anyhow!("Failed to parse JSON: {}", e))?;
459        Ok(snapshot)
460    }
461}
462
463#[async_trait]
464impl Provider for BinanceSpotProvider {
465    type TradeMessage = TradeMessage;
466    type DepthMessage = OrderbookMessage;
467    type InstrumentMessage = Value;
468
469    fn name(&self) -> &'static str {
470        "BinanceSpot"
471    }
472
473    fn venue(&self) -> rusty_model::venues::Venue {
474        rusty_model::venues::Venue::Binance
475    }
476
477    fn config(&self) -> &ConnectionConfig {
478        &self.config
479    }
480
481    async fn init(&mut self) -> Result<()> {
482        self.connect().await
483    }
484
485    async fn shutdown(&mut self) -> Result<()> {
486        // Unsubscribe from all active subscriptions
487        let mut keys_to_remove = Vec::new();
488
489        for (key, _) in self.subscriptions.read().iter() {
490            keys_to_remove.push(key.clone());
491        }
492
493        // Send stop signal for each subscription
494        for key in keys_to_remove {
495            if let Some(tx) = self.subscriptions.write().remove(&key) {
496                let _ = tx.send(true);
497            }
498
499            // Abort task
500            if let Some(handle) = self.ws_handles.write().remove(&key) {
501                handle.abort();
502            }
503        }
504
505        // Set connection status to disconnected
506        *self.connection_status.write() = ConnectionState::Disconnected;
507
508        Ok(())
509    }
510
511    #[inline]
512    async fn subscribe_trades(
513        &self,
514        symbols: SmallVec<[String; 8]>,
515    ) -> Result<mpsc::Receiver<Self::TradeMessage>> {
516        // Ensure connection is established
517        self.connect().await?;
518
519        // Create channel for trade messages
520        let (tx, rx) = mpsc::channel(1024);
521
522        // Create WebSocket connection
523        let handle = self.create_trade_connection(symbols.clone(), tx).await?;
524
525        // Save task handle
526        let connection_id = format!("trade-{}", symbols.join(","));
527        self.ws_handles.write().insert(connection_id.into(), handle);
528
529        Ok(rx)
530    }
531
532    #[inline]
533    async fn unsubscribe_trades(&self) -> Result<()> {
534        // Find all trade subscriptions
535        let mut keys_to_remove = Vec::new();
536
537        for (key, _) in self.subscriptions.read().iter() {
538            if key.starts_with("trade:") {
539                keys_to_remove.push(key.clone());
540            }
541        }
542
543        // Send stop signal for each subscription
544        for key in keys_to_remove {
545            if let Some(tx) = self.subscriptions.write().remove(&key) {
546                let _ = tx.send(true);
547            }
548
549            // Abort task
550            if let Some(handle) = self.ws_handles.write().remove(&key) {
551                handle.abort();
552            }
553        }
554
555        Ok(())
556    }
557
558    #[inline]
559    async fn subscribe_orderbook(
560        &self,
561        symbols: SmallVec<[String; 8]>,
562    ) -> Result<mpsc::Receiver<Self::DepthMessage>> {
563        // Ensure connection is established
564        self.connect().await?;
565
566        // Create channel for orderbook messages
567        let (tx, rx) = mpsc::channel(1024);
568
569        // Create WebSocket connection
570        let handle = self
571            .create_orderbook_connection(symbols.clone(), tx)
572            .await?;
573
574        // Save task handle
575        let connection_id = format!("orderbook:{}", symbols.join(","));
576        self.ws_handles.write().insert(connection_id.into(), handle);
577
578        Ok(rx)
579    }
580
581    #[inline]
582    async fn unsubscribe_orderbook(&self) -> Result<()> {
583        // Find all orderbook subscriptions
584        let mut keys_to_remove = Vec::new();
585
586        for (key, _) in self.subscriptions.read().iter() {
587            if key.starts_with("orderbook:") {
588                keys_to_remove.push(key.clone());
589            }
590        }
591
592        // Send stop signal for each subscription
593        for key in keys_to_remove {
594            if let Some(tx) = self.subscriptions.write().remove(&key) {
595                let _ = tx.send(true);
596            }
597
598            // Abort task
599            if let Some(handle) = self.ws_handles.write().remove(&key) {
600                handle.abort();
601            }
602        }
603
604        Ok(())
605    }
606
607    #[inline]
608    async fn get_instruments(&self) -> Result<Vec<Box<dyn Instrument>>> {
609        // Define the endpoint for exchange info
610        let url = "https://api.binance.com/api/v3/exchangeInfo";
611
612        // Make the HTTP request
613        let response = self
614            .http_client
615            .get(url)
616            .send()
617            .await
618            .context("Failed to fetch Binance exchange info")?;
619
620        // Check if the request was successful
621        if !response.status().is_success() {
622            return Err(anyhow!(
623                "Failed to fetch Binance exchange info: HTTP {}",
624                response.status()
625            ));
626        }
627
628        // Parse the response using simd_json
629        let bytes = response
630            .bytes()
631            .await
632            .context("Failed to get response bytes")?;
633        let mut bytes_vec = bytes.to_vec();
634        let exchange_info: simd_json::OwnedValue = simd_json::from_slice(&mut bytes_vec)
635            .context("Failed to parse Binance exchange info response")?;
636
637        // Extract the symbols from the response
638        let symbols = exchange_info["symbols"].as_array().ok_or_else(|| {
639            anyhow!("Invalid response format: 'symbols' field not found or not an array")
640        })?;
641
642        // Convert to instruments
643        let mut instruments = Vec::with_capacity(symbols.len());
644
645        for symbol_data in symbols {
646            // Skip symbols that are not trading
647            let status = symbol_data["status"].as_str().unwrap_or("");
648            if status != "TRADING" {
649                continue;
650            }
651
652            let symbol = symbol_data["symbol"].as_str().ok_or_else(|| {
653                anyhow!("Invalid symbol data: 'symbol' field not found or not a String")
654            })?;
655
656            // Create instrument
657            let instrument_id = InstrumentId::new(symbol, Venue::Binance);
658            let instrument =
659                Box::new(BinanceInstrument { id: instrument_id }) as Box<dyn Instrument>;
660
661            instruments.push(instrument);
662        }
663
664        Ok(instruments)
665    }
666
667    #[inline]
668    async fn get_historical_trades(
669        &self,
670        _symbol: &str,
671        _limit: Option<u32>,
672    ) -> Result<Vec<MarketTrade>> {
673        // Fetch historical trades from Binance API
674        // Not fully implemented yet
675        Err(anyhow!("Not implemented yet"))
676    }
677
678    #[inline]
679    async fn get_orderbook_snapshot(
680        &self,
681        symbol: &str,
682        depth: Option<u32>,
683    ) -> Result<OrderBookSnapshot> {
684        // Fetch orderbook snapshot
685        let snapshot = self.fetch_orderbook_snapshot(symbol, depth).await?;
686        let parsed = ParsedOrderbookSnapshot::from(snapshot);
687
688        // Convert to model OrderBookSnapshot
689        let instrument_id = InstrumentId::new(symbol, Venue::Binance);
690
691        // Create OrderBookSnapshot with current timestamp (nanoseconds since epoch)
692        let timestamp = self.clock.raw();
693
694        // Convert to OrderBookSnapshot
695        let mut order_book_depth =
696            OrderBookSnapshot::new_empty(instrument_id, timestamp, parsed.last_update_id);
697
698        // Add bids and asks
699        for (price, quantity) in parsed.bids {
700            order_book_depth.add_bid(price, quantity);
701        }
702
703        for (price, quantity) in parsed.asks {
704            order_book_depth.add_ask(price, quantity);
705        }
706
707        Ok(order_book_depth)
708    }
709
710    #[inline]
711    async fn get_realtime_orderbook(&self, _symbol: &str) -> Result<SharedSimdOrderBook> {
712        // Not yet implemented
713        Err(anyhow!("Not implemented yet"))
714    }
715
716    #[inline]
717    async fn is_connected(&self) -> bool {
718        *self.connection_status.read() == ConnectionState::Connected
719    }
720
721    #[inline]
722    async fn connection_status(&self) -> ConnectionState {
723        *self.connection_status.read()
724    }
725
726    #[inline]
727    async fn get_stats(&self) -> ConnectionStats {
728        self.stats.read().clone()
729    }
730
731    #[inline]
732    async fn ping(&self) -> Result<u64> {
733        // For WebSocketClient pattern, ping/pong is handled automatically
734        // Return a simple network latency test via HTTP
735        let start = self.clock.raw();
736
737        let response = self
738            .http_client
739            .get(format!("{}/api/v3/ping", self.config.rest_config.base_url))
740            .send()
741            .await?;
742
743        if response.status().is_success() {
744            Ok(self.clock.raw().saturating_sub(start))
745        } else {
746            Err(anyhow!("Ping failed with status: {}", response.status()))
747        }
748    }
749
750    #[inline]
751    async fn reset_connection(&self) -> Result<()> {
752        // First, unsubscribe all active subscriptions
753        let mut keys_to_remove = Vec::new();
754
755        for (key, _) in self.subscriptions.read().iter() {
756            keys_to_remove.push(key.clone());
757        }
758
759        // Send stop signal for each subscription
760        for key in &keys_to_remove {
761            if let Some(tx) = self.subscriptions.write().remove(key) {
762                let _ = tx.send(true);
763            }
764
765            // Abort task
766            if let Some(handle) = self.ws_handles.write().remove(key) {
767                handle.abort();
768            }
769        }
770
771        // Set status to disconnected
772        *self.connection_status.write() = ConnectionState::Disconnected;
773
774        // Reset stats
775        *self.stats.write() = ConnectionStats::default();
776
777        // Connect again
778        self.connect().await
779    }
780
781    #[inline]
782    fn get_rate_limits(&self) -> Vec<RateLimit> {
783        BINANCE_SPOT_RATE_LIMITS.to_vec()
784    }
785}