rusty_feeder/exchange/binance/futures/
provider.rs

1//! Binance Futures market provider implementation using consolidated WebSocket
2//!
3//! This module implements the Provider trait for Binance Futures markets,
4//! enabling high-performance data acquisition and normalization.
5
6use rusty_common::collections::FxHashMap;
7use simd_json::prelude::{ValueAsArray, ValueAsObject, ValueAsScalar, ValueObjectAccess};
8use smartstring::alias::String;
9use std::fmt::Debug;
10use std::sync::Arc;
11
12use anyhow::{Result, anyhow};
13use async_trait::async_trait;
14use parking_lot::RwLock;
15use quanta::Clock;
16use rusty_common::json::{self};
17use rusty_model::{
18    data::{book_snapshot::OrderBookSnapshot, market_trade::MarketTrade},
19    instruments::{Instrument, InstrumentId},
20    venues::Venue,
21};
22use smallvec::{SmallVec, smallvec};
23use tokio::sync::{mpsc, watch};
24
25// Use the new consolidated WebSocket module
26use rusty_common::websocket::exchanges::binance;
27use rusty_common::websocket::{
28    Message, MessageHandler, WebSocketClient, WebSocketConfig, WebSocketError, WebSocketResult,
29};
30
31use crate::exchange::binance::common::messages::SubscriptionResult;
32use crate::provider::ext::DataHandler;
33use crate::provider::prelude::*;
34use rust_decimal::prelude::FromStr;
35use rusty_model::data::simd_orderbook::SharedSimdOrderBook;
36
37use super::data::{
38    orderbook::OrderbookMessage,
39    subscription::{create_orderbook_subscription, create_trade_subscription},
40    trade::AggTradeMessage,
41};
42use super::types::{
43    BINANCE_COIN_FUTURES_API_URL, BINANCE_COIN_FUTURES_WS_COMBINED_URL,
44    BINANCE_COIN_FUTURES_WS_URL, BINANCE_USD_FUTURES_API_URL, BINANCE_USD_FUTURES_WS_COMBINED_URL,
45    BINANCE_USD_FUTURES_WS_URL,
46};
47
48/// Futures market type (USD-M or COIN-M)
49#[derive(Debug, Clone, Copy, PartialEq, Eq)]
50pub enum BinanceFuturesMarketType {
51    /// USD-M Futures (settled in USDT or BUSD)
52    UsdM,
53
54    /// COIN-M Futures (settled in cryptocurrency)
55    CoinM,
56}
57
58/// Binance Futures exchange provider implementation
59pub struct BinanceFuturesProvider {
60    /// Connection configuration
61    config: ConnectionConfig,
62
63    /// Market type (USD-M or COIN-M)
64    market_type: BinanceFuturesMarketType,
65
66    /// WebSocket connection status
67    connection_status: Arc<RwLock<ConnectionState>>,
68
69    /// Connection statistics
70    stats: Arc<RwLock<ConnectionStats>>,
71
72    /// Shared clock for time synchronization
73    clock: Clock,
74
75    /// Active subscriptions
76    subscriptions: Arc<RwLock<FxHashMap<String, watch::Sender<bool>>>>,
77
78    /// Message sender for internal communication
79    message_tx: mpsc::Sender<ProviderMessage>,
80
81    /// Message receiver
82    message_rx: Arc<tokio::sync::RwLock<mpsc::Receiver<ProviderMessage>>>,
83
84    /// WebSocket client
85    ws_client: Option<WebSocketClient>,
86
87    /// Data handler
88    data_handler: Arc<tokio::sync::RwLock<Option<Box<dyn DataHandler + Send + Sync>>>>,
89
90    /// HTTP client for REST API calls
91    http_client: reqwest::Client,
92}
93
94impl Debug for BinanceFuturesProvider {
95    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96        f.debug_struct("BinanceFuturesProvider")
97            .field("market_type", &self.market_type)
98            .field("connection_status", &self.connection_status)
99            .field("stats", &self.stats)
100            .field("ws_client", &self.ws_client.is_some())
101            .field("data_handler_present", &true) // Can't check async in Debug
102            .finish()
103    }
104}
105
106/// Message handler for Binance Futures WebSocket
107struct BinanceFuturesHandler {
108    data_handler: Arc<tokio::sync::RwLock<Option<Box<dyn DataHandler + Send + Sync>>>>,
109    clock: Clock,
110    stats: Arc<RwLock<ConnectionStats>>,
111    message_tx: mpsc::Sender<ProviderMessage>,
112}
113
114#[async_trait]
115impl MessageHandler for BinanceFuturesHandler {
116    async fn on_connected(&mut self) -> WebSocketResult<()> {
117        log::info!("Binance Futures WebSocket connected");
118        let _ = self.message_tx.send(ProviderMessage::Connected).await;
119        Ok(())
120    }
121
122    async fn on_message(&mut self, message: Message) -> WebSocketResult<()> {
123        match message {
124            Message::Text(text) => {
125                // Parse the message
126                // Parse the message safely
127                match unsafe { simd_json::from_str::<json::Value>(&mut text.clone()) } {
128                    Ok(value) => {
129                        if let Err(e) = self.handle_market_data(&value).await {
130                            log::error!("Failed to handle market data: {e}");
131                        }
132                    }
133                    Err(e) => {
134                        log::error!("Failed to parse WebSocket message: {e}");
135                    }
136                }
137            }
138            Message::Binary(_) => {
139                log::warn!("Received unexpected binary message from Binance");
140            }
141            _ => {}
142        }
143        Ok(())
144    }
145
146    async fn on_error(&mut self, error: WebSocketError) -> WebSocketResult<()> {
147        log::error!("Binance Futures WebSocket error: {}", error);
148        let _ = self
149            .message_tx
150            .send(ProviderMessage::Error(error.to_string().into()))
151            .await;
152        Ok(())
153    }
154
155    async fn on_disconnected(&mut self) -> WebSocketResult<()> {
156        log::info!("Binance Futures WebSocket disconnected");
157        let _ = self.message_tx.send(ProviderMessage::Disconnected).await;
158        Ok(())
159    }
160}
161
162impl BinanceFuturesHandler {
163    async fn handle_market_data(&self, value: &json::Value) -> Result<()> {
164        // Try to parse as different message types
165        // First check if it's an orderbook message
166        if let Ok(msg) = json::from_value::<OrderbookMessage>(value.clone()) {
167            return self.handle_orderbook_update(msg).await;
168        }
169
170        // Check if it's an aggregated trade message
171        if let Ok(msg) = json::from_value::<AggTradeMessage>(value.clone()) {
172            return self.handle_agg_trade(msg).await;
173        }
174
175        // Check if it's a subscription confirmation
176        if let Ok(msg) = json::from_value::<SubscriptionResult>(value.clone()) {
177            log::debug!("Received subscription confirmation: {msg:?}");
178            return Ok(());
179        }
180
181        log::debug!("Unhandled message type");
182        Ok(())
183    }
184
185    async fn handle_orderbook_update(&self, msg: OrderbookMessage) -> Result<()> {
186        // Update stats
187        self.stats.write().messages_received += 1;
188
189        // Forward to data handler if available
190        if let Some(handler) = &*self.data_handler.read().await {
191            // Convert to OrderBookSnapshot
192            let snapshot = self.convert_to_snapshot(msg)?;
193            handler.handle_orderbook_snapshot(snapshot).await?;
194        }
195
196        Ok(())
197    }
198
199    async fn handle_trade(&self, msg: MarketTrade) -> Result<()> {
200        // Update stats
201        self.stats.write().messages_received += 1;
202
203        // Forward to data handler
204        if let Some(handler) = &*self.data_handler.read().await {
205            handler.handle_market_trade(msg).await?;
206        }
207
208        Ok(())
209    }
210
211    async fn handle_agg_trade(&self, msg: AggTradeMessage) -> Result<()> {
212        // Update stats
213        self.stats.write().messages_received += 1;
214
215        // Convert to MarketTrade and forward
216        if let Some(handler) = &*self.data_handler.read().await {
217            let trade = self.convert_agg_trade(msg)?;
218            handler.handle_market_trade(trade).await?;
219        }
220
221        Ok(())
222    }
223
224    fn convert_to_snapshot(&self, msg: OrderbookMessage) -> Result<OrderBookSnapshot> {
225        use rust_decimal::prelude::FromStr;
226        use rusty_model::data::orderbook::PriceLevel;
227
228        // Get system timestamp
229        let timestamp_init = self.clock.raw();
230
231        // Create instrument ID from symbol
232        let instrument_id = InstrumentId::new(&msg.symbol, Venue::Binance);
233
234        // Convert bids
235        let mut bids = SmallVec::<[PriceLevel; 64]>::new();
236        for bid in msg.bids {
237            if bid.len() >= 2 {
238                let price = rust_decimal::Decimal::from_str(&bid[0])
239                    .map_err(|e| anyhow!("Failed to parse bid price: {}", e))?;
240                let quantity = rust_decimal::Decimal::from_str(&bid[1])
241                    .map_err(|e| anyhow!("Failed to parse bid quantity: {}", e))?;
242
243                bids.push(PriceLevel { price, quantity });
244            }
245        }
246
247        // Convert asks
248        let mut asks = SmallVec::<[PriceLevel; 64]>::new();
249        for ask in msg.asks {
250            if ask.len() >= 2 {
251                let price = rust_decimal::Decimal::from_str(&ask[0])
252                    .map_err(|e| anyhow!("Failed to parse ask price: {}", e))?;
253                let quantity = rust_decimal::Decimal::from_str(&ask[1])
254                    .map_err(|e| anyhow!("Failed to parse ask quantity: {}", e))?;
255
256                asks.push(PriceLevel { price, quantity });
257            }
258        }
259
260        // Create snapshot
261        let snapshot = OrderBookSnapshot::new(
262            instrument_id,
263            bids,
264            asks,
265            msg.final_update_id,
266            msg.event_time * 1_000_000, // Convert ms to ns
267            timestamp_init,
268        );
269
270        Ok(snapshot)
271    }
272
273    fn convert_agg_trade(&self, msg: AggTradeMessage) -> Result<MarketTrade> {
274        use rust_decimal::prelude::FromStr;
275        use rusty_model::enums::OrderSide;
276
277        // Parse price and quantity
278        let price = rust_decimal::Decimal::from_str(&msg.price)
279            .map_err(|e| anyhow!("Failed to parse trade price: {}", e))?;
280        let quantity = rust_decimal::Decimal::from_str(&msg.quantity)
281            .map_err(|e| anyhow!("Failed to parse trade quantity: {}", e))?;
282
283        // Determine trade side
284        // In Binance, if buyer is market maker, the actual aggressor is seller (taker)
285        let direction = if msg.is_buyer_market_maker {
286            OrderSide::Sell // Seller is aggressor
287        } else {
288            OrderSide::Buy // Buyer is aggressor
289        };
290
291        // Create instrument ID
292        let instrument_id = InstrumentId::new(&msg.symbol, Venue::Binance);
293
294        // Create market trade
295        let trade = MarketTrade {
296            timestamp: self.clock.now(),
297            exchange_time_ns: msg.trade_time * 1_000_000, // Convert ms to ns
298            price,
299            quantity,
300            direction,
301            instrument_id,
302        };
303
304        Ok(trade)
305    }
306}
307
308/// Internal provider messages
309enum ProviderMessage {
310    Connected,
311    Disconnected,
312    Error(String),
313    Subscribe(String),
314    Unsubscribe(String),
315}
316
317impl HttpClientProvider for BinanceFuturesProvider {
318    fn http_client(&self) -> &reqwest::Client {
319        &self.http_client
320    }
321}
322
323impl BinanceFuturesProvider {
324    /// Create a new Binance Futures provider
325    #[must_use]
326    pub fn new(config: ConnectionConfig, market_type: BinanceFuturesMarketType) -> Self {
327        let (message_tx, message_rx) = mpsc::channel(1000);
328
329        // Create HTTP client before moving config
330        let http_client = Self::create_http_client(&config);
331
332        Self {
333            config,
334            market_type,
335            connection_status: Arc::new(RwLock::new(ConnectionState::Disconnected)),
336            stats: Arc::new(RwLock::new(ConnectionStats::default())),
337            clock: Clock::new(),
338            subscriptions: Arc::new(RwLock::new(FxHashMap::default())),
339            message_tx,
340            message_rx: Arc::new(tokio::sync::RwLock::new(message_rx)),
341            ws_client: None,
342            data_handler: Arc::new(tokio::sync::RwLock::new(None)),
343            http_client,
344        }
345    }
346
347    /// Get the WebSocket URL based on market type
348    const fn get_ws_url(&self, use_combined: bool) -> &'static str {
349        match (self.market_type, use_combined) {
350            (BinanceFuturesMarketType::UsdM, true) => BINANCE_USD_FUTURES_WS_COMBINED_URL,
351            (BinanceFuturesMarketType::UsdM, false) => BINANCE_USD_FUTURES_WS_URL,
352            (BinanceFuturesMarketType::CoinM, true) => BINANCE_COIN_FUTURES_WS_COMBINED_URL,
353            (BinanceFuturesMarketType::CoinM, false) => BINANCE_COIN_FUTURES_WS_URL,
354        }
355    }
356
357    /// Create WebSocket configuration
358    fn create_ws_config(&self) -> WebSocketConfig {
359        binance::default_config(self.get_ws_url(true).to_string())
360    }
361}
362
363impl BinanceFuturesProvider {
364    /// Get provider name
365    pub const fn name(&self) -> &str {
366        match self.market_type {
367            BinanceFuturesMarketType::UsdM => "binance_futures_usdm",
368            BinanceFuturesMarketType::CoinM => "binance_futures_coinm",
369        }
370    }
371
372    /// Get venue
373    pub const fn venue(&self) -> Venue {
374        Venue::Binance
375    }
376
377    /// Connect to WebSocket
378    pub async fn connect(&mut self) -> Result<()> {
379        // Update connection state
380        *self.connection_status.write() = ConnectionState::Connecting;
381
382        // Create WebSocket client
383        let config = self.create_ws_config();
384        let client = WebSocketClient::new(config);
385
386        // Create handler
387        let handler = BinanceFuturesHandler {
388            data_handler: self.data_handler.clone(),
389            clock: self.clock.clone(),
390            stats: self.stats.clone(),
391            message_tx: self.message_tx.clone(),
392        };
393
394        // Store client reference
395        self.ws_client = Some(client);
396
397        // Run the client (this will handle the WebSocket connection)
398        // Note: In a real implementation, you'd want to spawn this in a separate task
399        // or handle it differently based on your architecture
400
401        // Update connection state
402        *self.connection_status.write() = ConnectionState::Connected;
403
404        Ok(())
405    }
406
407    /// Disconnect from WebSocket
408    pub async fn disconnect(&mut self) -> Result<()> {
409        if let Some(client) = &self.ws_client {
410            client.shutdown();
411        }
412
413        *self.connection_status.write() = ConnectionState::Disconnected;
414        self.ws_client = None;
415
416        Ok(())
417    }
418
419    /// Subscribe to orderbook updates
420    pub async fn subscribe_orderbook(
421        &mut self,
422        instrument_id: &InstrumentId,
423        depth: usize,
424    ) -> Result<()> {
425        let symbol = instrument_id.symbol.to_lowercase();
426        let stream = format!("{symbol}@depth{depth}@100ms");
427
428        // Create subscription message
429        let mut symbols: SmallVec<[String; 8]> = SmallVec::<[String; 8]>::new();
430        symbols.push(symbol.into());
431        let sub_msg =
432            create_orderbook_subscription("orderbook", symbols, Some(depth), Some(1), None, false);
433
434        // Send subscription
435        if let Some(client) = &self.ws_client {
436            let message = simd_json::to_string(&sub_msg)
437                .map_err(|e| anyhow!("Failed to serialize subscription: {}", e))?;
438
439            client
440                .send(Message::Text(message.into()))
441                .await
442                .map_err(|e| anyhow!("Failed to send subscription: {:?}", e))?;
443
444            log::info!("Subscribed to orderbook: {stream}");
445        } else {
446            return Err(anyhow!("WebSocket client not connected"));
447        }
448
449        Ok(())
450    }
451
452    /// Subscribe to trade updates
453    pub async fn subscribe_trades(&mut self, instrument_id: &InstrumentId) -> Result<()> {
454        let symbol = instrument_id.symbol.to_lowercase();
455        let stream = format!("{symbol}@aggTrade");
456
457        // Create subscription message
458        let mut symbols: SmallVec<[String; 8]> = SmallVec::<[String; 8]>::new();
459        symbols.push(symbol.into());
460        let sub_msg = create_trade_subscription("trades", symbols, Some(1), None, false);
461
462        // Send subscription
463        if let Some(client) = &self.ws_client {
464            let message = simd_json::to_string(&sub_msg)
465                .map_err(|e| anyhow!("Failed to serialize subscription: {}", e))?;
466
467            client
468                .send(Message::Text(message.into()))
469                .await
470                .map_err(|e| anyhow!("Failed to send subscription: {:?}", e))?;
471
472            log::info!("Subscribed to trades: {stream}");
473        } else {
474            return Err(anyhow!("WebSocket client not connected"));
475        }
476
477        Ok(())
478    }
479
480    /// Unsubscribe from orderbook updates
481    pub async fn unsubscribe_orderbook(&mut self, instrument_id: &InstrumentId) -> Result<()> {
482        let symbol = instrument_id.symbol.to_lowercase();
483
484        // Remove from subscriptions
485        self.subscriptions.write().remove(symbol.as_str());
486
487        // Send unsubscribe message
488        if let Some(client) = &self.ws_client {
489            use super::data::subscription::WebSocketSubscribeRequest;
490
491            // Get the depth from stored subscription info (default to 10)
492            let depth = 10; // You might want to store this in subscriptions map
493
494            let unsub_msg = WebSocketSubscribeRequest {
495                id: 2,
496                method: "UNSUBSCRIBE".into(),
497                params: smallvec![format!("{symbol}@depth{depth}").into()],
498            };
499
500            let message = simd_json::to_string(&unsub_msg)
501                .map_err(|e| anyhow!("Failed to serialize unsubscription: {}", e))?;
502
503            client
504                .send(Message::Text(message.into()))
505                .await
506                .map_err(|e| anyhow!("Failed to send unsubscription: {:?}", e))?;
507
508            log::info!("Unsubscribed from orderbook: {symbol}");
509        }
510
511        Ok(())
512    }
513
514    /// Unsubscribe from trade updates
515    pub async fn unsubscribe_trades(&mut self, instrument_id: &InstrumentId) -> Result<()> {
516        let symbol = instrument_id.symbol.to_lowercase();
517
518        // Remove from subscriptions
519        self.subscriptions.write().remove(symbol.as_str());
520
521        // Send unsubscribe message
522        if let Some(client) = &self.ws_client {
523            use super::data::subscription::WebSocketSubscribeRequest;
524
525            let unsub_msg = WebSocketSubscribeRequest {
526                id: 2,
527                method: "UNSUBSCRIBE".into(),
528                params: smallvec![format!("{symbol}@aggTrade").into()],
529            };
530
531            let message = simd_json::to_string(&unsub_msg)
532                .map_err(|e| anyhow!("Failed to serialize unsubscription: {}", e))?;
533
534            client
535                .send(Message::Text(message.into()))
536                .await
537                .map_err(|e| anyhow!("Failed to send unsubscription: {:?}", e))?;
538
539            log::info!("Unsubscribed from trades: {symbol}");
540        }
541
542        Ok(())
543    }
544
545    /// Check if connected
546    pub fn is_connected(&self) -> bool {
547        matches!(*self.connection_status.read(), ConnectionState::Connected)
548    }
549
550    /// Get connection statistics
551    pub fn connection_stats(&self) -> ConnectionStats {
552        self.stats.read().clone()
553    }
554
555    /// Run with data handler
556    pub async fn run_with_handler(
557        &mut self,
558        handler: Box<dyn DataHandler + Send + Sync>,
559    ) -> Result<()> {
560        // Store the handler
561        *self.data_handler.write().await = Some(handler);
562
563        // Connect if not already connected
564        if !self.is_connected() {
565            self.connect().await?;
566        }
567
568        // Process internal messages
569        let mut rx = self.message_rx.write().await;
570        while let Some(msg) = rx.recv().await {
571            match msg {
572                ProviderMessage::Connected => {
573                    log::info!("Provider connected");
574                }
575                ProviderMessage::Disconnected => {
576                    log::warn!("Provider disconnected");
577                    // Attempt reconnection is handled by WebSocketClient
578                }
579                ProviderMessage::Error(e) => {
580                    log::error!("Provider error: {e}");
581                }
582                _ => {}
583            }
584        }
585
586        Ok(())
587    }
588
589    /// Fetch available instruments
590    pub async fn fetch_instruments(&self) -> Result<Vec<Box<dyn Instrument>>> {
591        use reqwest::Client;
592        use rusty_model::instruments::SpotInstrument;
593
594        // Get API URL based on market type
595        let api_url = match self.market_type {
596            BinanceFuturesMarketType::UsdM => BINANCE_USD_FUTURES_API_URL,
597            BinanceFuturesMarketType::CoinM => BINANCE_COIN_FUTURES_API_URL,
598        };
599
600        // Create HTTP client
601        let client = Client::new();
602
603        // Fetch exchange info
604        let url = format!("{api_url}/fapi/v1/exchangeInfo");
605        let response = client
606            .get(&url)
607            .send()
608            .await
609            .map_err(|e| anyhow!("Failed to fetch exchange info: {}", e))?;
610
611        // Check response status
612        if !response.status().is_success() {
613            let status = response.status();
614            let body = response.text().await.unwrap_or_default();
615            return Err(anyhow!(
616                "Failed to fetch instruments: {} - {}",
617                status,
618                body
619            ));
620        }
621
622        // Parse JSON response
623        let body = response
624            .text()
625            .await
626            .map_err(|e| anyhow!("Failed to read response body: {}", e))?;
627        let data: json::Value = unsafe { simd_json::from_str(&mut body.clone()) }
628            .map_err(|e| anyhow!("Failed to parse exchange info: {}", e))?;
629
630        // Extract instruments
631        let mut instruments = Vec::new();
632
633        if let Some(obj) = data.as_object()
634            && let Some(symbols) = obj.get("symbols").and_then(|v| v.as_array())
635        {
636            for symbol_data in symbols {
637                // Check if it's an object first
638                if let Some(symbol_obj) = symbol_data.as_object() {
639                    // Extract symbol info
640                    let symbol = symbol_obj
641                        .get("symbol")
642                        .and_then(|v| v.as_str())
643                        .ok_or_else(|| anyhow!("Missing symbol field"))?;
644
645                    let status = symbol_obj
646                        .get("status")
647                        .and_then(|v| v.as_str())
648                        .unwrap_or("");
649
650                    // Only include active trading symbols
651                    if status != "TRADING" {
652                        continue;
653                    }
654
655                    let base_asset = symbol_obj
656                        .get("baseAsset")
657                        .and_then(|v| v.as_str())
658                        .ok_or_else(|| anyhow!("Missing baseAsset field"))?;
659
660                    let quote_asset = symbol_obj
661                        .get("quoteAsset")
662                        .and_then(|v| v.as_str())
663                        .ok_or_else(|| anyhow!("Missing quoteAsset field"))?;
664
665                    // Create instrument (using SpotInstrument for now)
666                    // TODO: Create proper FuturesInstrument type with contract details
667                    let instrument =
668                        SpotInstrument::new(symbol, base_asset, quote_asset, Venue::Binance);
669
670                    instruments.push(Box::new(instrument) as Box<dyn Instrument>);
671                }
672            }
673        }
674
675        log::info!(
676            "Fetched {} instruments from Binance Futures",
677            instruments.len()
678        );
679        Ok(instruments)
680    }
681}
682
683/// Provider trait implementation for Binance Futures
684#[async_trait]
685impl Provider for BinanceFuturesProvider {
686    type TradeMessage = AggTradeMessage;
687    type DepthMessage = OrderbookMessage;
688    type InstrumentMessage = json::Value;
689
690    fn name(&self) -> &'static str {
691        match self.market_type {
692            BinanceFuturesMarketType::UsdM => "BinanceFuturesUSDM",
693            BinanceFuturesMarketType::CoinM => "BinanceFuturesCOINM",
694        }
695    }
696
697    fn venue(&self) -> Venue {
698        Venue::Binance
699    }
700
701    fn config(&self) -> &ConnectionConfig {
702        &self.config
703    }
704
705    async fn init(&mut self) -> Result<()> {
706        // Initialize WebSocket client using consolidated implementation
707        let ws_url = match self.market_type {
708            BinanceFuturesMarketType::UsdM => BINANCE_USD_FUTURES_WS_COMBINED_URL.to_string(),
709            BinanceFuturesMarketType::CoinM => BINANCE_COIN_FUTURES_WS_COMBINED_URL.to_string(),
710        };
711
712        let ws_config = binance::default_config(ws_url);
713        self.ws_client = Some(WebSocketClient::new(ws_config));
714
715        // Update connection status
716        *self.connection_status.write() = ConnectionState::Connected;
717        Ok(())
718    }
719
720    async fn subscribe_trades(
721        &self,
722        symbols: SmallVec<[String; 8]>,
723    ) -> Result<mpsc::Receiver<Self::TradeMessage>> {
724        let (tx, rx) = mpsc::channel(1024);
725
726        // Create subscription message for multiple symbols
727        let sub_msg = create_trade_subscription("trades", symbols.clone(), Some(1), None, false);
728
729        if let Some(client) = &self.ws_client {
730            let message = simd_json::to_string(&sub_msg)
731                .map_err(|e| anyhow!("Failed to serialize subscription: {}", e))?;
732
733            client
734                .send(Message::Text(message.into()))
735                .await
736                .map_err(|e| anyhow!("Failed to send subscription: {:?}", e))?;
737
738            log::info!("Subscribed to trades for symbols: {symbols:?}");
739        } else {
740            return Err(anyhow!("WebSocket client not initialized"));
741        }
742
743        Ok(rx)
744    }
745
746    async fn unsubscribe_trades(&self) -> Result<()> {
747        // TODO: Implement unsubscribe logic
748        Ok(())
749    }
750
751    async fn subscribe_orderbook(
752        &self,
753        symbols: SmallVec<[String; 8]>,
754    ) -> Result<mpsc::Receiver<Self::DepthMessage>> {
755        let (tx, rx) = mpsc::channel(1024);
756
757        // Create subscription message for multiple symbols with depth
758        let sub_msg = create_orderbook_subscription(
759            "orderbook",
760            symbols.clone(),
761            Some(20), // Default depth
762            Some(1),
763            None,
764            false,
765        );
766
767        if let Some(client) = &self.ws_client {
768            let message = simd_json::to_string(&sub_msg)
769                .map_err(|e| anyhow!("Failed to serialize subscription: {}", e))?;
770
771            client
772                .send(Message::Text(message.into()))
773                .await
774                .map_err(|e| anyhow!("Failed to send subscription: {:?}", e))?;
775
776            log::info!("Subscribed to orderbook for symbols: {symbols:?}");
777        } else {
778            return Err(anyhow!("WebSocket client not initialized"));
779        }
780
781        Ok(rx)
782    }
783
784    async fn unsubscribe_orderbook(&self) -> Result<()> {
785        // TODO: Implement unsubscribe logic
786        Ok(())
787    }
788
789    async fn get_instruments(&self) -> Result<Vec<Box<dyn Instrument>>> {
790        // Use the existing fetch_instruments method
791        self.fetch_instruments().await
792    }
793
794    async fn get_historical_trades(
795        &self,
796        _symbol: &str,
797        _limit: Option<u32>,
798    ) -> Result<Vec<MarketTrade>> {
799        // TODO: Implement historical trades fetching
800        Err(anyhow!("Historical trades not implemented yet"))
801    }
802
803    async fn get_orderbook_snapshot(
804        &self,
805        symbol: &str,
806        depth: Option<u32>,
807    ) -> Result<OrderBookSnapshot> {
808        // Fetch orderbook snapshot from REST API
809        let limit = depth.unwrap_or(20).min(5000);
810        let url = match self.market_type {
811            BinanceFuturesMarketType::UsdM => {
812                format!("{BINANCE_USD_FUTURES_API_URL}/fapi/v1/depth?symbol={symbol}&limit={limit}")
813            }
814            BinanceFuturesMarketType::CoinM => {
815                format!(
816                    "{BINANCE_COIN_FUTURES_API_URL}/dapi/v1/depth?symbol={symbol}&limit={limit}"
817                )
818            }
819        };
820
821        let response = self.http_client.get(&url).send().await?;
822
823        // Check for rate limit errors
824        use crate::exchange::binance::common::rate_limit::{
825            handle_rate_limit_error, parse_order_count_headers,
826        };
827        if response.status() == 429 || response.status() == 418 {
828            let headers = response.headers().clone();
829            handle_rate_limit_error(response.status().as_u16(), &headers)?;
830        }
831
832        // Parse order count headers for monitoring
833        let order_counts = parse_order_count_headers(response.headers());
834        if !order_counts.is_empty() {
835            log::debug!("Binance order count limits: {order_counts:?}");
836        }
837
838        let body = response.text().await?;
839        let data: json::Value = unsafe { simd_json::from_str(&mut body.clone()) }?;
840
841        // Parse the response
842        let instrument_id = InstrumentId::new(symbol, Venue::Binance);
843        let timestamp = self.clock.raw();
844        let last_update_id = data
845            .get("lastUpdateId")
846            .and_then(|v| v.as_u64())
847            .unwrap_or(0);
848
849        let mut snapshot = OrderBookSnapshot::new_empty(instrument_id, timestamp, last_update_id);
850
851        // Parse bids
852        if let Some(bids) = data.get("bids").and_then(|v| v.as_array()) {
853            for bid in bids {
854                if let Some(arr) = bid.as_array()
855                    && arr.len() >= 2
856                    && let (Some(price_str), Some(qty_str)) = (arr[0].as_str(), arr[1].as_str())
857                    && let (Ok(price), Ok(quantity)) = (
858                        rust_decimal::Decimal::from_str(price_str),
859                        rust_decimal::Decimal::from_str(qty_str),
860                    )
861                {
862                    snapshot.add_bid(price, quantity);
863                }
864            }
865        }
866
867        // Parse asks
868        if let Some(asks) = data.get("asks").and_then(|v| v.as_array()) {
869            for ask in asks {
870                if let Some(arr) = ask.as_array()
871                    && arr.len() >= 2
872                    && let (Some(price_str), Some(qty_str)) = (arr[0].as_str(), arr[1].as_str())
873                    && let (Ok(price), Ok(quantity)) = (
874                        rust_decimal::Decimal::from_str(price_str),
875                        rust_decimal::Decimal::from_str(qty_str),
876                    )
877                {
878                    snapshot.add_ask(price, quantity);
879                }
880            }
881        }
882
883        Ok(snapshot)
884    }
885
886    async fn get_realtime_orderbook(&self, _symbol: &str) -> Result<SharedSimdOrderBook> {
887        // TODO: Implement realtime orderbook access
888        Err(anyhow!("Realtime orderbook not implemented yet"))
889    }
890
891    async fn is_connected(&self) -> bool {
892        *self.connection_status.read() == ConnectionState::Connected
893    }
894
895    async fn connection_status(&self) -> ConnectionState {
896        *self.connection_status.read()
897    }
898
899    async fn get_stats(&self) -> ConnectionStats {
900        self.stats.read().clone()
901    }
902
903    async fn ping(&self) -> Result<u64> {
904        if let Some(client) = &self.ws_client {
905            let start = std::time::Instant::now();
906
907            // Send ping frame
908            client.send(Message::Ping(vec![])).await?;
909
910            // Calculate round-trip time
911            let elapsed = start.elapsed();
912            Ok(elapsed.as_millis() as u64)
913        } else {
914            Err(anyhow!("WebSocket client not initialized"))
915        }
916    }
917
918    async fn reset_connection(&self) -> Result<()> {
919        // Reset connection status
920        *self.connection_status.write() = ConnectionState::Disconnected;
921
922        // Reset stats
923        *self.stats.write() = ConnectionStats::default();
924
925        // TODO: Implement proper reconnection logic
926        Ok(())
927    }
928
929    fn get_rate_limits(&self) -> Vec<RateLimit> {
930        vec![
931            RateLimit {
932                limit_type: "REQUEST_WEIGHT",
933                interval: "MINUTE",
934                interval_num: 1,
935                limit: 1200,
936            },
937            RateLimit {
938                limit_type: "ORDERS",
939                interval: "SECOND",
940                interval_num: 10,
941                limit: 300,
942            },
943        ]
944    }
945
946    async fn shutdown(&mut self) -> Result<()> {
947        // Shutdown WebSocket client
948        if let Some(client) = &self.ws_client {
949            client.shutdown();
950        }
951
952        // Update connection status
953        *self.connection_status.write() = ConnectionState::Disconnected;
954
955        // Clear subscriptions
956        self.subscriptions.write().clear();
957
958        Ok(())
959    }
960}
961
962/// MonitorExt implementation for BinanceFuturesProvider
963///
964/// Provides advanced monitoring capabilities for HFT trading systems
965#[async_trait]
966impl crate::provider::ext::MonitorExt for BinanceFuturesProvider {
967    async fn get_instrument_metrics(
968        &self,
969        instrument_id: &InstrumentId,
970    ) -> Result<crate::provider::ext::InstrumentMetrics> {
971        use crate::feeder::FeedStats;
972        use crate::provider::ext::{InstrumentMetrics, LatencyMetrics, OrderFlowMetrics};
973
974        let connection_stats = self.connection_stats();
975        let current_time = self.clock.raw();
976
977        // Calculate order flow metrics based on recent activity
978        let order_flow_metrics = OrderFlowMetrics {
979            volume_imbalance: 0.0, // Would be calculated from recent trades
980            book_pressure: 0.0,    // Would be calculated from order book data
981            trade_intensity: connection_stats.messages_received as f64 / 60.0, // Messages per minute
982            avg_trade_size: rust_decimal::Decimal::ZERO, // Would track from trades
983            price_volatility: 0.0,                       // Would calculate from price movements
984            avg_spread_bps: 0.0,                         // Would track from order book updates
985            max_spread_bps: 0.0,                         // Would track from order book updates
986            market_impact_bps: 0.0,                      // Would calculate from trade analysis
987        };
988
989        // Calculate latency metrics
990        let latency_metrics = LatencyMetrics {
991            network_latency_ns: connection_stats.avg_latency_ns,
992            parsing_latency_ns: 50_000,    // Estimated parsing time
993            processing_latency_ns: 25_000, // Estimated processing time
994            total_latency_ns: connection_stats.avg_latency_ns + 75_000,
995            latency_jitter_ns: connection_stats.avg_latency_ns / 10, // Estimated jitter
996        };
997
998        // Calculate health score based on performance metrics
999        let health_score = if connection_stats.errors == 0 {
1000            1.0
1001        } else {
1002            (1.0 - (connection_stats.errors as f64
1003                / connection_stats.messages_received.max(1) as f64))
1004                .max(0.0)
1005        };
1006
1007        // Basic feed stats (would be enhanced with real-time data)
1008        let mut feed_stats = FeedStats::default();
1009        feed_stats.messages_processed = connection_stats.messages_received;
1010        feed_stats.messages_per_second = 0.0; // Would calculate from timing
1011        feed_stats.avg_process_latency_ns = connection_stats.avg_latency_ns;
1012        feed_stats.max_process_latency_ns = connection_stats.avg_latency_ns; // Simplified
1013        feed_stats.p99_process_latency_ns = connection_stats.avg_latency_ns;
1014        feed_stats.dropped_messages = 0; // Would track from actual feed drops
1015        feed_stats.last_update_time = current_time;
1016        feed_stats.memory_usage_bytes = std::mem::size_of::<Self>();
1017
1018        Ok(InstrumentMetrics {
1019            instrument_id: instrument_id.clone(),
1020            feed_stats,
1021            connection_stats,
1022            order_flow_metrics,
1023            latency_metrics,
1024            anomaly_score: 0.0, // Would be calculated by anomaly detection
1025            health_score,
1026            last_updated: current_time,
1027        })
1028    }
1029
1030    async fn get_aggregated_metrics(&self) -> Result<crate::provider::ext::AggregatedMetrics> {
1031        use crate::provider::ext::AggregatedMetrics;
1032
1033        let connection_stats = self.connection_stats();
1034        let subscriptions = self.subscriptions.read();
1035        let current_time = self.clock.raw();
1036
1037        Ok(AggregatedMetrics {
1038            total_instruments: subscriptions.len(),
1039            total_messages_processed: connection_stats.messages_received,
1040            total_dropped_messages: 0, // Would track from feed stats
1041            avg_latency_ns: connection_stats.avg_latency_ns,
1042            max_latency_ns: connection_stats.avg_latency_ns,
1043            p99_latency_ns: connection_stats.avg_latency_ns, // Simplified
1044            total_memory_usage_bytes: std::mem::size_of::<Self>(),
1045            total_errors: connection_stats.errors,
1046            overall_health_score: if connection_stats.errors == 0 {
1047                1.0
1048            } else {
1049                0.8
1050            },
1051            active_alerts: 0, // Would track active alerts
1052            timestamp: current_time,
1053        })
1054    }
1055
1056    async fn register_metrics_collector(
1057        &mut self,
1058        _collector: Box<dyn crate::provider::ext::MetricsCollector>,
1059    ) -> Result<()> {
1060        // Would store collector and start collection
1061        log::info!("Metrics collector registered for BinanceFuturesProvider");
1062        Ok(())
1063    }
1064
1065    async fn enable_alert(
1066        &mut self,
1067        alert_config: crate::provider::ext::AlertConfig,
1068    ) -> Result<crate::provider::ext::AlertHandle> {
1069        // Would implement alerting system
1070        log::info!(
1071            "Alert enabled: {} with threshold {}",
1072            alert_config.name,
1073            alert_config.threshold
1074        );
1075        Ok(crate::provider::ext::AlertHandle(1)) // Placeholder handle
1076    }
1077
1078    async fn disable_alert(&mut self, _handle: crate::provider::ext::AlertHandle) -> Result<()> {
1079        // Would disable specific alert
1080        log::info!("Alert disabled");
1081        Ok(())
1082    }
1083
1084    async fn get_health_status(&self) -> Result<crate::provider::ext::HealthStatus> {
1085        use crate::provider::ext::{ComponentHealth, HealthState, HealthStatus};
1086
1087        let connection_stats = self.connection_stats();
1088        let connected = self.is_connected();
1089        let current_time = self.clock.raw();
1090
1091        let overall_status = if connected {
1092            if connection_stats.errors == 0 {
1093                HealthState::Healthy
1094            } else {
1095                HealthState::Degraded
1096            }
1097        } else {
1098            HealthState::Offline
1099        };
1100
1101        let component_health = vec![
1102            ComponentHealth {
1103                component_name: "WebSocket Connection".to_string(),
1104                status: if connected {
1105                    HealthState::Healthy
1106                } else {
1107                    HealthState::Offline
1108                },
1109                last_heartbeat: current_time,
1110                error_rate: connection_stats.errors as f64
1111                    / connection_stats.messages_received.max(1) as f64,
1112                latency_p99_ns: connection_stats.avg_latency_ns,
1113            },
1114            ComponentHealth {
1115                component_name: "Message Processing".to_string(),
1116                status: HealthState::Healthy, // Would check actual message processing stats
1117                last_heartbeat: current_time,
1118                error_rate: 0.0, // Would calculate from actual processing stats
1119                latency_p99_ns: connection_stats.avg_latency_ns,
1120            },
1121        ];
1122
1123        let performance_grade = match overall_status {
1124            HealthState::Healthy => 'A',
1125            HealthState::Degraded => 'B',
1126            HealthState::Critical => 'D',
1127            HealthState::Offline => 'F',
1128        };
1129
1130        Ok(HealthStatus {
1131            overall_status,
1132            component_health,
1133            active_issues: vec![], // Would track actual issues
1134            performance_grade,
1135            uptime_seconds: (current_time - connection_stats.connected_time) / 1_000_000_000, // Convert ns to seconds
1136            last_check_time: current_time,
1137        })
1138    }
1139
1140    async fn start_anomaly_detection(
1141        &mut self,
1142        _config: crate::provider::ext::AnomalyConfig,
1143    ) -> Result<()> {
1144        // Would implement anomaly detection algorithms
1145        log::info!("Anomaly detection started for BinanceFuturesProvider");
1146        Ok(())
1147    }
1148
1149    async fn stop_anomaly_detection(&mut self) -> Result<()> {
1150        // Would stop anomaly detection
1151        log::info!("Anomaly detection stopped for BinanceFuturesProvider");
1152        Ok(())
1153    }
1154
1155    async fn get_latency_distribution(
1156        &self,
1157        _instrument_id: &InstrumentId,
1158    ) -> Result<crate::provider::ext::LatencyDistribution> {
1159        use crate::provider::ext::{LatencyBucket, LatencyDistribution};
1160
1161        let connection_stats = self.connection_stats();
1162
1163        // Simplified latency distribution based on available stats
1164        Ok(LatencyDistribution {
1165            p50_ns: connection_stats.avg_latency_ns,
1166            p90_ns: connection_stats.avg_latency_ns + (connection_stats.avg_latency_ns / 4),
1167            p95_ns: connection_stats.avg_latency_ns + (connection_stats.avg_latency_ns / 2),
1168            p99_ns: connection_stats.avg_latency_ns,
1169            p99_9_ns: connection_stats.avg_latency_ns + (connection_stats.avg_latency_ns / 10),
1170            max_ns: connection_stats.avg_latency_ns,
1171            histogram: vec![
1172                LatencyBucket {
1173                    min_ns: 0,
1174                    max_ns: 100_000,
1175                    count: 10,
1176                },
1177                LatencyBucket {
1178                    min_ns: 100_000,
1179                    max_ns: 500_000,
1180                    count: 50,
1181                },
1182                LatencyBucket {
1183                    min_ns: 500_000,
1184                    max_ns: 1_000_000,
1185                    count: 30,
1186                },
1187                LatencyBucket {
1188                    min_ns: 1_000_000,
1189                    max_ns: 5_000_000,
1190                    count: 10,
1191                },
1192            ],
1193        })
1194    }
1195
1196    async fn export_prometheus_metrics(&self) -> Result<std::string::String> {
1197        let connection_stats = self.connection_stats();
1198        let subscriptions = self.subscriptions.read();
1199        let provider_name = self.name();
1200
1201        let prometheus_metrics = format!(
1202            "# HELP binance_futures_messages_received_total Total messages received\n\
1203             # TYPE binance_futures_messages_received_total counter\n\
1204             binance_futures_messages_received_total{{provider=\"{provider_name}\"}} {}\n\
1205             # HELP binance_futures_messages_dropped_total Total messages dropped\n\
1206             # TYPE binance_futures_messages_dropped_total counter\n\
1207             binance_futures_messages_dropped_total{{provider=\"{provider_name}\"}} {}\n\
1208             # HELP binance_futures_latency_avg_ns Average latency in nanoseconds\n\
1209             # TYPE binance_futures_latency_avg_ns gauge\n\
1210             binance_futures_latency_avg_ns{{provider=\"{provider_name}\"}} {}\n\
1211             # HELP binance_futures_active_subscriptions Number of active subscriptions\n\
1212             # TYPE binance_futures_active_subscriptions gauge\n\
1213             binance_futures_active_subscriptions{{provider=\"{provider_name}\"}} {}\n",
1214            connection_stats.messages_received,
1215            0, // Would track actual dropped messages
1216            connection_stats.avg_latency_ns,
1217            subscriptions.len()
1218        );
1219
1220        Ok(prometheus_metrics)
1221    }
1222
1223    async fn reset_statistics(&mut self) -> Result<()> {
1224        // Reset connection statistics
1225        let mut stats = self.stats.write();
1226        *stats = ConnectionStats::default();
1227        log::info!("Statistics reset for BinanceFuturesProvider");
1228        Ok(())
1229    }
1230
1231    async fn get_trading_metrics(
1232        &self,
1233        _instrument_id: &InstrumentId,
1234    ) -> Result<crate::provider::ext::TradingMetrics> {
1235        use crate::provider::ext::{
1236            ExecutionQualityMetrics, MicrostructureSignals, TradingMetrics,
1237        };
1238
1239        // Would calculate from actual trading data
1240        Ok(TradingMetrics {
1241            order_flow_imbalance: 0.0,
1242            vwap: rust_decimal::Decimal::ZERO,
1243            twap: rust_decimal::Decimal::ZERO,
1244            microstructure_signals: MicrostructureSignals {
1245                book_imbalance: 0.0,
1246                trade_sign_accuracy: 0.5,
1247                alpha_signal: 0.0,
1248                momentum: 0.0,
1249                mean_reversion: 0.0,
1250            },
1251            execution_quality: ExecutionQualityMetrics {
1252                slippage_bps: 0.0,
1253                market_impact_bps: 0.0,
1254                fill_rate: 1.0,
1255                avg_fill_time_ns: 1_000_000,
1256                implementation_shortfall_bps: 0.0,
1257            },
1258        })
1259    }
1260}