rusty_feeder/exchange/binance/spot/
message_handler.rs1use crate::exchange::zerocopy_helpers::{deserialize_from_slice_borrowed, deserialize_from_vec};
4use async_trait::async_trait;
5use parking_lot::RwLock;
6use quanta::Clock;
7use rusty_common::json::{self, Value};
8use rusty_common::websocket::{Message, MessageHandler, WebSocketError, WebSocketResult};
9use simd_json::prelude::ValueObjectAccess;
10use smartstring::alias::String;
11use std::sync::Arc;
12use tokio::sync::mpsc;
13
14use super::data::{orderbook::OrderbookMessage, trade::TradeMessage};
15use crate::exchange::binance::common::messages::BinanceWebsocketResponse;
16use crate::provider::prelude::ConnectionStats;
17
/// WebSocket message handler for Binance Spot market-data streams.
///
/// Each constructor configures the handler for exactly one feed: trades
/// (`trade_tx` set) or orderbook updates (`orderbook_tx` set). Successfully
/// parsed data messages are forwarded over the configured channel and counted
/// in `stats`.
pub struct BinanceSpotMessageHandler {
    /// Clock whose raw value (`Clock::raw`) is used to timestamp entries in `stats`.
    clock: Clock,

    /// Shared connection statistics; written on connect, on pong and whenever a
    /// data message has been parsed.
    stats: Arc<RwLock<ConnectionStats>>,

    /// Sender for parsed trade messages; `Some` only when built via `new_trade_handler`.
    trade_tx: Option<mpsc::Sender<TradeMessage>>,

    /// Sender for parsed orderbook messages; `Some` only when built via
    /// `new_orderbook_handler`.
    orderbook_tx: Option<mpsc::Sender<OrderbookMessage>>,

    /// When `true`, a frame that fails direct parsing is re-parsed as a generic
    /// JSON value and the payload is taken from its `"data"` field.
    is_combined: bool,

