rusty_feeder/exchange/binance/spot/
message_handler.rs

1//! Binance Spot WebSocket message handler implementation
2
3use crate::exchange::zerocopy_helpers::{deserialize_from_slice_borrowed, deserialize_from_vec};
4use async_trait::async_trait;
5use parking_lot::RwLock;
6use quanta::Clock;
7use rusty_common::json::{self, Value};
8use rusty_common::websocket::{Message, MessageHandler, WebSocketError, WebSocketResult};
9use simd_json::prelude::ValueObjectAccess;
10use smartstring::alias::String;
11use std::sync::Arc;
12use tokio::sync::mpsc;
13
14use super::data::{orderbook::OrderbookMessage, trade::TradeMessage};
15use crate::exchange::binance::common::messages::BinanceWebsocketResponse;
16use crate::provider::prelude::ConnectionStats;
17
18/// Message handler for Binance Spot WebSocket connections
19pub struct BinanceSpotMessageHandler {
20    /// Clock for timing
21    clock: Clock,
22
23    /// Connection statistics
24    stats: Arc<RwLock<ConnectionStats>>,
25
26    /// Trade message sender
27    trade_tx: Option<mpsc::Sender<TradeMessage>>,
28
29    /// Orderbook message sender
30    orderbook_tx: Option<mpsc::Sender<OrderbookMessage>>,
31
32    /// Whether this is a combined stream
33    is_combined: bool,
34
35    /// Subscription message to send on connection
36    subscription_message: Option<String>,
37}
38
39impl BinanceSpotMessageHandler {
40    /// Create a new message handler for trade messages
41    pub const fn new_trade_handler(
42        clock: Clock,
43        stats: Arc<RwLock<ConnectionStats>>,
44        trade_tx: mpsc::Sender<TradeMessage>,
45        is_combined: bool,
46        subscription_message: Option<String>,
47    ) -> Self {
48        Self {
49            clock,
50            stats,
51            trade_tx: Some(trade_tx),
52            orderbook_tx: None,
53            is_combined,
54            subscription_message,
55        }
56    }
57
58    /// Create a new message handler for orderbook messages
59    pub const fn new_orderbook_handler(
60        clock: Clock,
61        stats: Arc<RwLock<ConnectionStats>>,
62        orderbook_tx: mpsc::Sender<OrderbookMessage>,
63        is_combined: bool,
64        subscription_message: Option<String>,
65    ) -> Self {
66        Self {
67            clock,
68            stats,
69            trade_tx: None,
70            orderbook_tx: Some(orderbook_tx),
71            is_combined,
72            subscription_message,
73        }
74    }
75
76    /// Update connection statistics
77    #[inline]
78    fn update_receive_stats(&self, message_size: usize) {
79        let local_time = self.clock.raw();
80        let mut s = self.stats.write();
81        s.messages_received += 1;
82        s.bytes_received += message_size as u64;
83        s.last_message_time = local_time;
84    }
85
86    /// Process trade text message
87    #[inline]
88    async fn process_trade_text_message(&self, text: String) -> WebSocketResult<()> {
89        let trade_tx = self
90            .trade_tx
91            .as_ref()
92            .ok_or(WebSocketError::ConnectionClosed(
93                "No trade sender configured".to_string(),
94            ))?;
95
96        // Zero-copy parsing: convert text to bytes and parse directly
97        match deserialize_from_slice_borrowed::<BinanceWebsocketResponse<TradeMessage>>(
98            text.as_bytes(),
99        ) {
100            Ok(BinanceWebsocketResponse::Data(trade_msg)) => {
101                self.update_receive_stats(text.len());
102                if let Err(e) = trade_tx.send(trade_msg).await {
103                    log::error!("Failed to send TradeMessage: {e}");
104                }
105            }
106            Ok(BinanceWebsocketResponse::Confirmation(conf_msg)) => {
107                log::debug!("Received subscription confirmation: {conf_msg:?}");
108            }
109            Err(e_direct) => {
110                if !self.is_combined {
111                    log::error!("Failed to parse TradeMessage text: {e_direct}");
112                    return Ok(());
113                }
114
115                let val = match deserialize_from_slice_borrowed::<Value>(text.as_bytes()) {
116                    Ok(val) => val,
117                    Err(e_combined) => {
118                        log::error!(
119                            "Failed to parse combined stream text message: {e_combined}, original error: {e_direct}"
120                        );
121                        return Ok(());
122                    }
123                };
124
125                let Some(data) = val.get("data") else {
126                    log::error!("Failed to parse trade text message: {e_direct}");
127                    return Ok(());
128                };
129
130                let trade_msg =
131                    match simd_json::serde::from_owned_value::<TradeMessage>(data.to_owned()) {
132                        Ok(trade_msg) => trade_msg,
133                        Err(e) => {
134                            log::error!("Failed to parse trade text message: {e}");
135                            return Ok(());
136                        }
137                    };
138
139                self.update_receive_stats(text.len());
140
141                if let Err(e) = trade_tx.try_send(trade_msg) {
142                    log::error!("Failed to send trade message: {e}");
143                }
144            }
145        }
146
147        Ok(())
148    }
149
150    /// Process trade binary message
151    #[inline]
152    async fn process_trade_binary_message(&self, bin: Vec<u8>) -> WebSocketResult<()> {
153        let trade_tx = self
154            .trade_tx
155            .as_ref()
156            .ok_or(WebSocketError::ConnectionClosed(
157                "No trade sender configured".to_string(),
158            ))?;
159
160        let bin_len = bin.len();
161
162        // For combined streams, we need to clone for fallback parsing
163        // For direct streams, we can use zero-copy parsing
164        if self.is_combined {
165            // Clone only for combined streams that might need fallback
166            match deserialize_from_vec::<TradeMessage>(bin.clone()) {
167                Ok(trade_msg) => {
168                    self.update_receive_stats(bin_len);
169                    if let Err(e) = trade_tx.try_send(trade_msg) {
170                        log::error!("Failed to send trade message: {e}");
171                    }
172                }
173                Err(e_direct) => {
174                    // Fallback to combined stream parsing
175                    let val = match deserialize_from_vec::<Value>(bin) {
176                        Ok(v) => v,
177                        Err(e_value) => {
178                            log::error!(
179                                "Failed to parse combined stream binary message as Value: {e_value}, original error: {e_direct}"
180                            );
181                            return Ok(());
182                        }
183                    };
184
185                    let Some(data) = val.get("data") else {
186                        log::error!(
187                            "Combined stream binary message is missing 'data' field, original error trying to parse as TradeMessage: {e_direct}"
188                        );
189                        return Ok(());
190                    };
191
192                    let trade_msg = match simd_json::serde::from_owned_value::<TradeMessage>(
193                        data.to_owned(),
194                    ) {
195                        Ok(msg) => msg,
196                        Err(e_data) => {
197                            log::error!(
198                                "Failed to parse 'data' field from combined stream binary message into TradeMessage: {e_data}, original error trying to parse root as TradeMessage: {e_direct}"
199                            );
200                            return Ok(());
201                        }
202                    };
203
204                    self.update_receive_stats(bin_len);
205                    if let Err(e_send) = trade_tx.try_send(trade_msg) {
206                        log::error!(
207                            "Failed to send trade message from combined binary stream: {e_send}"
208                        );
209                    }
210                }
211            }
212        } else {
213            // Zero-copy parsing for direct streams
214            match deserialize_from_vec::<TradeMessage>(bin) {
215                Ok(trade_msg) => {
216                    self.update_receive_stats(bin_len);
217                    if let Err(e) = trade_tx.try_send(trade_msg) {
218                        log::error!("Failed to send trade message: {e}");
219                    }
220                }
221                Err(e_direct) => {
222                    log::error!("Failed to parse trade binary message: {e_direct}");
223                    return Ok(());
224                }
225            }
226        }
227
228        Ok(())
229    }
230
231    /// Process orderbook text message
232    #[inline]
233    async fn process_orderbook_text_message(&self, text: String) -> WebSocketResult<()> {
234        let orderbook_tx = self
235            .orderbook_tx
236            .as_ref()
237            .ok_or(WebSocketError::ConnectionClosed(
238                "No orderbook sender configured".to_string(),
239            ))?;
240
241        match json::parse_fast_owned::<BinanceWebsocketResponse<OrderbookMessage>>(text.clone()) {
242            Ok(BinanceWebsocketResponse::Data(orderbook_msg)) => {
243                self.update_receive_stats(text.len());
244                if let Err(e) = orderbook_tx.send(orderbook_msg).await {
245                    log::error!("Failed to send OrderbookMessage: {e}");
246                }
247            }
248            Ok(BinanceWebsocketResponse::Confirmation(conf_msg)) => {
249                log::debug!("Received subscription confirmation: {conf_msg:?}");
250            }
251            Err(e_direct) => {
252                if !self.is_combined {
253                    log::error!("Failed to parse OrderbookMessage text: {e_direct}");
254                    return Ok(());
255                }
256
257                let val = match json::parse::<Value>(text.as_str()) {
258                    Ok(v) => v,
259                    Err(e_value) => {
260                        log::error!(
261                            "Failed to parse combined stream text message for orderbook as Value: {e_value}, original error: {e_direct}"
262                        );
263                        return Ok(());
264                    }
265                };
266
267                let Some(data) = val.get("data") else {
268                    log::error!(
269                        "Combined stream text message for orderbook is missing 'data' field, original error trying to parse as OrderbookMessage: {e_direct}"
270                    );
271                    return Ok(());
272                };
273
274                let ob_msg = match simd_json::serde::from_owned_value::<OrderbookMessage>(
275                    data.to_owned(),
276                ) {
277                    Ok(msg) => msg,
278                    Err(e_data) => {
279                        log::error!(
280                            "Failed to parse 'data' field from combined stream text message for orderbook into OrderbookMessage: {e_data}, original error: {e_direct}"
281                        );
282                        return Ok(());
283                    }
284                };
285
286                self.update_receive_stats(text.len());
287                if let Err(e_send) = orderbook_tx.send(ob_msg).await {
288                    log::error!(
289                        "Failed to send orderbook message from combined text stream: {e_send}"
290                    );
291                }
292            }
293        }
294
295        Ok(())
296    }
297
298    /// Process orderbook binary message
299    #[inline]
300    async fn process_orderbook_binary_message(&self, bin: Vec<u8>) -> WebSocketResult<()> {
301        let orderbook_tx = self
302            .orderbook_tx
303            .as_ref()
304            .ok_or(WebSocketError::ConnectionClosed(
305                "No orderbook sender configured".to_string(),
306            ))?;
307
308        let bin_len = bin.len();
309
310        // For combined streams, we need to clone for fallback parsing
311        // For direct streams, we can use zero-copy parsing
312        if self.is_combined {
313            // Clone only for combined streams that might need fallback
314            match deserialize_from_vec::<OrderbookMessage>(bin.clone()) {
315                Ok(ob_msg) => {
316                    self.update_receive_stats(bin_len);
317                    if let Err(e) = orderbook_tx.send(ob_msg).await {
318                        log::error!("Failed to send orderbook message: {e}");
319                    }
320                }
321                Err(e_direct) => {
322                    // Fallback to combined stream parsing
323                    let val = match deserialize_from_vec::<Value>(bin) {
324                        Ok(v) => v,
325                        Err(e_value) => {
326                            log::error!(
327                                "Failed to parse combined stream binary message for orderbook as Value: {e_value}, original error: {e_direct}"
328                            );
329                            return Ok(());
330                        }
331                    };
332
333                    let Some(data) = val.get("data") else {
334                        log::error!(
335                            "Combined stream binary message for orderbook is missing 'data' field, original error trying to parse as OrderbookMessage: {e_direct}"
336                        );
337                        return Ok(());
338                    };
339
340                    let ob_msg = match simd_json::serde::from_owned_value::<OrderbookMessage>(
341                        data.to_owned(),
342                    ) {
343                        Ok(msg) => msg,
344                        Err(e_data) => {
345                            log::error!(
346                                "Failed to parse 'data' field from combined stream binary message for orderbook into OrderbookMessage: {e_data}, original error: {e_direct}"
347                            );
348                            return Ok(());
349                        }
350                    };
351
352                    self.update_receive_stats(bin_len);
353                    if let Err(e_send) = orderbook_tx.send(ob_msg).await {
354                        log::error!(
355                            "Failed to send orderbook message from combined binary stream: {e_send}"
356                        );
357                    }
358                }
359            }
360        } else {
361            // Zero-copy parsing for direct streams
362            match deserialize_from_vec::<OrderbookMessage>(bin) {
363                Ok(ob_msg) => {
364                    self.update_receive_stats(bin_len);
365                    if let Err(e) = orderbook_tx.send(ob_msg).await {
366                        log::error!("Failed to send orderbook message: {e}");
367                    }
368                }
369                Err(e_direct) => {
370                    log::error!("Failed to parse orderbook binary message: {e_direct}");
371                    return Ok(());
372                }
373            }
374        }
375
376        Ok(())
377    }
378}
379
380#[async_trait]
381impl MessageHandler for BinanceSpotMessageHandler {
382    async fn on_connected(&mut self) -> WebSocketResult<()> {
383        log::info!("Binance Spot WebSocket connected");
384        self.stats.write().connected_time = self.clock.raw();
385
386        // Subscription will be sent by the provider after connection
387        Ok(())
388    }
389
390    async fn on_message(&mut self, message: Message) -> WebSocketResult<()> {
391        match message {
392            Message::Text(text) => {
393                if self.trade_tx.is_some() {
394                    self.process_trade_text_message(text).await?;
395                } else if self.orderbook_tx.is_some() {
396                    self.process_orderbook_text_message(text).await?;
397                }
398            }
399            Message::Binary(bin) => {
400                if self.trade_tx.is_some() {
401                    self.process_trade_binary_message(bin).await?;
402                } else if self.orderbook_tx.is_some() {
403                    self.process_orderbook_binary_message(bin).await?;
404                }
405            }
406            Message::Pong(_) => {
407                // Update pong time
408                self.stats.write().last_pong_time = self.clock.raw();
409            }
410            Message::Close(close_frame) => {
411                log::info!("WebSocket connection closed: {close_frame:?}");
412            }
413            Message::Ping(payload) => {
414                // Ping responses are handled automatically by WebSocketClient
415                log::trace!("Received ping: {} bytes", payload.len());
416            }
417            Message::Frame(_) => {
418                // Ignoring Frame messages
419                log::trace!("Received unhandled Message::Frame");
420            }
421        }
422
423        Ok(())
424    }
425
426    async fn on_disconnected(&mut self) -> WebSocketResult<()> {
427        log::info!("Binance Spot WebSocket disconnected");
428        Ok(())
429    }
430}