rusty_feeder/exchange/binance/spot/
provider.rs1use rusty_common::collections::FxHashMap;
7use simd_json::prelude::{ValueAsArray, ValueAsScalar};
8use smartstring::alias::String;
9use std::fmt::Debug;
10use std::sync::Arc;
11use std::time::{Duration, Instant};
12
13use super::message_handler::BinanceSpotMessageHandler;
14use super::message_handler_zerocopy::BinanceSpotZeroCopyMessageHandler;
15use anyhow::{Context, Result, anyhow};
16use async_trait::async_trait;
17use parking_lot::RwLock;
18use quanta::Clock;
19use reqwest::header::{HeaderMap, HeaderValue};
20use rusty_common::json::Value;
21use rusty_common::websocket::{WebSocketClient, WebSocketConfig};
22use rusty_model::{
23 data::{
24 book_snapshot::OrderBookSnapshot, market_trade::MarketTrade,
25 simd_orderbook::SharedSimdOrderBook,
26 },
27 instruments::{Instrument, InstrumentId},
28 venues::Venue,
29};
30use smallvec::SmallVec;
31use tokio::sync::{mpsc, watch};
32use tokio::task::JoinHandle;
33
34use crate::provider::prelude::*;
35
36use super::data::{
37 orderbook::{OrderbookMessage, OrderbookSnapshot, ParsedOrderbookSnapshot},
38 subscription::{create_orderbook_subscription, create_trade_subscription},
39 trade::TradeMessage,
40};
41use super::types::{
42 BINANCE_SPOT_API_URL, BINANCE_SPOT_RATE_LIMITS, BINANCE_SPOT_WS_COMBINED_URL,
43 BINANCE_SPOT_WS_URL,
44};
45
46use crate::exchange::binance::common::constants::DEFAULT_PING_INTERVAL_MS;
47use crate::exchange::binance::instrument::BinanceInstrument;
48
49#[derive(Debug)]
51pub struct BinanceSpotProvider {
52 config: ConnectionConfig,
54
55 connection_status: Arc<RwLock<ConnectionState>>,
57
58 stats: Arc<RwLock<ConnectionStats>>,
60
61 clock: Clock,
63
64 subscriptions: Arc<RwLock<FxHashMap<String, watch::Sender<bool>>>>,
66
67 _instruments: Arc<RwLock<FxHashMap<String, Box<dyn Instrument>>>>,
69
70 ws_handles: Arc<RwLock<FxHashMap<String, JoinHandle<()>>>>,
72
73 last_connection_attempt: Arc<RwLock<Instant>>,
75
76 http_client: reqwest::Client,
78}
79
80impl Default for BinanceSpotProvider {
81 fn default() -> Self {
82 Self::new()
83 }
84}
85
86impl BinanceSpotProvider {
87 #[inline]
89 #[must_use]
90 pub fn new() -> Self {
91 Self::with_config(None)
92 }
93
94 #[inline]
96 #[must_use]
97 pub fn with_config(config: Option<ConnectionConfig>) -> Self {
98 let mut default_config = ConnectionConfig::default();
99
100 default_config.websocket_config.base_url = BINANCE_SPOT_WS_URL.into();
102 default_config.websocket_config.ping_interval_milliseconds = DEFAULT_PING_INTERVAL_MS; default_config.websocket_config.use_compression = false;
104
105 default_config.rest_config.base_url = BINANCE_SPOT_API_URL.into();
107 default_config.rest_config.timeout_milliseconds = 5000; let config = config.unwrap_or(default_config);
111 let clock = config.clock.clone();
112
113 let mut headers = HeaderMap::new();
115 headers.insert(
116 "User-Agent",
117 HeaderValue::from_str(&config.rest_config.user_agent)
118 .unwrap_or_else(|_| HeaderValue::from_static("RustyHFT/1.0")),
119 );
120
121 let http_client = reqwest::Client::builder()
122 .timeout(Duration::from_millis(
123 config.rest_config.timeout_milliseconds,
124 ))
125 .connect_timeout(Duration::from_millis(
126 config.rest_config.timeout_milliseconds / 2,
127 ))
128 .pool_max_idle_per_host(config.rest_config.connection_pool_size)
129 .pool_idle_timeout(Duration::from_millis(
130 config.rest_config.keep_alive_milliseconds,
131 ))
132 .default_headers(headers)
133 .build()
134 .unwrap_or_default();
135
136 Self {
137 config,
138 connection_status: Arc::new(RwLock::new(ConnectionState::Disconnected)),
139 stats: Arc::new(RwLock::new(ConnectionStats::default())),
140 clock,
141 subscriptions: Arc::new(RwLock::new(FxHashMap::default())),
142 _instruments: Arc::new(RwLock::new(FxHashMap::default())),
143 ws_handles: Arc::new(RwLock::new(FxHashMap::default())),
144 last_connection_attempt: Arc::new(RwLock::new(Instant::now())),
145 http_client,
146 }
147 }
148
149 #[inline]
151 fn update_receive_stats(
152 stats: Arc<RwLock<ConnectionStats>>,
153 message_size: usize,
154 local_time: u64,
155 ) {
156 let mut s = stats.write();
157 s.messages_received += 1;
158 s.bytes_received += message_size as u64;
159 s.last_message_time = local_time;
160 }
161
162 fn create_websocket_config(
164 url: String,
165 use_compression: bool,
166 ping_interval_milliseconds: u64,
167 timeout_milliseconds: u64,
168 ) -> WebSocketConfig {
169 if use_compression {
173 log::debug!(
174 "Binance WebSocket does not support compression extensions, ignoring compression setting"
175 );
176 }
177
178 WebSocketConfig::builder(rusty_common::types::Exchange::Binance, url.to_string())
179 .ping_interval(Duration::from_millis(ping_interval_milliseconds))
180 .timeout(Duration::from_millis(timeout_milliseconds))
181 .compression(rusty_common::websocket::CompressionConfig::disabled())
182 .build()
183 }
184
185 #[inline]
187 async fn connect(&self) -> Result<()> {
188 if *self.connection_status.read() == ConnectionState::Connected {
189 return Ok(());
190 }
191
192 {
194 let now = Instant::now();
195 let last_attempt = *self.last_connection_attempt.read();
196 let backoff_ms = self
197 .config
198 .websocket_config
199 .reconnect
200 .backoff_initial_milliseconds;
201
202 if now.duration_since(last_attempt) < Duration::from_millis(backoff_ms) {
203 tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
205 }
206
207 *self.last_connection_attempt.write() = Instant::now();
209 }
210
211 *self.connection_status.write() = ConnectionState::Connecting;
212
213 *self.connection_status.write() = ConnectionState::Connected;
216 self.stats.write().connected_time = self.clock.raw();
217 Ok(())
218 }
219
220 #[inline]
222 async fn create_trade_connection(
223 &self,
224 symbols: SmallVec<[String; 8]>,
225 tx: mpsc::Sender<TradeMessage>,
226 ) -> Result<JoinHandle<()>> {
227 let is_combined = symbols.len() > 1;
229
230 let url = if is_combined {
231 BINANCE_SPOT_WS_COMBINED_URL.to_string()
232 } else {
233 if symbols.len() == 1 {
235 let symbol_lower = symbols[0].to_lowercase();
236 format!("wss://stream.binance.com:9443/ws/{symbol_lower}@trade")
237 } else {
238 BINANCE_SPOT_WS_URL.to_string()
239 }
240 };
241
242 let connection_id = format!("trade-{}", symbols.join(","));
243
244 let subscription =
246 create_trade_subscription(&connection_id, symbols.clone(), None, None, false);
247
248 let (stop_tx, stop_rx) = watch::channel(false);
250 self.subscriptions
251 .write()
252 .insert(connection_id.clone().into(), stop_tx);
253
254 let clock = self.clock.clone();
256 let timeout_milliseconds = self.config.websocket_config.timeout_milliseconds;
257 let ping_interval_milliseconds = self.config.websocket_config.ping_interval_milliseconds;
258 let stats = self.stats.clone();
259 let connection_status = self.connection_status.clone();
260 let use_compression = self.config.websocket_config.use_compression;
261 let use_zero_copy = self.config.websocket_config.use_zero_copy_parsing;
262
263 let handle = tokio::spawn(async move {
265 loop {
266 if *stop_rx.borrow() {
268 break;
269 }
270
271 let ws_config = Self::create_websocket_config(
273 url.clone().into(),
274 use_compression,
275 ping_interval_milliseconds,
276 timeout_milliseconds,
277 );
278
279 let mut client = WebSocketClient::new(ws_config);
281
282 *connection_status.write() = ConnectionState::Connecting;
284
285 let result = if use_zero_copy {
287 let handler = BinanceSpotZeroCopyMessageHandler::new_trade_handler(
288 clock.clone(),
289 stats.clone(),
290 tx.clone(),
291 is_combined,
292 None, );
294 client.run(handler).await
295 } else {
296 let handler = BinanceSpotMessageHandler::new_trade_handler(
297 clock.clone(),
298 stats.clone(),
299 tx.clone(),
300 is_combined,
301 None, );
303 client.run(handler).await
304 };
305
306 if let Err(e) = result {
307 log::error!("WebSocket client error: {e}");
308 *connection_status.write() = ConnectionState::Error;
309 }
310
311 if *stop_rx.borrow() {
313 break;
314 }
315
316 tokio::time::sleep(Duration::from_millis(1000)).await;
318 }
319 });
320
321 Ok(handle)
322 }
323
324 #[inline]
326 async fn create_orderbook_connection(
327 &self,
328 symbols: SmallVec<[String; 8]>,
329 tx: mpsc::Sender<OrderbookMessage>,
330 ) -> Result<JoinHandle<()>> {
331 let is_combined = symbols.len() > 1;
333
334 let url = if is_combined {
335 BINANCE_SPOT_WS_COMBINED_URL.to_string()
336 } else {
337 BINANCE_SPOT_WS_URL.to_string()
338 };
339
340 let connection_id = format!("orderbook:{}", symbols.join(","));
341
342 let subscription = create_orderbook_subscription(
344 &connection_id,
345 symbols.clone(),
346 None,
347 None,
348 None,
349 None,
350 false,
351 );
352
353 let (stop_tx, stop_rx) = watch::channel(false);
355 self.subscriptions
356 .write()
357 .insert(connection_id.clone().into(), stop_tx);
358
359 let clock = self.clock.clone();
361 let timeout_milliseconds = self.config.websocket_config.timeout_milliseconds;
362 let ping_interval_milliseconds = self.config.websocket_config.ping_interval_milliseconds;
363 let stats = self.stats.clone();
364 let connection_status = self.connection_status.clone();
365 let use_compression = self.config.websocket_config.use_compression;
366 let use_zero_copy = self.config.websocket_config.use_zero_copy_parsing;
367
368 let handle = tokio::spawn(async move {
370 loop {
371 if *stop_rx.borrow() {
373 break;
374 }
375
376 let ws_config = Self::create_websocket_config(
378 url.clone().into(),
379 use_compression,
380 ping_interval_milliseconds,
381 timeout_milliseconds,
382 );
383
384 let mut client = WebSocketClient::new(ws_config);
386
387 *connection_status.write() = ConnectionState::Connecting;
389
390 let result = if use_zero_copy {
392 let handler = BinanceSpotZeroCopyMessageHandler::new_orderbook_handler(
393 clock.clone(),
394 stats.clone(),
395 tx.clone(),
396 is_combined,
397 None, );
399 client.run(handler).await
400 } else {
401 let handler = BinanceSpotMessageHandler::new_orderbook_handler(
402 clock.clone(),
403 stats.clone(),
404 tx.clone(),
405 is_combined,
406 None, );
408 client.run(handler).await
409 };
410
411 if let Err(e) = result {
412 log::error!("WebSocket client error: {e}");
413 *connection_status.write() = ConnectionState::Error;
414 }
415
416 if *stop_rx.borrow() {
418 break;
419 }
420
421 tokio::time::sleep(Duration::from_millis(1000)).await;
423 }
424 });
425
426 Ok(handle)
427 }
428
429 async fn fetch_orderbook_snapshot(
431 &self,
432 symbol: &str,
433 limit: Option<u32>,
434 ) -> Result<OrderbookSnapshot> {
435 let limit = limit.unwrap_or(100);
436 let url = format!(
437 "{}/api/v3/depth?symbol={}&limit={}",
438 self.config.rest_config.base_url, symbol, limit
439 );
440
441 let response = self.http_client.get(&url).send().await?;
442
443 if !response.status().is_success() {
444 let error_text = response.text().await?;
445 return Err(anyhow!(
446 "Failed to fetch orderbook snapshot: {}",
447 error_text
448 ));
449 }
450
451 let bytes = response
453 .bytes()
454 .await
455 .map_err(|e| anyhow!("Failed to get response bytes: {}", e))?;
456 let mut bytes_vec = bytes.to_vec();
457 let snapshot = simd_json::from_slice::<OrderbookSnapshot>(&mut bytes_vec)
458 .map_err(|e| anyhow!("Failed to parse JSON: {}", e))?;
459 Ok(snapshot)
460 }
461}
462
463#[async_trait]
464impl Provider for BinanceSpotProvider {
465 type TradeMessage = TradeMessage;
466 type DepthMessage = OrderbookMessage;
467 type InstrumentMessage = Value;
468
469 fn name(&self) -> &'static str {
470 "BinanceSpot"
471 }
472
473 fn venue(&self) -> rusty_model::venues::Venue {
474 rusty_model::venues::Venue::Binance
475 }
476
477 fn config(&self) -> &ConnectionConfig {
478 &self.config
479 }
480
481 async fn init(&mut self) -> Result<()> {
482 self.connect().await
483 }
484
485 async fn shutdown(&mut self) -> Result<()> {
486 let mut keys_to_remove = Vec::new();
488
489 for (key, _) in self.subscriptions.read().iter() {
490 keys_to_remove.push(key.clone());
491 }
492
493 for key in keys_to_remove {
495 if let Some(tx) = self.subscriptions.write().remove(&key) {
496 let _ = tx.send(true);
497 }
498
499 if let Some(handle) = self.ws_handles.write().remove(&key) {
501 handle.abort();
502 }
503 }
504
505 *self.connection_status.write() = ConnectionState::Disconnected;
507
508 Ok(())
509 }
510
511 #[inline]
512 async fn subscribe_trades(
513 &self,
514 symbols: SmallVec<[String; 8]>,
515 ) -> Result<mpsc::Receiver<Self::TradeMessage>> {
516 self.connect().await?;
518
519 let (tx, rx) = mpsc::channel(1024);
521
522 let handle = self.create_trade_connection(symbols.clone(), tx).await?;
524
525 let connection_id = format!("trade-{}", symbols.join(","));
527 self.ws_handles.write().insert(connection_id.into(), handle);
528
529 Ok(rx)
530 }
531
532 #[inline]
533 async fn unsubscribe_trades(&self) -> Result<()> {
534 let mut keys_to_remove = Vec::new();
536
537 for (key, _) in self.subscriptions.read().iter() {
538 if key.starts_with("trade:") {
539 keys_to_remove.push(key.clone());
540 }
541 }
542
543 for key in keys_to_remove {
545 if let Some(tx) = self.subscriptions.write().remove(&key) {
546 let _ = tx.send(true);
547 }
548
549 if let Some(handle) = self.ws_handles.write().remove(&key) {
551 handle.abort();
552 }
553 }
554
555 Ok(())
556 }
557
558 #[inline]
559 async fn subscribe_orderbook(
560 &self,
561 symbols: SmallVec<[String; 8]>,
562 ) -> Result<mpsc::Receiver<Self::DepthMessage>> {
563 self.connect().await?;
565
566 let (tx, rx) = mpsc::channel(1024);
568
569 let handle = self
571 .create_orderbook_connection(symbols.clone(), tx)
572 .await?;
573
574 let connection_id = format!("orderbook:{}", symbols.join(","));
576 self.ws_handles.write().insert(connection_id.into(), handle);
577
578 Ok(rx)
579 }
580
581 #[inline]
582 async fn unsubscribe_orderbook(&self) -> Result<()> {
583 let mut keys_to_remove = Vec::new();
585
586 for (key, _) in self.subscriptions.read().iter() {
587 if key.starts_with("orderbook:") {
588 keys_to_remove.push(key.clone());
589 }
590 }
591
592 for key in keys_to_remove {
594 if let Some(tx) = self.subscriptions.write().remove(&key) {
595 let _ = tx.send(true);
596 }
597
598 if let Some(handle) = self.ws_handles.write().remove(&key) {
600 handle.abort();
601 }
602 }
603
604 Ok(())
605 }
606
607 #[inline]
608 async fn get_instruments(&self) -> Result<Vec<Box<dyn Instrument>>> {
609 let url = "https://api.binance.com/api/v3/exchangeInfo";
611
612 let response = self
614 .http_client
615 .get(url)
616 .send()
617 .await
618 .context("Failed to fetch Binance exchange info")?;
619
620 if !response.status().is_success() {
622 return Err(anyhow!(
623 "Failed to fetch Binance exchange info: HTTP {}",
624 response.status()
625 ));
626 }
627
628 let bytes = response
630 .bytes()
631 .await
632 .context("Failed to get response bytes")?;
633 let mut bytes_vec = bytes.to_vec();
634 let exchange_info: simd_json::OwnedValue = simd_json::from_slice(&mut bytes_vec)
635 .context("Failed to parse Binance exchange info response")?;
636
637 let symbols = exchange_info["symbols"].as_array().ok_or_else(|| {
639 anyhow!("Invalid response format: 'symbols' field not found or not an array")
640 })?;
641
642 let mut instruments = Vec::with_capacity(symbols.len());
644
645 for symbol_data in symbols {
646 let status = symbol_data["status"].as_str().unwrap_or("");
648 if status != "TRADING" {
649 continue;
650 }
651
652 let symbol = symbol_data["symbol"].as_str().ok_or_else(|| {
653 anyhow!("Invalid symbol data: 'symbol' field not found or not a String")
654 })?;
655
656 let instrument_id = InstrumentId::new(symbol, Venue::Binance);
658 let instrument =
659 Box::new(BinanceInstrument { id: instrument_id }) as Box<dyn Instrument>;
660
661 instruments.push(instrument);
662 }
663
664 Ok(instruments)
665 }
666
667 #[inline]
668 async fn get_historical_trades(
669 &self,
670 _symbol: &str,
671 _limit: Option<u32>,
672 ) -> Result<Vec<MarketTrade>> {
673 Err(anyhow!("Not implemented yet"))
676 }
677
678 #[inline]
679 async fn get_orderbook_snapshot(
680 &self,
681 symbol: &str,
682 depth: Option<u32>,
683 ) -> Result<OrderBookSnapshot> {
684 let snapshot = self.fetch_orderbook_snapshot(symbol, depth).await?;
686 let parsed = ParsedOrderbookSnapshot::from(snapshot);
687
688 let instrument_id = InstrumentId::new(symbol, Venue::Binance);
690
691 let timestamp = self.clock.raw();
693
694 let mut order_book_depth =
696 OrderBookSnapshot::new_empty(instrument_id, timestamp, parsed.last_update_id);
697
698 for (price, quantity) in parsed.bids {
700 order_book_depth.add_bid(price, quantity);
701 }
702
703 for (price, quantity) in parsed.asks {
704 order_book_depth.add_ask(price, quantity);
705 }
706
707 Ok(order_book_depth)
708 }
709
710 #[inline]
711 async fn get_realtime_orderbook(&self, _symbol: &str) -> Result<SharedSimdOrderBook> {
712 Err(anyhow!("Not implemented yet"))
714 }
715
716 #[inline]
717 async fn is_connected(&self) -> bool {
718 *self.connection_status.read() == ConnectionState::Connected
719 }
720
721 #[inline]
722 async fn connection_status(&self) -> ConnectionState {
723 *self.connection_status.read()
724 }
725
726 #[inline]
727 async fn get_stats(&self) -> ConnectionStats {
728 self.stats.read().clone()
729 }
730
731 #[inline]
732 async fn ping(&self) -> Result<u64> {
733 let start = self.clock.raw();
736
737 let response = self
738 .http_client
739 .get(format!("{}/api/v3/ping", self.config.rest_config.base_url))
740 .send()
741 .await?;
742
743 if response.status().is_success() {
744 Ok(self.clock.raw().saturating_sub(start))
745 } else {
746 Err(anyhow!("Ping failed with status: {}", response.status()))
747 }
748 }
749
750 #[inline]
751 async fn reset_connection(&self) -> Result<()> {
752 let mut keys_to_remove = Vec::new();
754
755 for (key, _) in self.subscriptions.read().iter() {
756 keys_to_remove.push(key.clone());
757 }
758
759 for key in &keys_to_remove {
761 if let Some(tx) = self.subscriptions.write().remove(key) {
762 let _ = tx.send(true);
763 }
764
765 if let Some(handle) = self.ws_handles.write().remove(key) {
767 handle.abort();
768 }
769 }
770
771 *self.connection_status.write() = ConnectionState::Disconnected;
773
774 *self.stats.write() = ConnectionStats::default();
776
777 self.connect().await
779 }
780
781 #[inline]
782 fn get_rate_limits(&self) -> Vec<RateLimit> {
783 BINANCE_SPOT_RATE_LIMITS.to_vec()
784 }
785}