    /// Stored by the constructors but not read anywhere in this file.
    /// NOTE(review): presumably the subscription request to send after connecting —
    /// confirm against the code that consumes it.
    subscription_message: Option<String>,
}
38
39impl BinanceSpotMessageHandler {
40 pub const fn new_trade_handler(
42 clock: Clock,
43 stats: Arc<RwLock<ConnectionStats>>,
44 trade_tx: mpsc::Sender<TradeMessage>,
45 is_combined: bool,
46 subscription_message: Option<String>,
47 ) -> Self {
48 Self {
49 clock,
50 stats,
51 trade_tx: Some(trade_tx),
52 orderbook_tx: None,
53 is_combined,
54 subscription_message,
55 }
56 }
57
58 pub const fn new_orderbook_handler(
60 clock: Clock,
61 stats: Arc<RwLock<ConnectionStats>>,
62 orderbook_tx: mpsc::Sender<OrderbookMessage>,
63 is_combined: bool,
64 subscription_message: Option<String>,
65 ) -> Self {
66 Self {
67 clock,
68 stats,
69 trade_tx: None,
70 orderbook_tx: Some(orderbook_tx),
71 is_combined,
72 subscription_message,
73 }
74 }
75
76 #[inline]
78 fn update_receive_stats(&self, message_size: usize) {
79 let local_time = self.clock.raw();
80 let mut s = self.stats.write();
81 s.messages_received += 1;
82 s.bytes_received += message_size as u64;
83 s.last_message_time = local_time;
84 }
85
86 #[inline]
88 async fn process_trade_text_message(&self, text: String) -> WebSocketResult<()> {
89 let trade_tx = self
90 .trade_tx
91 .as_ref()
92 .ok_or(WebSocketError::ConnectionClosed(
93 "No trade sender configured".to_string(),
94 ))?;
95
96 match deserialize_from_slice_borrowed::<BinanceWebsocketResponse<TradeMessage>>(
98 text.as_bytes(),
99 ) {
100 Ok(BinanceWebsocketResponse::Data(trade_msg)) => {
101 self.update_receive_stats(text.len());
102 if let Err(e) = trade_tx.send(trade_msg).await {
103 log::error!("Failed to send TradeMessage: {e}");
104 }
105 }
106 Ok(BinanceWebsocketResponse::Confirmation(conf_msg)) => {
107 log::debug!("Received subscription confirmation: {conf_msg:?}");
108 }
109 Err(e_direct) => {
110 if !self.is_combined {
111 log::error!("Failed to parse TradeMessage text: {e_direct}");
112 return Ok(());
113 }
114
115 let val = match deserialize_from_slice_borrowed::<Value>(text.as_bytes()) {
116 Ok(val) => val,
117 Err(e_combined) => {
118 log::error!(
119 "Failed to parse combined stream text message: {e_combined}, original error: {e_direct}"
120 );
121 return Ok(());
122 }
123 };
124
125 let Some(data) = val.get("data") else {
126 log::error!("Failed to parse trade text message: {e_direct}");
127 return Ok(());
128 };
129
130 let trade_msg =
131 match simd_json::serde::from_owned_value::<TradeMessage>(data.to_owned()) {
132 Ok(trade_msg) => trade_msg,
133 Err(e) => {
134 log::error!("Failed to parse trade text message: {e}");
135 return Ok(());
136 }
137 };
138
139 self.update_receive_stats(text.len());
140
141 if let Err(e) = trade_tx.try_send(trade_msg) {
142 log::error!("Failed to send trade message: {e}");
143 }
144 }
145 }
146
147 Ok(())
148 }
149
150 #[inline]
152 async fn process_trade_binary_message(&self, bin: Vec<u8>) -> WebSocketResult<()> {
153 let trade_tx = self
154 .trade_tx
155 .as_ref()
156 .ok_or(WebSocketError::ConnectionClosed(
157 "No trade sender configured".to_string(),
158 ))?;
159
160 let bin_len = bin.len();
161
162 if self.is_combined {
165 match deserialize_from_vec::<TradeMessage>(bin.clone()) {
167 Ok(trade_msg) => {
168 self.update_receive_stats(bin_len);
169 if let Err(e) = trade_tx.try_send(trade_msg) {
170 log::error!("Failed to send trade message: {e}");
171 }
172 }
173 Err(e_direct) => {
174 let val = match deserialize_from_vec::<Value>(bin) {
176 Ok(v) => v,
177 Err(e_value) => {
178 log::error!(
179 "Failed to parse combined stream binary message as Value: {e_value}, original error: {e_direct}"
180 );
181 return Ok(());
182 }
183 };
184
185 let Some(data) = val.get("data") else {
186 log::error!(
187 "Combined stream binary message is missing 'data' field, original error trying to parse as TradeMessage: {e_direct}"
188 );
189 return Ok(());
190 };
191
192 let trade_msg = match simd_json::serde::from_owned_value::<TradeMessage>(
193 data.to_owned(),
194 ) {
195 Ok(msg) => msg,
196 Err(e_data) => {
197 log::error!(
198 "Failed to parse 'data' field from combined stream binary message into TradeMessage: {e_data}, original error trying to parse root as TradeMessage: {e_direct}"
199 );
200 return Ok(());
201 }
202 };
203
204 self.update_receive_stats(bin_len);
205 if let Err(e_send) = trade_tx.try_send(trade_msg) {
206 log::error!(
207 "Failed to send trade message from combined binary stream: {e_send}"
208 );
209 }
210 }
211 }
212 } else {
213 match deserialize_from_vec::<TradeMessage>(bin) {
215 Ok(trade_msg) => {
216 self.update_receive_stats(bin_len);
217 if let Err(e) = trade_tx.try_send(trade_msg) {
218 log::error!("Failed to send trade message: {e}");
219 }
220 }
221 Err(e_direct) => {
222 log::error!("Failed to parse trade binary message: {e_direct}");
223 return Ok(());
224 }
225 }
226 }
227
228 Ok(())
229 }
230
231 #[inline]
233 async fn process_orderbook_text_message(&self, text: String) -> WebSocketResult<()> {
234 let orderbook_tx = self
235 .orderbook_tx
236 .as_ref()
237 .ok_or(WebSocketError::ConnectionClosed(
238 "No orderbook sender configured".to_string(),
239 ))?;
240
241 match json::parse_fast_owned::<BinanceWebsocketResponse<OrderbookMessage>>(text.clone()) {
242 Ok(BinanceWebsocketResponse::Data(orderbook_msg)) => {
243 self.update_receive_stats(text.len());
244 if let Err(e) = orderbook_tx.send(orderbook_msg).await {
245 log::error!("Failed to send OrderbookMessage: {e}");
246 }
247 }
248 Ok(BinanceWebsocketResponse::Confirmation(conf_msg)) => {
249 log::debug!("Received subscription confirmation: {conf_msg:?}");
250 }
251 Err(e_direct) => {
252 if !self.is_combined {
253 log::error!("Failed to parse OrderbookMessage text: {e_direct}");
254 return Ok(());
255 }
256
257 let val = match json::parse::<Value>(text.as_str()) {
258 Ok(v) => v,
259 Err(e_value) => {
260 log::error!(
261 "Failed to parse combined stream text message for orderbook as Value: {e_value}, original error: {e_direct}"
262 );
263 return Ok(());
264 }
265 };
266
267 let Some(data) = val.get("data") else {
268 log::error!(
269 "Combined stream text message for orderbook is missing 'data' field, original error trying to parse as OrderbookMessage: {e_direct}"
270 );
271 return Ok(());
272 };
273
274 let ob_msg = match simd_json::serde::from_owned_value::<OrderbookMessage>(
275 data.to_owned(),
276 ) {
277 Ok(msg) => msg,
278 Err(e_data) => {
279 log::error!(
280 "Failed to parse 'data' field from combined stream text message for orderbook into OrderbookMessage: {e_data}, original error: {e_direct}"
281 );
282 return Ok(());
283 }
284 };
285
286 self.update_receive_stats(text.len());
287 if let Err(e_send) = orderbook_tx.send(ob_msg).await {
288 log::error!(
289 "Failed to send orderbook message from combined text stream: {e_send}"
290 );
291 }
292 }
293 }
294
295 Ok(())
296 }
297
298 #[inline]
300 async fn process_orderbook_binary_message(&self, bin: Vec<u8>) -> WebSocketResult<()> {
301 let orderbook_tx = self
302 .orderbook_tx
303 .as_ref()
304 .ok_or(WebSocketError::ConnectionClosed(
305 "No orderbook sender configured".to_string(),
306 ))?;
307
308 let bin_len = bin.len();
309
310 if self.is_combined {
313 match deserialize_from_vec::<OrderbookMessage>(bin.clone()) {
315 Ok(ob_msg) => {
316 self.update_receive_stats(bin_len);
317 if let Err(e) = orderbook_tx.send(ob_msg).await {
318 log::error!("Failed to send orderbook message: {e}");
319 }
320 }
321 Err(e_direct) => {
322 let val = match deserialize_from_vec::<Value>(bin) {
324 Ok(v) => v,
325 Err(e_value) => {
326 log::error!(
327 "Failed to parse combined stream binary message for orderbook as Value: {e_value}, original error: {e_direct}"
328 );
329 return Ok(());
330 }
331 };
332
333 let Some(data) = val.get("data") else {
334 log::error!(
335 "Combined stream binary message for orderbook is missing 'data' field, original error trying to parse as OrderbookMessage: {e_direct}"
336 );
337 return Ok(());
338 };
339
340 let ob_msg = match simd_json::serde::from_owned_value::<OrderbookMessage>(
341 data.to_owned(),
342 ) {
343 Ok(msg) => msg,
344 Err(e_data) => {
345 log::error!(
346 "Failed to parse 'data' field from combined stream binary message for orderbook into OrderbookMessage: {e_data}, original error: {e_direct}"
347 );
348 return Ok(());
349 }
350 };
351
352 self.update_receive_stats(bin_len);
353 if let Err(e_send) = orderbook_tx.send(ob_msg).await {
354 log::error!(
355 "Failed to send orderbook message from combined binary stream: {e_send}"
356 );
357 }
358 }
359 }
360 } else {
361 match deserialize_from_vec::<OrderbookMessage>(bin) {
363 Ok(ob_msg) => {
364 self.update_receive_stats(bin_len);
365 if let Err(e) = orderbook_tx.send(ob_msg).await {
366 log::error!("Failed to send orderbook message: {e}");
367 }
368 }
369 Err(e_direct) => {
370 log::error!("Failed to parse orderbook binary message: {e_direct}");
371 return Ok(());
372 }
373 }
374 }
375
376 Ok(())
377 }
378}
379
380#[async_trait]
381impl MessageHandler for BinanceSpotMessageHandler {
382 async fn on_connected(&mut self) -> WebSocketResult<()> {
383 log::info!("Binance Spot WebSocket connected");
384 self.stats.write().connected_time = self.clock.raw();
385
386 Ok(())
388 }
389
390 async fn on_message(&mut self, message: Message) -> WebSocketResult<()> {
391 match message {
392 Message::Text(text) => {
393 if self.trade_tx.is_some() {
394 self.process_trade_text_message(text).await?;
395 } else if self.orderbook_tx.is_some() {
396 self.process_orderbook_text_message(text).await?;
397 }
398 }
399 Message::Binary(bin) => {
400 if self.trade_tx.is_some() {
401 self.process_trade_binary_message(bin).await?;
402 } else if self.orderbook_tx.is_some() {
403 self.process_orderbook_binary_message(bin).await?;
404 }
405 }
406 Message::Pong(_) => {
407 self.stats.write().last_pong_time = self.clock.raw();
409 }
410 Message::Close(close_frame) => {
411 log::info!("WebSocket connection closed: {close_frame:?}");
412 }
413 Message::Ping(payload) => {
414 log::trace!("Received ping: {} bytes", payload.len());
416 }
417 Message::Frame(_) => {
418 log::trace!("Received unhandled Message::Frame");
420 }
421 }
422
423 Ok(())
424 }
425
426 async fn on_disconnected(&mut self) -> WebSocketResult<()> {
427 log::info!("Binance Spot WebSocket disconnected");
428 Ok(())
429 }
430}