rusty_feeder/exchange/binance/spot/
message_handler_zerocopy.rs

1//! Zero-copy Binance Spot WebSocket message handler implementation
2//!
3//! This module provides a high-performance message handler that uses
4//! memory pools and zero-copy parsing for maximum efficiency.
5
6use async_trait::async_trait;
7use parking_lot::RwLock;
8use quanta::Clock;
9use rusty_common::json::{self};
10use rusty_common::memory::trading_pools::with_thread_local_pools;
11use rusty_common::websocket::{Message, MessageHandler, WebSocketError, WebSocketResult};
12use simd_json::prelude::ValueObjectAccess;
13use smartstring::alias::String;
14use std::sync::Arc;
15use tokio::sync::mpsc;
16
17use super::data::{orderbook::OrderbookMessage, trade::TradeMessage};
18use crate::exchange::binance::common::messages::BinanceWebsocketResponse;
19use crate::provider::prelude::ConnectionStats;
20
21/// Zero-copy message handler for Binance Spot WebSocket connections
22pub struct BinanceSpotZeroCopyMessageHandler {
23    /// Clock for timing
24    clock: Clock,
25
26    /// Connection statistics
27    stats: Arc<RwLock<ConnectionStats>>,
28
29    /// Trade message sender
30    trade_tx: Option<mpsc::Sender<TradeMessage>>,
31
32    /// Orderbook message sender
33    orderbook_tx: Option<mpsc::Sender<OrderbookMessage>>,
34
35    /// Whether this is a combined stream
36    is_combined: bool,
37
38    /// Subscription message to send on connection
39    subscription_message: Option<String>,
40}
41
42impl BinanceSpotZeroCopyMessageHandler {
43    /// Create a new zero-copy message handler for trade messages
44    pub const fn new_trade_handler(
45        clock: Clock,
46        stats: Arc<RwLock<ConnectionStats>>,
47        trade_tx: mpsc::Sender<TradeMessage>,
48        is_combined: bool,
49        subscription_message: Option<String>,
50    ) -> Self {
51        Self {
52            clock,
53            stats,
54            trade_tx: Some(trade_tx),
55            orderbook_tx: None,
56            is_combined,
57            subscription_message,
58        }
59    }
60
61    /// Create a new zero-copy message handler for orderbook messages
62    pub const fn new_orderbook_handler(
63        clock: Clock,
64        stats: Arc<RwLock<ConnectionStats>>,
65        orderbook_tx: mpsc::Sender<OrderbookMessage>,
66        is_combined: bool,
67        subscription_message: Option<String>,
68    ) -> Self {
69        Self {
70            clock,
71            stats,
72            trade_tx: None,
73            orderbook_tx: Some(orderbook_tx),
74            is_combined,
75            subscription_message,
76        }
77    }
78
79    /// Update connection statistics
80    #[inline]
81    fn update_receive_stats(&self, message_size: usize) {
82        let local_time = self.clock.raw();
83        let mut s = self.stats.write();
84        s.messages_received += 1;
85        s.bytes_received += message_size as u64;
86        s.last_message_time = local_time;
87    }
88
89    /// Process trade message using zero-copy parsing
90    #[inline]
91    async fn process_trade_zerocopy(&self, buffer: &mut [u8]) -> WebSocketResult<()> {
92        let trade_tx = self
93            .trade_tx
94            .as_ref()
95            .ok_or(WebSocketError::ConnectionClosed(
96                "No trade sender configured".to_string(),
97            ))?;
98
99        // First try to parse as direct trade message
100        match json::parse_zerocopy::<BinanceWebsocketResponse<TradeMessage>>(buffer) {
101            Ok(BinanceWebsocketResponse::Data(trade_msg)) => {
102                if let Err(e) = trade_tx.send(trade_msg).await {
103                    log::error!("Failed to send TradeMessage: {e}");
104                }
105                return Ok(());
106            }
107            Ok(BinanceWebsocketResponse::Confirmation(conf_msg)) => {
108                log::debug!("Received subscription confirmation: {conf_msg:?}");
109                return Ok(());
110            }
111            Err(e_direct) => {
112                if !self.is_combined {
113                    log::error!("Failed to parse TradeMessage: {e_direct}");
114                    return Ok(());
115                }
116
117                // For combined streams, parse as generic value first
118                let borrowed_val = match json::parse_to_borrowed_value(buffer) {
119                    Ok(val) => val,
120                    Err(e) => {
121                        log::error!(
122                            "Failed to parse combined stream: {e}, original error: {e_direct}"
123                        );
124                        return Ok(());
125                    }
126                };
127
128                // Extract data field
129                let Some(data) = borrowed_val.get("data") else {
130                    log::error!("Combined stream missing 'data' field");
131                    return Ok(());
132                };
133
134                // Convert borrowed value to owned for sending
135                let owned_data = simd_json::serde::to_owned_value(data).map_err(|e| {
136                    WebSocketError::MessageProcessingError(format!(
137                        "Failed to convert to owned value: {e}"
138                    ))
139                })?;
140
141                let trade_msg = match simd_json::serde::from_owned_value::<TradeMessage>(owned_data)
142                {
143                    Ok(msg) => msg,
144                    Err(e) => {
145                        log::error!("Failed to parse trade from combined stream: {e}");
146                        return Ok(());
147                    }
148                };
149
150                if let Err(e) = trade_tx.send(trade_msg).await {
151                    log::error!("Failed to send trade message: {e}");
152                }
153            }
154        }
155
156        Ok(())
157    }
158
159    /// Process orderbook message using zero-copy parsing
160    #[inline]
161    async fn process_orderbook_zerocopy(&self, buffer: &mut [u8]) -> WebSocketResult<()> {
162        let orderbook_tx = self
163            .orderbook_tx
164            .as_ref()
165            .ok_or(WebSocketError::ConnectionClosed(
166                "No orderbook sender configured".to_string(),
167            ))?;
168
169        // First try to parse as direct orderbook message
170        match json::parse_zerocopy::<BinanceWebsocketResponse<OrderbookMessage>>(buffer) {
171            Ok(BinanceWebsocketResponse::Data(orderbook_msg)) => {
172                if let Err(e) = orderbook_tx.send(orderbook_msg).await {
173                    log::error!("Failed to send OrderbookMessage: {e}");
174                }
175                return Ok(());
176            }
177            Ok(BinanceWebsocketResponse::Confirmation(conf_msg)) => {
178                log::debug!("Received subscription confirmation: {conf_msg:?}");
179                return Ok(());
180            }
181            Err(e_direct) => {
182                if !self.is_combined {
183                    log::error!("Failed to parse OrderbookMessage: {e_direct}");
184                    return Ok(());
185                }
186
187                // For combined streams, parse as generic value first
188                let borrowed_val = match json::parse_to_borrowed_value(buffer) {
189                    Ok(val) => val,
190                    Err(e) => {
191                        log::error!(
192                            "Failed to parse combined stream: {e}, original error: {e_direct}"
193                        );
194                        return Ok(());
195                    }
196                };
197
198                // Extract data field
199                let Some(data) = borrowed_val.get("data") else {
200                    log::error!("Combined stream missing 'data' field");
201                    return Ok(());
202                };
203
204                // Convert borrowed value to owned for sending
205                let owned_data = simd_json::serde::to_owned_value(data).map_err(|e| {
206                    WebSocketError::MessageProcessingError(format!(
207                        "Failed to convert to owned value: {e}"
208                    ))
209                })?;
210
211                let orderbook_msg =
212                    match simd_json::serde::from_owned_value::<OrderbookMessage>(owned_data) {
213                        Ok(msg) => msg,
214                        Err(e) => {
215                            log::error!("Failed to parse orderbook from combined stream: {e}");
216                            return Ok(());
217                        }
218                    };
219
220                if let Err(e) = orderbook_tx.send(orderbook_msg).await {
221                    log::error!("Failed to send orderbook message: {e}");
222                }
223            }
224        }
225
226        Ok(())
227    }
228}
229
230#[async_trait]
231impl MessageHandler for BinanceSpotZeroCopyMessageHandler {
232    async fn on_connected(&mut self) -> WebSocketResult<()> {
233        log::info!("Binance Spot WebSocket connected (zero-copy mode)");
234        self.stats.write().connected_time = self.clock.raw();
235        Ok(())
236    }
237
238    async fn on_message(&mut self, message: Message) -> WebSocketResult<()> {
239        // Get pooled buffers for parsing
240        let buffer_result = with_thread_local_pools(|pools| {
241            let json_pool = pools.json_pool();
242            json_pool.get_parsing_buffer().ok_or_else(|| {
243                WebSocketError::MessageProcessingError("Failed to get parsing buffer".into())
244            })
245        });
246
247        let mut buffer_handle = buffer_result?;
248
249        match message {
250            Message::Text(text) => {
251                let text_bytes = text.as_bytes();
252                self.update_receive_stats(text_bytes.len());
253
254                // Get mutable slice from buffer (safe zerocopy method)
255                let buffer = buffer_handle.as_slice_mut();
256
257                // Ensure buffer is large enough
258                if buffer.len() < text_bytes.len() {
259                    log::error!(
260                        "Buffer too small for message: {} < {}",
261                        buffer.len(),
262                        text_bytes.len()
263                    );
264                    return Ok(());
265                }
266
267                // Copy text to buffer
268                buffer[..text_bytes.len()].copy_from_slice(text_bytes);
269                let parse_buffer = &mut buffer[..text_bytes.len()];
270
271                if self.trade_tx.is_some() {
272                    self.process_trade_zerocopy(parse_buffer).await?;
273                } else if self.orderbook_tx.is_some() {
274                    self.process_orderbook_zerocopy(parse_buffer).await?;
275                }
276            }
277            Message::Binary(bin) => {
278                self.update_receive_stats(bin.len());
279
280                // Get mutable slice from buffer (safe zerocopy method)
281                let buffer = buffer_handle.as_slice_mut();
282
283                // Ensure buffer is large enough
284                if buffer.len() < bin.len() {
285                    log::error!(
286                        "Buffer too small for message: {} < {}",
287                        buffer.len(),
288                        bin.len()
289                    );
290                    return Ok(());
291                }
292
293                // Copy binary data to buffer
294                buffer[..bin.len()].copy_from_slice(&bin);
295                let parse_buffer = &mut buffer[..bin.len()];
296
297                if self.trade_tx.is_some() {
298                    self.process_trade_zerocopy(parse_buffer).await?;
299                } else if self.orderbook_tx.is_some() {
300                    self.process_orderbook_zerocopy(parse_buffer).await?;
301                }
302            }
303            Message::Pong(_) => {
304                self.stats.write().last_pong_time = self.clock.raw();
305            }
306            Message::Close(close_frame) => {
307                log::info!("WebSocket connection closed: {close_frame:?}");
308            }
309            Message::Ping(payload) => {
310                log::trace!("Received ping: {} bytes", payload.len());
311            }
312            Message::Frame(_) => {
313                log::trace!("Received unhandled Message::Frame");
314            }
315        }
316
317        Ok(())
318    }
319
320    async fn on_disconnected(&mut self) -> WebSocketResult<()> {
321        log::info!("Binance Spot WebSocket disconnected (zero-copy mode)");
322        Ok(())
323    }
324}
325
326#[cfg(test)]
327mod tests {
328    use super::*;
329    use rusty_common::memory::trading_pools::initialize_trading_pools;
330
331    #[test]
332    fn test_zerocopy_handler_creation() {
333        // Initialize pools for testing
334        initialize_trading_pools();
335
336        let clock = Clock::new();
337        let stats = Arc::new(RwLock::new(ConnectionStats::default()));
338        let (tx, _rx) = mpsc::channel(100);
339
340        let handler =
341            BinanceSpotZeroCopyMessageHandler::new_trade_handler(clock, stats, tx, false, None);
342
343        assert!(handler.trade_tx.is_some());
344        assert!(handler.orderbook_tx.is_none());
345    }
346}