1use rusty_common::collections::FxHashMap;
7use simd_json::prelude::{ValueAsArray, ValueAsObject, ValueAsScalar, ValueObjectAccess};
8use smartstring::alias::String;
9use std::fmt::Debug;
10use std::sync::Arc;
11
12use anyhow::{Result, anyhow};
13use async_trait::async_trait;
14use parking_lot::RwLock;
15use quanta::Clock;
16use rusty_common::json::{self};
17use rusty_model::{
18 data::{book_snapshot::OrderBookSnapshot, market_trade::MarketTrade},
19 instruments::{Instrument, InstrumentId},
20 venues::Venue,
21};
22use smallvec::{SmallVec, smallvec};
23use tokio::sync::{mpsc, watch};
24
25use rusty_common::websocket::exchanges::binance;
27use rusty_common::websocket::{
28 Message, MessageHandler, WebSocketClient, WebSocketConfig, WebSocketError, WebSocketResult,
29};
30
31use crate::exchange::binance::common::messages::SubscriptionResult;
32use crate::provider::ext::DataHandler;
33use crate::provider::prelude::*;
34use rust_decimal::prelude::FromStr;
35use rusty_model::data::simd_orderbook::SharedSimdOrderBook;
36
37use super::data::{
38 orderbook::OrderbookMessage,
39 subscription::{create_orderbook_subscription, create_trade_subscription},
40 trade::AggTradeMessage,
41};
42use super::types::{
43 BINANCE_COIN_FUTURES_API_URL, BINANCE_COIN_FUTURES_WS_COMBINED_URL,
44 BINANCE_COIN_FUTURES_WS_URL, BINANCE_USD_FUTURES_API_URL, BINANCE_USD_FUTURES_WS_COMBINED_URL,
45 BINANCE_USD_FUTURES_WS_URL,
46};
47
48#[derive(Debug, Clone, Copy, PartialEq, Eq)]
50pub enum BinanceFuturesMarketType {
51 UsdM,
53
54 CoinM,
56}
57
58pub struct BinanceFuturesProvider {
60 config: ConnectionConfig,
62
63 market_type: BinanceFuturesMarketType,
65
66 connection_status: Arc<RwLock<ConnectionState>>,
68
69 stats: Arc<RwLock<ConnectionStats>>,
71
72 clock: Clock,
74
75 subscriptions: Arc<RwLock<FxHashMap<String, watch::Sender<bool>>>>,
77
78 message_tx: mpsc::Sender<ProviderMessage>,
80
81 message_rx: Arc<tokio::sync::RwLock<mpsc::Receiver<ProviderMessage>>>,
83
84 ws_client: Option<WebSocketClient>,
86
87 data_handler: Arc<tokio::sync::RwLock<Option<Box<dyn DataHandler + Send + Sync>>>>,
89
90 http_client: reqwest::Client,
92}
93
94impl Debug for BinanceFuturesProvider {
95 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96 f.debug_struct("BinanceFuturesProvider")
97 .field("market_type", &self.market_type)
98 .field("connection_status", &self.connection_status)
99 .field("stats", &self.stats)
100 .field("ws_client", &self.ws_client.is_some())
101 .field("data_handler_present", &true) .finish()
103 }
104}
105
106struct BinanceFuturesHandler {
108 data_handler: Arc<tokio::sync::RwLock<Option<Box<dyn DataHandler + Send + Sync>>>>,
109 clock: Clock,
110 stats: Arc<RwLock<ConnectionStats>>,
111 message_tx: mpsc::Sender<ProviderMessage>,
112}
113
114#[async_trait]
115impl MessageHandler for BinanceFuturesHandler {
116 async fn on_connected(&mut self) -> WebSocketResult<()> {
117 log::info!("Binance Futures WebSocket connected");
118 let _ = self.message_tx.send(ProviderMessage::Connected).await;
119 Ok(())
120 }
121
122 async fn on_message(&mut self, message: Message) -> WebSocketResult<()> {
123 match message {
124 Message::Text(text) => {
125 match unsafe { simd_json::from_str::<json::Value>(&mut text.clone()) } {
128 Ok(value) => {
129 if let Err(e) = self.handle_market_data(&value).await {
130 log::error!("Failed to handle market data: {e}");
131 }
132 }
133 Err(e) => {
134 log::error!("Failed to parse WebSocket message: {e}");
135 }
136 }
137 }
138 Message::Binary(_) => {
139 log::warn!("Received unexpected binary message from Binance");
140 }
141 _ => {}
142 }
143 Ok(())
144 }
145
146 async fn on_error(&mut self, error: WebSocketError) -> WebSocketResult<()> {
147 log::error!("Binance Futures WebSocket error: {}", error);
148 let _ = self
149 .message_tx
150 .send(ProviderMessage::Error(error.to_string().into()))
151 .await;
152 Ok(())
153 }
154
155 async fn on_disconnected(&mut self) -> WebSocketResult<()> {
156 log::info!("Binance Futures WebSocket disconnected");
157 let _ = self.message_tx.send(ProviderMessage::Disconnected).await;
158 Ok(())
159 }
160}
161
162impl BinanceFuturesHandler {
163 async fn handle_market_data(&self, value: &json::Value) -> Result<()> {
164 if let Ok(msg) = json::from_value::<OrderbookMessage>(value.clone()) {
167 return self.handle_orderbook_update(msg).await;
168 }
169
170 if let Ok(msg) = json::from_value::<AggTradeMessage>(value.clone()) {
172 return self.handle_agg_trade(msg).await;
173 }
174
175 if let Ok(msg) = json::from_value::<SubscriptionResult>(value.clone()) {
177 log::debug!("Received subscription confirmation: {msg:?}");
178 return Ok(());
179 }
180
181 log::debug!("Unhandled message type");
182 Ok(())
183 }
184
185 async fn handle_orderbook_update(&self, msg: OrderbookMessage) -> Result<()> {
186 self.stats.write().messages_received += 1;
188
189 if let Some(handler) = &*self.data_handler.read().await {
191 let snapshot = self.convert_to_snapshot(msg)?;
193 handler.handle_orderbook_snapshot(snapshot).await?;
194 }
195
196 Ok(())
197 }
198
199 async fn handle_trade(&self, msg: MarketTrade) -> Result<()> {
200 self.stats.write().messages_received += 1;
202
203 if let Some(handler) = &*self.data_handler.read().await {
205 handler.handle_market_trade(msg).await?;
206 }
207
208 Ok(())
209 }
210
211 async fn handle_agg_trade(&self, msg: AggTradeMessage) -> Result<()> {
212 self.stats.write().messages_received += 1;
214
215 if let Some(handler) = &*self.data_handler.read().await {
217 let trade = self.convert_agg_trade(msg)?;
218 handler.handle_market_trade(trade).await?;
219 }
220
221 Ok(())
222 }
223
224 fn convert_to_snapshot(&self, msg: OrderbookMessage) -> Result<OrderBookSnapshot> {
225 use rust_decimal::prelude::FromStr;
226 use rusty_model::data::orderbook::PriceLevel;
227
228 let timestamp_init = self.clock.raw();
230
231 let instrument_id = InstrumentId::new(&msg.symbol, Venue::Binance);
233
234 let mut bids = SmallVec::<[PriceLevel; 64]>::new();
236 for bid in msg.bids {
237 if bid.len() >= 2 {
238 let price = rust_decimal::Decimal::from_str(&bid[0])
239 .map_err(|e| anyhow!("Failed to parse bid price: {}", e))?;
240 let quantity = rust_decimal::Decimal::from_str(&bid[1])
241 .map_err(|e| anyhow!("Failed to parse bid quantity: {}", e))?;
242
243 bids.push(PriceLevel { price, quantity });
244 }
245 }
246
247 let mut asks = SmallVec::<[PriceLevel; 64]>::new();
249 for ask in msg.asks {
250 if ask.len() >= 2 {
251 let price = rust_decimal::Decimal::from_str(&ask[0])
252 .map_err(|e| anyhow!("Failed to parse ask price: {}", e))?;
253 let quantity = rust_decimal::Decimal::from_str(&ask[1])
254 .map_err(|e| anyhow!("Failed to parse ask quantity: {}", e))?;
255
256 asks.push(PriceLevel { price, quantity });
257 }
258 }
259
260 let snapshot = OrderBookSnapshot::new(
262 instrument_id,
263 bids,
264 asks,
265 msg.final_update_id,
266 msg.event_time * 1_000_000, timestamp_init,
268 );
269
270 Ok(snapshot)
271 }
272
273 fn convert_agg_trade(&self, msg: AggTradeMessage) -> Result<MarketTrade> {
274 use rust_decimal::prelude::FromStr;
275 use rusty_model::enums::OrderSide;
276
277 let price = rust_decimal::Decimal::from_str(&msg.price)
279 .map_err(|e| anyhow!("Failed to parse trade price: {}", e))?;
280 let quantity = rust_decimal::Decimal::from_str(&msg.quantity)
281 .map_err(|e| anyhow!("Failed to parse trade quantity: {}", e))?;
282
283 let direction = if msg.is_buyer_market_maker {
286 OrderSide::Sell } else {
288 OrderSide::Buy };
290
291 let instrument_id = InstrumentId::new(&msg.symbol, Venue::Binance);
293
294 let trade = MarketTrade {
296 timestamp: self.clock.now(),
297 exchange_time_ns: msg.trade_time * 1_000_000, price,
299 quantity,
300 direction,
301 instrument_id,
302 };
303
304 Ok(trade)
305 }
306}
307
308enum ProviderMessage {
310 Connected,
311 Disconnected,
312 Error(String),
313 Subscribe(String),
314 Unsubscribe(String),
315}
316
317impl HttpClientProvider for BinanceFuturesProvider {
318 fn http_client(&self) -> &reqwest::Client {
319 &self.http_client
320 }
321}
322
323impl BinanceFuturesProvider {
324 #[must_use]
326 pub fn new(config: ConnectionConfig, market_type: BinanceFuturesMarketType) -> Self {
327 let (message_tx, message_rx) = mpsc::channel(1000);
328
329 let http_client = Self::create_http_client(&config);
331
332 Self {
333 config,
334 market_type,
335 connection_status: Arc::new(RwLock::new(ConnectionState::Disconnected)),
336 stats: Arc::new(RwLock::new(ConnectionStats::default())),
337 clock: Clock::new(),
338 subscriptions: Arc::new(RwLock::new(FxHashMap::default())),
339 message_tx,
340 message_rx: Arc::new(tokio::sync::RwLock::new(message_rx)),
341 ws_client: None,
342 data_handler: Arc::new(tokio::sync::RwLock::new(None)),
343 http_client,
344 }
345 }
346
347 const fn get_ws_url(&self, use_combined: bool) -> &'static str {
349 match (self.market_type, use_combined) {
350 (BinanceFuturesMarketType::UsdM, true) => BINANCE_USD_FUTURES_WS_COMBINED_URL,
351 (BinanceFuturesMarketType::UsdM, false) => BINANCE_USD_FUTURES_WS_URL,
352 (BinanceFuturesMarketType::CoinM, true) => BINANCE_COIN_FUTURES_WS_COMBINED_URL,
353 (BinanceFuturesMarketType::CoinM, false) => BINANCE_COIN_FUTURES_WS_URL,
354 }
355 }
356
357 fn create_ws_config(&self) -> WebSocketConfig {
359 binance::default_config(self.get_ws_url(true).to_string())
360 }
361}
362
363impl BinanceFuturesProvider {
364 pub const fn name(&self) -> &str {
366 match self.market_type {
367 BinanceFuturesMarketType::UsdM => "binance_futures_usdm",
368 BinanceFuturesMarketType::CoinM => "binance_futures_coinm",
369 }
370 }
371
372 pub const fn venue(&self) -> Venue {
374 Venue::Binance
375 }
376
377 pub async fn connect(&mut self) -> Result<()> {
379 *self.connection_status.write() = ConnectionState::Connecting;
381
382 let config = self.create_ws_config();
384 let client = WebSocketClient::new(config);
385
386 let handler = BinanceFuturesHandler {
388 data_handler: self.data_handler.clone(),
389 clock: self.clock.clone(),
390 stats: self.stats.clone(),
391 message_tx: self.message_tx.clone(),
392 };
393
394 self.ws_client = Some(client);
396
397 *self.connection_status.write() = ConnectionState::Connected;
403
404 Ok(())
405 }
406
407 pub async fn disconnect(&mut self) -> Result<()> {
409 if let Some(client) = &self.ws_client {
410 client.shutdown();
411 }
412
413 *self.connection_status.write() = ConnectionState::Disconnected;
414 self.ws_client = None;
415
416 Ok(())
417 }
418
419 pub async fn subscribe_orderbook(
421 &mut self,
422 instrument_id: &InstrumentId,
423 depth: usize,
424 ) -> Result<()> {
425 let symbol = instrument_id.symbol.to_lowercase();
426 let stream = format!("{symbol}@depth{depth}@100ms");
427
428 let mut symbols: SmallVec<[String; 8]> = SmallVec::<[String; 8]>::new();
430 symbols.push(symbol.into());
431 let sub_msg =
432 create_orderbook_subscription("orderbook", symbols, Some(depth), Some(1), None, false);
433
434 if let Some(client) = &self.ws_client {
436 let message = simd_json::to_string(&sub_msg)
437 .map_err(|e| anyhow!("Failed to serialize subscription: {}", e))?;
438
439 client
440 .send(Message::Text(message.into()))
441 .await
442 .map_err(|e| anyhow!("Failed to send subscription: {:?}", e))?;
443
444 log::info!("Subscribed to orderbook: {stream}");
445 } else {
446 return Err(anyhow!("WebSocket client not connected"));
447 }
448
449 Ok(())
450 }
451
452 pub async fn subscribe_trades(&mut self, instrument_id: &InstrumentId) -> Result<()> {
454 let symbol = instrument_id.symbol.to_lowercase();
455 let stream = format!("{symbol}@aggTrade");
456
457 let mut symbols: SmallVec<[String; 8]> = SmallVec::<[String; 8]>::new();
459 symbols.push(symbol.into());
460 let sub_msg = create_trade_subscription("trades", symbols, Some(1), None, false);
461
462 if let Some(client) = &self.ws_client {
464 let message = simd_json::to_string(&sub_msg)
465 .map_err(|e| anyhow!("Failed to serialize subscription: {}", e))?;
466
467 client
468 .send(Message::Text(message.into()))
469 .await
470 .map_err(|e| anyhow!("Failed to send subscription: {:?}", e))?;
471
472 log::info!("Subscribed to trades: {stream}");
473 } else {
474 return Err(anyhow!("WebSocket client not connected"));
475 }
476
477 Ok(())
478 }
479
480 pub async fn unsubscribe_orderbook(&mut self, instrument_id: &InstrumentId) -> Result<()> {
482 let symbol = instrument_id.symbol.to_lowercase();
483
484 self.subscriptions.write().remove(symbol.as_str());
486
487 if let Some(client) = &self.ws_client {
489 use super::data::subscription::WebSocketSubscribeRequest;
490
491 let depth = 10; let unsub_msg = WebSocketSubscribeRequest {
495 id: 2,
496 method: "UNSUBSCRIBE".into(),
497 params: smallvec![format!("{symbol}@depth{depth}").into()],
498 };
499
500 let message = simd_json::to_string(&unsub_msg)
501 .map_err(|e| anyhow!("Failed to serialize unsubscription: {}", e))?;
502
503 client
504 .send(Message::Text(message.into()))
505 .await
506 .map_err(|e| anyhow!("Failed to send unsubscription: {:?}", e))?;
507
508 log::info!("Unsubscribed from orderbook: {symbol}");
509 }
510
511 Ok(())
512 }
513
514 pub async fn unsubscribe_trades(&mut self, instrument_id: &InstrumentId) -> Result<()> {
516 let symbol = instrument_id.symbol.to_lowercase();
517
518 self.subscriptions.write().remove(symbol.as_str());
520
521 if let Some(client) = &self.ws_client {
523 use super::data::subscription::WebSocketSubscribeRequest;
524
525 let unsub_msg = WebSocketSubscribeRequest {
526 id: 2,
527 method: "UNSUBSCRIBE".into(),
528 params: smallvec![format!("{symbol}@aggTrade").into()],
529 };
530
531 let message = simd_json::to_string(&unsub_msg)
532 .map_err(|e| anyhow!("Failed to serialize unsubscription: {}", e))?;
533
534 client
535 .send(Message::Text(message.into()))
536 .await
537 .map_err(|e| anyhow!("Failed to send unsubscription: {:?}", e))?;
538
539 log::info!("Unsubscribed from trades: {symbol}");
540 }
541
542 Ok(())
543 }
544
545 pub fn is_connected(&self) -> bool {
547 matches!(*self.connection_status.read(), ConnectionState::Connected)
548 }
549
550 pub fn connection_stats(&self) -> ConnectionStats {
552 self.stats.read().clone()
553 }
554
555 pub async fn run_with_handler(
557 &mut self,
558 handler: Box<dyn DataHandler + Send + Sync>,
559 ) -> Result<()> {
560 *self.data_handler.write().await = Some(handler);
562
563 if !self.is_connected() {
565 self.connect().await?;
566 }
567
568 let mut rx = self.message_rx.write().await;
570 while let Some(msg) = rx.recv().await {
571 match msg {
572 ProviderMessage::Connected => {
573 log::info!("Provider connected");
574 }
575 ProviderMessage::Disconnected => {
576 log::warn!("Provider disconnected");
577 }
579 ProviderMessage::Error(e) => {
580 log::error!("Provider error: {e}");
581 }
582 _ => {}
583 }
584 }
585
586 Ok(())
587 }
588
589 pub async fn fetch_instruments(&self) -> Result<Vec<Box<dyn Instrument>>> {
591 use reqwest::Client;
592 use rusty_model::instruments::SpotInstrument;
593
594 let api_url = match self.market_type {
596 BinanceFuturesMarketType::UsdM => BINANCE_USD_FUTURES_API_URL,
597 BinanceFuturesMarketType::CoinM => BINANCE_COIN_FUTURES_API_URL,
598 };
599
600 let client = Client::new();
602
603 let url = format!("{api_url}/fapi/v1/exchangeInfo");
605 let response = client
606 .get(&url)
607 .send()
608 .await
609 .map_err(|e| anyhow!("Failed to fetch exchange info: {}", e))?;
610
611 if !response.status().is_success() {
613 let status = response.status();
614 let body = response.text().await.unwrap_or_default();
615 return Err(anyhow!(
616 "Failed to fetch instruments: {} - {}",
617 status,
618 body
619 ));
620 }
621
622 let body = response
624 .text()
625 .await
626 .map_err(|e| anyhow!("Failed to read response body: {}", e))?;
627 let data: json::Value = unsafe { simd_json::from_str(&mut body.clone()) }
628 .map_err(|e| anyhow!("Failed to parse exchange info: {}", e))?;
629
630 let mut instruments = Vec::new();
632
633 if let Some(obj) = data.as_object()
634 && let Some(symbols) = obj.get("symbols").and_then(|v| v.as_array())
635 {
636 for symbol_data in symbols {
637 if let Some(symbol_obj) = symbol_data.as_object() {
639 let symbol = symbol_obj
641 .get("symbol")
642 .and_then(|v| v.as_str())
643 .ok_or_else(|| anyhow!("Missing symbol field"))?;
644
645 let status = symbol_obj
646 .get("status")
647 .and_then(|v| v.as_str())
648 .unwrap_or("");
649
650 if status != "TRADING" {
652 continue;
653 }
654
655 let base_asset = symbol_obj
656 .get("baseAsset")
657 .and_then(|v| v.as_str())
658 .ok_or_else(|| anyhow!("Missing baseAsset field"))?;
659
660 let quote_asset = symbol_obj
661 .get("quoteAsset")
662 .and_then(|v| v.as_str())
663 .ok_or_else(|| anyhow!("Missing quoteAsset field"))?;
664
665 let instrument =
668 SpotInstrument::new(symbol, base_asset, quote_asset, Venue::Binance);
669
670 instruments.push(Box::new(instrument) as Box<dyn Instrument>);
671 }
672 }
673 }
674
675 log::info!(
676 "Fetched {} instruments from Binance Futures",
677 instruments.len()
678 );
679 Ok(instruments)
680 }
681}
682
683#[async_trait]
685impl Provider for BinanceFuturesProvider {
686 type TradeMessage = AggTradeMessage;
687 type DepthMessage = OrderbookMessage;
688 type InstrumentMessage = json::Value;
689
690 fn name(&self) -> &'static str {
691 match self.market_type {
692 BinanceFuturesMarketType::UsdM => "BinanceFuturesUSDM",
693 BinanceFuturesMarketType::CoinM => "BinanceFuturesCOINM",
694 }
695 }
696
697 fn venue(&self) -> Venue {
698 Venue::Binance
699 }
700
701 fn config(&self) -> &ConnectionConfig {
702 &self.config
703 }
704
705 async fn init(&mut self) -> Result<()> {
706 let ws_url = match self.market_type {
708 BinanceFuturesMarketType::UsdM => BINANCE_USD_FUTURES_WS_COMBINED_URL.to_string(),
709 BinanceFuturesMarketType::CoinM => BINANCE_COIN_FUTURES_WS_COMBINED_URL.to_string(),
710 };
711
712 let ws_config = binance::default_config(ws_url);
713 self.ws_client = Some(WebSocketClient::new(ws_config));
714
715 *self.connection_status.write() = ConnectionState::Connected;
717 Ok(())
718 }
719
720 async fn subscribe_trades(
721 &self,
722 symbols: SmallVec<[String; 8]>,
723 ) -> Result<mpsc::Receiver<Self::TradeMessage>> {
724 let (tx, rx) = mpsc::channel(1024);
725
726 let sub_msg = create_trade_subscription("trades", symbols.clone(), Some(1), None, false);
728
729 if let Some(client) = &self.ws_client {
730 let message = simd_json::to_string(&sub_msg)
731 .map_err(|e| anyhow!("Failed to serialize subscription: {}", e))?;
732
733 client
734 .send(Message::Text(message.into()))
735 .await
736 .map_err(|e| anyhow!("Failed to send subscription: {:?}", e))?;
737
738 log::info!("Subscribed to trades for symbols: {symbols:?}");
739 } else {
740 return Err(anyhow!("WebSocket client not initialized"));
741 }
742
743 Ok(rx)
744 }
745
746 async fn unsubscribe_trades(&self) -> Result<()> {
747 Ok(())
749 }
750
751 async fn subscribe_orderbook(
752 &self,
753 symbols: SmallVec<[String; 8]>,
754 ) -> Result<mpsc::Receiver<Self::DepthMessage>> {
755 let (tx, rx) = mpsc::channel(1024);
756
757 let sub_msg = create_orderbook_subscription(
759 "orderbook",
760 symbols.clone(),
761 Some(20), Some(1),
763 None,
764 false,
765 );
766
767 if let Some(client) = &self.ws_client {
768 let message = simd_json::to_string(&sub_msg)
769 .map_err(|e| anyhow!("Failed to serialize subscription: {}", e))?;
770
771 client
772 .send(Message::Text(message.into()))
773 .await
774 .map_err(|e| anyhow!("Failed to send subscription: {:?}", e))?;
775
776 log::info!("Subscribed to orderbook for symbols: {symbols:?}");
777 } else {
778 return Err(anyhow!("WebSocket client not initialized"));
779 }
780
781 Ok(rx)
782 }
783
784 async fn unsubscribe_orderbook(&self) -> Result<()> {
785 Ok(())
787 }
788
789 async fn get_instruments(&self) -> Result<Vec<Box<dyn Instrument>>> {
790 self.fetch_instruments().await
792 }
793
794 async fn get_historical_trades(
795 &self,
796 _symbol: &str,
797 _limit: Option<u32>,
798 ) -> Result<Vec<MarketTrade>> {
799 Err(anyhow!("Historical trades not implemented yet"))
801 }
802
803 async fn get_orderbook_snapshot(
804 &self,
805 symbol: &str,
806 depth: Option<u32>,
807 ) -> Result<OrderBookSnapshot> {
808 let limit = depth.unwrap_or(20).min(5000);
810 let url = match self.market_type {
811 BinanceFuturesMarketType::UsdM => {
812 format!("{BINANCE_USD_FUTURES_API_URL}/fapi/v1/depth?symbol={symbol}&limit={limit}")
813 }
814 BinanceFuturesMarketType::CoinM => {
815 format!(
816 "{BINANCE_COIN_FUTURES_API_URL}/dapi/v1/depth?symbol={symbol}&limit={limit}"
817 )
818 }
819 };
820
821 let response = self.http_client.get(&url).send().await?;
822
823 use crate::exchange::binance::common::rate_limit::{
825 handle_rate_limit_error, parse_order_count_headers,
826 };
827 if response.status() == 429 || response.status() == 418 {
828 let headers = response.headers().clone();
829 handle_rate_limit_error(response.status().as_u16(), &headers)?;
830 }
831
832 let order_counts = parse_order_count_headers(response.headers());
834 if !order_counts.is_empty() {
835 log::debug!("Binance order count limits: {order_counts:?}");
836 }
837
838 let body = response.text().await?;
839 let data: json::Value = unsafe { simd_json::from_str(&mut body.clone()) }?;
840
841 let instrument_id = InstrumentId::new(symbol, Venue::Binance);
843 let timestamp = self.clock.raw();
844 let last_update_id = data
845 .get("lastUpdateId")
846 .and_then(|v| v.as_u64())
847 .unwrap_or(0);
848
849 let mut snapshot = OrderBookSnapshot::new_empty(instrument_id, timestamp, last_update_id);
850
851 if let Some(bids) = data.get("bids").and_then(|v| v.as_array()) {
853 for bid in bids {
854 if let Some(arr) = bid.as_array()
855 && arr.len() >= 2
856 && let (Some(price_str), Some(qty_str)) = (arr[0].as_str(), arr[1].as_str())
857 && let (Ok(price), Ok(quantity)) = (
858 rust_decimal::Decimal::from_str(price_str),
859 rust_decimal::Decimal::from_str(qty_str),
860 )
861 {
862 snapshot.add_bid(price, quantity);
863 }
864 }
865 }
866
867 if let Some(asks) = data.get("asks").and_then(|v| v.as_array()) {
869 for ask in asks {
870 if let Some(arr) = ask.as_array()
871 && arr.len() >= 2
872 && let (Some(price_str), Some(qty_str)) = (arr[0].as_str(), arr[1].as_str())
873 && let (Ok(price), Ok(quantity)) = (
874 rust_decimal::Decimal::from_str(price_str),
875 rust_decimal::Decimal::from_str(qty_str),
876 )
877 {
878 snapshot.add_ask(price, quantity);
879 }
880 }
881 }
882
883 Ok(snapshot)
884 }
885
886 async fn get_realtime_orderbook(&self, _symbol: &str) -> Result<SharedSimdOrderBook> {
887 Err(anyhow!("Realtime orderbook not implemented yet"))
889 }
890
891 async fn is_connected(&self) -> bool {
892 *self.connection_status.read() == ConnectionState::Connected
893 }
894
895 async fn connection_status(&self) -> ConnectionState {
896 *self.connection_status.read()
897 }
898
899 async fn get_stats(&self) -> ConnectionStats {
900 self.stats.read().clone()
901 }
902
903 async fn ping(&self) -> Result<u64> {
904 if let Some(client) = &self.ws_client {
905 let start = std::time::Instant::now();
906
907 client.send(Message::Ping(vec![])).await?;
909
910 let elapsed = start.elapsed();
912 Ok(elapsed.as_millis() as u64)
913 } else {
914 Err(anyhow!("WebSocket client not initialized"))
915 }
916 }
917
918 async fn reset_connection(&self) -> Result<()> {
919 *self.connection_status.write() = ConnectionState::Disconnected;
921
922 *self.stats.write() = ConnectionStats::default();
924
925 Ok(())
927 }
928
929 fn get_rate_limits(&self) -> Vec<RateLimit> {
930 vec![
931 RateLimit {
932 limit_type: "REQUEST_WEIGHT",
933 interval: "MINUTE",
934 interval_num: 1,
935 limit: 1200,
936 },
937 RateLimit {
938 limit_type: "ORDERS",
939 interval: "SECOND",
940 interval_num: 10,
941 limit: 300,
942 },
943 ]
944 }
945
946 async fn shutdown(&mut self) -> Result<()> {
947 if let Some(client) = &self.ws_client {
949 client.shutdown();
950 }
951
952 *self.connection_status.write() = ConnectionState::Disconnected;
954
955 self.subscriptions.write().clear();
957
958 Ok(())
959 }
960}
961
962#[async_trait]
966impl crate::provider::ext::MonitorExt for BinanceFuturesProvider {
967 async fn get_instrument_metrics(
968 &self,
969 instrument_id: &InstrumentId,
970 ) -> Result<crate::provider::ext::InstrumentMetrics> {
971 use crate::feeder::FeedStats;
972 use crate::provider::ext::{InstrumentMetrics, LatencyMetrics, OrderFlowMetrics};
973
974 let connection_stats = self.connection_stats();
975 let current_time = self.clock.raw();
976
977 let order_flow_metrics = OrderFlowMetrics {
979 volume_imbalance: 0.0, book_pressure: 0.0, trade_intensity: connection_stats.messages_received as f64 / 60.0, avg_trade_size: rust_decimal::Decimal::ZERO, price_volatility: 0.0, avg_spread_bps: 0.0, max_spread_bps: 0.0, market_impact_bps: 0.0, };
988
989 let latency_metrics = LatencyMetrics {
991 network_latency_ns: connection_stats.avg_latency_ns,
992 parsing_latency_ns: 50_000, processing_latency_ns: 25_000, total_latency_ns: connection_stats.avg_latency_ns + 75_000,
995 latency_jitter_ns: connection_stats.avg_latency_ns / 10, };
997
998 let health_score = if connection_stats.errors == 0 {
1000 1.0
1001 } else {
1002 (1.0 - (connection_stats.errors as f64
1003 / connection_stats.messages_received.max(1) as f64))
1004 .max(0.0)
1005 };
1006
1007 let mut feed_stats = FeedStats::default();
1009 feed_stats.messages_processed = connection_stats.messages_received;
1010 feed_stats.messages_per_second = 0.0; feed_stats.avg_process_latency_ns = connection_stats.avg_latency_ns;
1012 feed_stats.max_process_latency_ns = connection_stats.avg_latency_ns; feed_stats.p99_process_latency_ns = connection_stats.avg_latency_ns;
1014 feed_stats.dropped_messages = 0; feed_stats.last_update_time = current_time;
1016 feed_stats.memory_usage_bytes = std::mem::size_of::<Self>();
1017
1018 Ok(InstrumentMetrics {
1019 instrument_id: instrument_id.clone(),
1020 feed_stats,
1021 connection_stats,
1022 order_flow_metrics,
1023 latency_metrics,
1024 anomaly_score: 0.0, health_score,
1026 last_updated: current_time,
1027 })
1028 }
1029
1030 async fn get_aggregated_metrics(&self) -> Result<crate::provider::ext::AggregatedMetrics> {
1031 use crate::provider::ext::AggregatedMetrics;
1032
1033 let connection_stats = self.connection_stats();
1034 let subscriptions = self.subscriptions.read();
1035 let current_time = self.clock.raw();
1036
1037 Ok(AggregatedMetrics {
1038 total_instruments: subscriptions.len(),
1039 total_messages_processed: connection_stats.messages_received,
1040 total_dropped_messages: 0, avg_latency_ns: connection_stats.avg_latency_ns,
1042 max_latency_ns: connection_stats.avg_latency_ns,
1043 p99_latency_ns: connection_stats.avg_latency_ns, total_memory_usage_bytes: std::mem::size_of::<Self>(),
1045 total_errors: connection_stats.errors,
1046 overall_health_score: if connection_stats.errors == 0 {
1047 1.0
1048 } else {
1049 0.8
1050 },
1051 active_alerts: 0, timestamp: current_time,
1053 })
1054 }
1055
1056 async fn register_metrics_collector(
1057 &mut self,
1058 _collector: Box<dyn crate::provider::ext::MetricsCollector>,
1059 ) -> Result<()> {
1060 log::info!("Metrics collector registered for BinanceFuturesProvider");
1062 Ok(())
1063 }
1064
1065 async fn enable_alert(
1066 &mut self,
1067 alert_config: crate::provider::ext::AlertConfig,
1068 ) -> Result<crate::provider::ext::AlertHandle> {
1069 log::info!(
1071 "Alert enabled: {} with threshold {}",
1072 alert_config.name,
1073 alert_config.threshold
1074 );
1075 Ok(crate::provider::ext::AlertHandle(1)) }
1077
1078 async fn disable_alert(&mut self, _handle: crate::provider::ext::AlertHandle) -> Result<()> {
1079 log::info!("Alert disabled");
1081 Ok(())
1082 }
1083
1084 async fn get_health_status(&self) -> Result<crate::provider::ext::HealthStatus> {
1085 use crate::provider::ext::{ComponentHealth, HealthState, HealthStatus};
1086
1087 let connection_stats = self.connection_stats();
1088 let connected = self.is_connected();
1089 let current_time = self.clock.raw();
1090
1091 let overall_status = if connected {
1092 if connection_stats.errors == 0 {
1093 HealthState::Healthy
1094 } else {
1095 HealthState::Degraded
1096 }
1097 } else {
1098 HealthState::Offline
1099 };
1100
1101 let component_health = vec![
1102 ComponentHealth {
1103 component_name: "WebSocket Connection".to_string(),
1104 status: if connected {
1105 HealthState::Healthy
1106 } else {
1107 HealthState::Offline
1108 },
1109 last_heartbeat: current_time,
1110 error_rate: connection_stats.errors as f64
1111 / connection_stats.messages_received.max(1) as f64,
1112 latency_p99_ns: connection_stats.avg_latency_ns,
1113 },
1114 ComponentHealth {
1115 component_name: "Message Processing".to_string(),
1116 status: HealthState::Healthy, last_heartbeat: current_time,
1118 error_rate: 0.0, latency_p99_ns: connection_stats.avg_latency_ns,
1120 },
1121 ];
1122
1123 let performance_grade = match overall_status {
1124 HealthState::Healthy => 'A',
1125 HealthState::Degraded => 'B',
1126 HealthState::Critical => 'D',
1127 HealthState::Offline => 'F',
1128 };
1129
1130 Ok(HealthStatus {
1131 overall_status,
1132 component_health,
1133 active_issues: vec![], performance_grade,
1135 uptime_seconds: (current_time - connection_stats.connected_time) / 1_000_000_000, last_check_time: current_time,
1137 })
1138 }
1139
1140 async fn start_anomaly_detection(
1141 &mut self,
1142 _config: crate::provider::ext::AnomalyConfig,
1143 ) -> Result<()> {
1144 log::info!("Anomaly detection started for BinanceFuturesProvider");
1146 Ok(())
1147 }
1148
1149 async fn stop_anomaly_detection(&mut self) -> Result<()> {
1150 log::info!("Anomaly detection stopped for BinanceFuturesProvider");
1152 Ok(())
1153 }
1154
1155 async fn get_latency_distribution(
1156 &self,
1157 _instrument_id: &InstrumentId,
1158 ) -> Result<crate::provider::ext::LatencyDistribution> {
1159 use crate::provider::ext::{LatencyBucket, LatencyDistribution};
1160
1161 let connection_stats = self.connection_stats();
1162
1163 Ok(LatencyDistribution {
1165 p50_ns: connection_stats.avg_latency_ns,
1166 p90_ns: connection_stats.avg_latency_ns + (connection_stats.avg_latency_ns / 4),
1167 p95_ns: connection_stats.avg_latency_ns + (connection_stats.avg_latency_ns / 2),
1168 p99_ns: connection_stats.avg_latency_ns,
1169 p99_9_ns: connection_stats.avg_latency_ns + (connection_stats.avg_latency_ns / 10),
1170 max_ns: connection_stats.avg_latency_ns,
1171 histogram: vec![
1172 LatencyBucket {
1173 min_ns: 0,
1174 max_ns: 100_000,
1175 count: 10,
1176 },
1177 LatencyBucket {
1178 min_ns: 100_000,
1179 max_ns: 500_000,
1180 count: 50,
1181 },
1182 LatencyBucket {
1183 min_ns: 500_000,
1184 max_ns: 1_000_000,
1185 count: 30,
1186 },
1187 LatencyBucket {
1188 min_ns: 1_000_000,
1189 max_ns: 5_000_000,
1190 count: 10,
1191 },
1192 ],
1193 })
1194 }
1195
1196 async fn export_prometheus_metrics(&self) -> Result<std::string::String> {
1197 let connection_stats = self.connection_stats();
1198 let subscriptions = self.subscriptions.read();
1199 let provider_name = self.name();
1200
1201 let prometheus_metrics = format!(
1202 "# HELP binance_futures_messages_received_total Total messages received\n\
1203 # TYPE binance_futures_messages_received_total counter\n\
1204 binance_futures_messages_received_total{{provider=\"{provider_name}\"}} {}\n\
1205 # HELP binance_futures_messages_dropped_total Total messages dropped\n\
1206 # TYPE binance_futures_messages_dropped_total counter\n\
1207 binance_futures_messages_dropped_total{{provider=\"{provider_name}\"}} {}\n\
1208 # HELP binance_futures_latency_avg_ns Average latency in nanoseconds\n\
1209 # TYPE binance_futures_latency_avg_ns gauge\n\
1210 binance_futures_latency_avg_ns{{provider=\"{provider_name}\"}} {}\n\
1211 # HELP binance_futures_active_subscriptions Number of active subscriptions\n\
1212 # TYPE binance_futures_active_subscriptions gauge\n\
1213 binance_futures_active_subscriptions{{provider=\"{provider_name}\"}} {}\n",
1214 connection_stats.messages_received,
1215 0, connection_stats.avg_latency_ns,
1217 subscriptions.len()
1218 );
1219
1220 Ok(prometheus_metrics)
1221 }
1222
1223 async fn reset_statistics(&mut self) -> Result<()> {
1224 let mut stats = self.stats.write();
1226 *stats = ConnectionStats::default();
1227 log::info!("Statistics reset for BinanceFuturesProvider");
1228 Ok(())
1229 }
1230
1231 async fn get_trading_metrics(
1232 &self,
1233 _instrument_id: &InstrumentId,
1234 ) -> Result<crate::provider::ext::TradingMetrics> {
1235 use crate::provider::ext::{
1236 ExecutionQualityMetrics, MicrostructureSignals, TradingMetrics,
1237 };
1238
1239 Ok(TradingMetrics {
1241 order_flow_imbalance: 0.0,
1242 vwap: rust_decimal::Decimal::ZERO,
1243 twap: rust_decimal::Decimal::ZERO,
1244 microstructure_signals: MicrostructureSignals {
1245 book_imbalance: 0.0,
1246 trade_sign_accuracy: 0.5,
1247 alpha_signal: 0.0,
1248 momentum: 0.0,
1249 mean_reversion: 0.0,
1250 },
1251 execution_quality: ExecutionQualityMetrics {
1252 slippage_bps: 0.0,
1253 market_impact_bps: 0.0,
1254 fill_rate: 1.0,
1255 avg_fill_time_ns: 1_000_000,
1256 implementation_shortfall_bps: 0.0,
1257 },
1258 })
1259 }
1260}