rusty_feeder/exchange/binance/spot/
message_handler_zerocopy.rs1use async_trait::async_trait;
7use parking_lot::RwLock;
8use quanta::Clock;
9use rusty_common::json::{self};
10use rusty_common::memory::trading_pools::with_thread_local_pools;
11use rusty_common::websocket::{Message, MessageHandler, WebSocketError, WebSocketResult};
12use simd_json::prelude::ValueObjectAccess;
13use smartstring::alias::String;
14use std::sync::Arc;
15use tokio::sync::mpsc;
16
17use super::data::{orderbook::OrderbookMessage, trade::TradeMessage};
18use crate::exchange::binance::common::messages::BinanceWebsocketResponse;
19use crate::provider::prelude::ConnectionStats;
20
21pub struct BinanceSpotZeroCopyMessageHandler {
23 clock: Clock,
25
26 stats: Arc<RwLock<ConnectionStats>>,
28
29 trade_tx: Option<mpsc::Sender<TradeMessage>>,
31
32 orderbook_tx: Option<mpsc::Sender<OrderbookMessage>>,
34
35 is_combined: bool,
37
38 subscription_message: Option<String>,
40}
41
42impl BinanceSpotZeroCopyMessageHandler {
43 pub const fn new_trade_handler(
45 clock: Clock,
46 stats: Arc<RwLock<ConnectionStats>>,
47 trade_tx: mpsc::Sender<TradeMessage>,
48 is_combined: bool,
49 subscription_message: Option<String>,
50 ) -> Self {
51 Self {
52 clock,
53 stats,
54 trade_tx: Some(trade_tx),
55 orderbook_tx: None,
56 is_combined,
57 subscription_message,
58 }
59 }
60
61 pub const fn new_orderbook_handler(
63 clock: Clock,
64 stats: Arc<RwLock<ConnectionStats>>,
65 orderbook_tx: mpsc::Sender<OrderbookMessage>,
66 is_combined: bool,
67 subscription_message: Option<String>,
68 ) -> Self {
69 Self {
70 clock,
71 stats,
72 trade_tx: None,
73 orderbook_tx: Some(orderbook_tx),
74 is_combined,
75 subscription_message,
76 }
77 }
78
79 #[inline]
81 fn update_receive_stats(&self, message_size: usize) {
82 let local_time = self.clock.raw();
83 let mut s = self.stats.write();
84 s.messages_received += 1;
85 s.bytes_received += message_size as u64;
86 s.last_message_time = local_time;
87 }
88
89 #[inline]
91 async fn process_trade_zerocopy(&self, buffer: &mut [u8]) -> WebSocketResult<()> {
92 let trade_tx = self
93 .trade_tx
94 .as_ref()
95 .ok_or(WebSocketError::ConnectionClosed(
96 "No trade sender configured".to_string(),
97 ))?;
98
99 match json::parse_zerocopy::<BinanceWebsocketResponse<TradeMessage>>(buffer) {
101 Ok(BinanceWebsocketResponse::Data(trade_msg)) => {
102 if let Err(e) = trade_tx.send(trade_msg).await {
103 log::error!("Failed to send TradeMessage: {e}");
104 }
105 return Ok(());
106 }
107 Ok(BinanceWebsocketResponse::Confirmation(conf_msg)) => {
108 log::debug!("Received subscription confirmation: {conf_msg:?}");
109 return Ok(());
110 }
111 Err(e_direct) => {
112 if !self.is_combined {
113 log::error!("Failed to parse TradeMessage: {e_direct}");
114 return Ok(());
115 }
116
117 let borrowed_val = match json::parse_to_borrowed_value(buffer) {
119 Ok(val) => val,
120 Err(e) => {
121 log::error!(
122 "Failed to parse combined stream: {e}, original error: {e_direct}"
123 );
124 return Ok(());
125 }
126 };
127
128 let Some(data) = borrowed_val.get("data") else {
130 log::error!("Combined stream missing 'data' field");
131 return Ok(());
132 };
133
134 let owned_data = simd_json::serde::to_owned_value(data).map_err(|e| {
136 WebSocketError::MessageProcessingError(format!(
137 "Failed to convert to owned value: {e}"
138 ))
139 })?;
140
141 let trade_msg = match simd_json::serde::from_owned_value::<TradeMessage>(owned_data)
142 {
143 Ok(msg) => msg,
144 Err(e) => {
145 log::error!("Failed to parse trade from combined stream: {e}");
146 return Ok(());
147 }
148 };
149
150 if let Err(e) = trade_tx.send(trade_msg).await {
151 log::error!("Failed to send trade message: {e}");
152 }
153 }
154 }
155
156 Ok(())
157 }
158
159 #[inline]
161 async fn process_orderbook_zerocopy(&self, buffer: &mut [u8]) -> WebSocketResult<()> {
162 let orderbook_tx = self
163 .orderbook_tx
164 .as_ref()
165 .ok_or(WebSocketError::ConnectionClosed(
166 "No orderbook sender configured".to_string(),
167 ))?;
168
169 match json::parse_zerocopy::<BinanceWebsocketResponse<OrderbookMessage>>(buffer) {
171 Ok(BinanceWebsocketResponse::Data(orderbook_msg)) => {
172 if let Err(e) = orderbook_tx.send(orderbook_msg).await {
173 log::error!("Failed to send OrderbookMessage: {e}");
174 }
175 return Ok(());
176 }
177 Ok(BinanceWebsocketResponse::Confirmation(conf_msg)) => {
178 log::debug!("Received subscription confirmation: {conf_msg:?}");
179 return Ok(());
180 }
181 Err(e_direct) => {
182 if !self.is_combined {
183 log::error!("Failed to parse OrderbookMessage: {e_direct}");
184 return Ok(());
185 }
186
187 let borrowed_val = match json::parse_to_borrowed_value(buffer) {
189 Ok(val) => val,
190 Err(e) => {
191 log::error!(
192 "Failed to parse combined stream: {e}, original error: {e_direct}"
193 );
194 return Ok(());
195 }
196 };
197
198 let Some(data) = borrowed_val.get("data") else {
200 log::error!("Combined stream missing 'data' field");
201 return Ok(());
202 };
203
204 let owned_data = simd_json::serde::to_owned_value(data).map_err(|e| {
206 WebSocketError::MessageProcessingError(format!(
207 "Failed to convert to owned value: {e}"
208 ))
209 })?;
210
211 let orderbook_msg =
212 match simd_json::serde::from_owned_value::<OrderbookMessage>(owned_data) {
213 Ok(msg) => msg,
214 Err(e) => {
215 log::error!("Failed to parse orderbook from combined stream: {e}");
216 return Ok(());
217 }
218 };
219
220 if let Err(e) = orderbook_tx.send(orderbook_msg).await {
221 log::error!("Failed to send orderbook message: {e}");
222 }
223 }
224 }
225
226 Ok(())
227 }
228}
229
230#[async_trait]
231impl MessageHandler for BinanceSpotZeroCopyMessageHandler {
232 async fn on_connected(&mut self) -> WebSocketResult<()> {
233 log::info!("Binance Spot WebSocket connected (zero-copy mode)");
234 self.stats.write().connected_time = self.clock.raw();
235 Ok(())
236 }
237
238 async fn on_message(&mut self, message: Message) -> WebSocketResult<()> {
239 let buffer_result = with_thread_local_pools(|pools| {
241 let json_pool = pools.json_pool();
242 json_pool.get_parsing_buffer().ok_or_else(|| {
243 WebSocketError::MessageProcessingError("Failed to get parsing buffer".into())
244 })
245 });
246
247 let mut buffer_handle = buffer_result?;
248
249 match message {
250 Message::Text(text) => {
251 let text_bytes = text.as_bytes();
252 self.update_receive_stats(text_bytes.len());
253
254 let buffer = buffer_handle.as_slice_mut();
256
257 if buffer.len() < text_bytes.len() {
259 log::error!(
260 "Buffer too small for message: {} < {}",
261 buffer.len(),
262 text_bytes.len()
263 );
264 return Ok(());
265 }
266
267 buffer[..text_bytes.len()].copy_from_slice(text_bytes);
269 let parse_buffer = &mut buffer[..text_bytes.len()];
270
271 if self.trade_tx.is_some() {
272 self.process_trade_zerocopy(parse_buffer).await?;
273 } else if self.orderbook_tx.is_some() {
274 self.process_orderbook_zerocopy(parse_buffer).await?;
275 }
276 }
277 Message::Binary(bin) => {
278 self.update_receive_stats(bin.len());
279
280 let buffer = buffer_handle.as_slice_mut();
282
283 if buffer.len() < bin.len() {
285 log::error!(
286 "Buffer too small for message: {} < {}",
287 buffer.len(),
288 bin.len()
289 );
290 return Ok(());
291 }
292
293 buffer[..bin.len()].copy_from_slice(&bin);
295 let parse_buffer = &mut buffer[..bin.len()];
296
297 if self.trade_tx.is_some() {
298 self.process_trade_zerocopy(parse_buffer).await?;
299 } else if self.orderbook_tx.is_some() {
300 self.process_orderbook_zerocopy(parse_buffer).await?;
301 }
302 }
303 Message::Pong(_) => {
304 self.stats.write().last_pong_time = self.clock.raw();
305 }
306 Message::Close(close_frame) => {
307 log::info!("WebSocket connection closed: {close_frame:?}");
308 }
309 Message::Ping(payload) => {
310 log::trace!("Received ping: {} bytes", payload.len());
311 }
312 Message::Frame(_) => {
313 log::trace!("Received unhandled Message::Frame");
314 }
315 }
316
317 Ok(())
318 }
319
320 async fn on_disconnected(&mut self) -> WebSocketResult<()> {
321 log::info!("Binance Spot WebSocket disconnected (zero-copy mode)");
322 Ok(())
323 }
324}
325
326#[cfg(test)]
327mod tests {
328 use super::*;
329 use rusty_common::memory::trading_pools::initialize_trading_pools;
330
331 #[test]
332 fn test_zerocopy_handler_creation() {
333 initialize_trading_pools();
335
336 let clock = Clock::new();
337 let stats = Arc::new(RwLock::new(ConnectionStats::default()));
338 let (tx, _rx) = mpsc::channel(100);
339
340 let handler =
341 BinanceSpotZeroCopyMessageHandler::new_trade_handler(clock, stats, tx, false, None);
342
343 assert!(handler.trade_tx.is_some());
344 assert!(handler.orderbook_tx.is_none());
345 }
346}