1use anyhow::Result;
7use rust_decimal::Decimal;
8use rustc_hash::FxHashMap;
9use rusty_common::SmallVec;
10use rusty_model::data::zero_copy::{
11 MessageHeader, ZeroCopyOrderBookUpdate, ZeroCopyPriceLevel, ZeroCopyTrade,
12};
13use rusty_model::data::{MarketTrade, OrderBook, PriceLevel};
14use rusty_model::venues::Venue;
15use smartstring::alias::String;
16
17pub struct ZeroCopyConfig {
19 pub price_decimals: u32,
21 pub quantity_decimals: u32,
23 pub symbol_map: FxHashMap<u32, String>,
25}
26
27pub struct MarketDataChannels {
29 pub orderbook_tx: flume::Sender<OrderBook>,
31 pub trade_tx: flume::Sender<MarketTrade>,
33}
34
35impl MarketDataChannels {
36 #[must_use]
38 pub fn new(capacity: usize) -> (Self, MarketDataReceivers) {
39 let (orderbook_tx, orderbook_rx) = flume::bounded(capacity);
40 let (trade_tx, trade_rx) = flume::bounded(capacity);
41
42 let channels = Self {
43 orderbook_tx,
44 trade_tx,
45 };
46
47 let receivers = MarketDataReceivers {
48 orderbook_rx,
49 trade_rx,
50 };
51
52 (channels, receivers)
53 }
54}
55
56pub struct MarketDataReceivers {
58 pub orderbook_rx: flume::Receiver<OrderBook>,
60 pub trade_rx: flume::Receiver<MarketTrade>,
62}
63
64pub struct ZeroCopyAdapter {
66 config: ZeroCopyConfig,
67}
68
69impl ZeroCopyConfig {
70 #[must_use]
75 pub fn new_empty(price_decimals: u32, quantity_decimals: u32) -> Self {
76 Self {
77 price_decimals,
78 quantity_decimals,
79 symbol_map: FxHashMap::default(),
80 }
81 }
82
83 pub fn add_symbol(&mut self, symbol_id: u32, symbol: String) {
85 self.symbol_map.insert(symbol_id, symbol);
86 }
87}
88
89impl ZeroCopyAdapter {
90 #[must_use]
96 pub const fn new(config: ZeroCopyConfig) -> Self {
97 Self { config }
98 }
99
100 fn parse_header<'a>(&self, buffer: &'a [u8]) -> Result<&'a MessageHeader> {
102 if buffer.len() < std::mem::size_of::<MessageHeader>() {
103 return Err(anyhow::anyhow!(
104 "Buffer too small for message header: {} bytes",
105 buffer.len()
106 ));
107 }
108
109 MessageHeader::from_bytes(&buffer[..std::mem::size_of::<MessageHeader>()])
110 .ok_or_else(|| anyhow::anyhow!("Failed to parse message header"))
111 }
112
113 pub fn message_type(&self, buffer: &[u8]) -> Result<MessageType> {
115 let header = self.parse_header(buffer)?;
116
117 match header.msg_type {
118 1001 => Ok(MessageType::OrderBook),
119 1002 => Ok(MessageType::Trade),
120 _ => Err(anyhow::anyhow!("Unknown message type: {}", header.msg_type)),
121 }
122 }
123
124 pub fn parse_and_send(&self, buffer: &[u8], channels: &MarketDataChannels) -> Result<()> {
126 let header = self.parse_header(buffer)?;
128
129 if buffer.len() < header.length as usize {
131 return Err(anyhow::anyhow!(
132 "Incomplete message: expected {} bytes, got {}",
133 header.length,
134 buffer.len()
135 ));
136 }
137
138 match header.msg_type {
140 1001 => {
141 let orderbook = self.parse_orderbook(&buffer[8..])?;
142 channels
143 .orderbook_tx
144 .try_send(orderbook)
145 .map_err(|e| anyhow::anyhow!("Failed to send orderbook update: {e:?}"))?;
146 }
147 1002 => {
148 let trade = self.parse_trade(&buffer[8..])?;
149 channels
150 .trade_tx
151 .try_send(trade)
152 .map_err(|e| anyhow::anyhow!("Failed to send trade message: {e:?}"))?;
153 }
154 _ => return Err(anyhow::anyhow!("Unknown message type: {}", header.msg_type)),
155 }
156
157 Ok(())
158 }
159
160 pub fn parse_orderbook(&self, data: &[u8]) -> Result<OrderBook> {
162 let update = ZeroCopyOrderBookUpdate::<10>::from_bytes(data)
163 .ok_or_else(|| anyhow::anyhow!("Failed to parse orderbook update"))?;
164
165 let symbol = self
167 .config
168 .symbol_map
169 .get(&update.symbol_id)
170 .ok_or_else(|| anyhow::anyhow!("Unknown symbol ID: {}", update.symbol_id))?
171 .clone();
172
173 let bids: SmallVec<[PriceLevel; 64]> = update
175 .valid_bids()
176 .iter()
177 .map(|level| self.convert_price_level(level))
178 .collect();
179
180 let asks: SmallVec<[PriceLevel; 64]> = update
182 .valid_asks()
183 .iter()
184 .map(|level| self.convert_price_level(level))
185 .collect();
186
187 Ok(OrderBook::new(
189 symbol,
190 update.timestamp_ns,
191 quanta::Clock::new().raw(),
192 bids,
193 asks,
194 ))
195 }
196
197 pub fn parse_trade(&self, data: &[u8]) -> Result<MarketTrade> {
199 let trade = ZeroCopyTrade::from_bytes(data)
200 .ok_or_else(|| anyhow::anyhow!("Failed to parse trade"))?;
201
202 let symbol = self
204 .config
205 .symbol_map
206 .get(&trade.symbol_id)
207 .ok_or_else(|| anyhow::anyhow!("Unknown symbol ID: {}", trade.symbol_id))?;
208
209 Ok(MarketTrade {
211 timestamp: quanta::Instant::now(),
212 exchange_time_ns: trade.timestamp_ns,
213 price: trade.to_decimal_price(self.config.price_decimals),
214 quantity: trade.to_decimal_quantity(self.config.quantity_decimals),
215 direction: if trade.side == 0 {
216 rusty_model::enums::OrderSide::Buy
217 } else {
218 rusty_model::enums::OrderSide::Sell
219 },
220 instrument_id: rusty_model::instruments::InstrumentId {
221 symbol: symbol.clone(),
222 venue: Venue::Binance, },
224 })
225 }
226
227 fn convert_price_level(&self, level: &ZeroCopyPriceLevel) -> PriceLevel {
229 PriceLevel::new(
230 level.to_decimal_price(self.config.price_decimals),
231 level.to_decimal_quantity(self.config.quantity_decimals),
232 )
233 }
234}
235
236#[derive(Debug, Clone, Copy, PartialEq, Eq)]
238pub enum MessageType {
239 OrderBook,
241 Trade,
243}
244
245trait TradeDecimalConversions {
247 fn to_decimal_price(&self, decimals: u32) -> Decimal;
248 fn to_decimal_quantity(&self, decimals: u32) -> Decimal;
249}
250
251impl TradeDecimalConversions for ZeroCopyTrade {
252 fn to_decimal_price(&self, decimals: u32) -> Decimal {
253 Decimal::from_i128_with_scale(self.price as i128, decimals)
254 }
255
256 fn to_decimal_quantity(&self, decimals: u32) -> Decimal {
257 Decimal::from_i128_with_scale(self.quantity as i128, decimals)
258 }
259}
260
261#[cfg(test)]
262mod tests {
263 use super::*;
264 use rusty_model::data::zero_copy::MessageHeader;
265
266 #[test]
267 fn test_channel_based_parsing() {
268 let mut symbol_map = FxHashMap::default();
270 symbol_map.insert(42, String::from("BTC-USD"));
271 symbol_map.insert(43, String::from("ETH-USD"));
272
273 let config = ZeroCopyConfig {
274 price_decimals: 8,
275 quantity_decimals: 8,
276 symbol_map,
277 };
278
279 let adapter = ZeroCopyAdapter::new(config);
280
281 let (channels, receivers) = MarketDataChannels::new(100);
283
284 {
286 let mut buffer = vec![0u8; 8 + std::mem::size_of::<ZeroCopyOrderBookUpdate>()];
287
288 let header = MessageHeader {
290 length: buffer.len() as u32,
291 msg_type: 1001,
292 version: 1,
293 flags: 0,
294 };
295 buffer[..8].copy_from_slice(header.as_bytes());
296
297 let mut update = ZeroCopyOrderBookUpdate::<10>::new_zeroed();
299 update.symbol_id = 42;
300 update.sequence = 12345;
301 update.timestamp_ns = 1_000_000_000;
302 update.bid_count = 1;
303 update.ask_count = 1;
304
305 update.bids[0] = ZeroCopyPriceLevel {
306 price: 50000_00000000,
307 quantity: 1_00000000,
308 };
309 update.asks[0] = ZeroCopyPriceLevel {
310 price: 50001_00000000,
311 quantity: 1_50000000,
312 };
313
314 buffer[8..].copy_from_slice(update.as_bytes());
315
316 adapter.parse_and_send(&buffer, &channels).unwrap();
318
319 let orderbook = receivers.orderbook_rx.try_recv().unwrap();
321 assert_eq!(orderbook.symbol, "BTC-USD");
322 assert_eq!(orderbook.bids.len(), 1);
323 assert_eq!(orderbook.asks.len(), 1);
324 }
325
326 {
328 let mut buffer = vec![0u8; 8 + std::mem::size_of::<ZeroCopyTrade>()];
329
330 let header = MessageHeader {
332 length: buffer.len() as u32,
333 msg_type: 1002,
334 version: 1,
335 flags: 0,
336 };
337 buffer[..8].copy_from_slice(header.as_bytes());
338
339 let trade = ZeroCopyTrade {
341 symbol_id: 43,
342 side: 1, trade_id: 123456,
344 timestamp_ns: 2_000_000_000,
345 price: 3000_00000000,
346 quantity: 5_00000000,
347 };
348 buffer[8..].copy_from_slice(trade.as_bytes());
349
350 adapter.parse_and_send(&buffer, &channels).unwrap();
352
353 let trade = receivers.trade_rx.try_recv().unwrap();
355 assert_eq!(trade.instrument_id.symbol, "ETH-USD");
356 assert_eq!(trade.price.normalize().to_string(), "3000");
357 assert_eq!(trade.quantity.normalize().to_string(), "5");
358 assert_eq!(trade.direction, rusty_model::enums::OrderSide::Sell);
359 }
360
361 assert!(receivers.orderbook_rx.is_empty());
363 assert!(receivers.trade_rx.is_empty());
364 }
365
366 #[test]
367 fn test_direct_parsing_methods() {
368 let mut symbol_map = FxHashMap::default();
370 symbol_map.insert(42, String::from("BTC-USD"));
371
372 let config = ZeroCopyConfig {
373 price_decimals: 8,
374 quantity_decimals: 8,
375 symbol_map,
376 };
377
378 let adapter = ZeroCopyAdapter::new(config);
379
380 {
382 let mut update = ZeroCopyOrderBookUpdate::<10>::new_zeroed();
383 update.symbol_id = 42;
384 update.sequence = 12345;
385 update.timestamp_ns = 1_000_000_000;
386 update.bid_count = 1;
387 update.ask_count = 1;
388
389 update.bids[0] = ZeroCopyPriceLevel {
390 price: 50000_00000000,
391 quantity: 1_00000000,
392 };
393 update.asks[0] = ZeroCopyPriceLevel {
394 price: 50001_00000000,
395 quantity: 1_50000000,
396 };
397
398 let orderbook = adapter.parse_orderbook(update.as_bytes()).unwrap();
399 assert_eq!(orderbook.symbol, "BTC-USD");
400 assert_eq!(orderbook.bids[0].price.normalize().to_string(), "50000");
401 }
402
403 {
405 let trade = ZeroCopyTrade {
406 symbol_id: 42,
407 side: 0, trade_id: 999999,
409 timestamp_ns: 2_000_000_000,
410 price: 51000_00000000,
411 quantity: 2_00000000,
412 };
413
414 let market_trade = adapter.parse_trade(trade.as_bytes()).unwrap();
415 assert_eq!(market_trade.instrument_id.symbol, "BTC-USD");
416 assert_eq!(market_trade.price.normalize().to_string(), "51000");
417 assert_eq!(market_trade.direction, rusty_model::enums::OrderSide::Buy);
418 }
419 }
420
421 #[test]
422 fn test_message_type_method() {
423 let config = ZeroCopyConfig {
424 price_decimals: 8,
425 quantity_decimals: 8,
426 symbol_map: FxHashMap::default(),
427 };
428 let adapter = ZeroCopyAdapter::new(config);
429
430 let mut header = MessageHeader {
432 length: 100,
433 msg_type: 1001,
434 version: 1,
435 flags: 0,
436 };
437 let mut buffer = vec![0u8; 8];
438 buffer.copy_from_slice(header.as_bytes());
439
440 let msg_type = adapter.message_type(&buffer).unwrap();
441 assert_eq!(msg_type, MessageType::OrderBook);
442
443 header.msg_type = 1002;
445 buffer.copy_from_slice(header.as_bytes());
446
447 let msg_type = adapter.message_type(&buffer).unwrap();
448 assert_eq!(msg_type, MessageType::Trade);
449
450 header.msg_type = 9999;
452 buffer.copy_from_slice(header.as_bytes());
453
454 let result = adapter.message_type(&buffer);
455 assert!(result.is_err());
456 assert!(
457 result
458 .unwrap_err()
459 .to_string()
460 .contains("Unknown message type: 9999")
461 );
462 }
463
464 #[test]
465 fn test_incomplete_message_handling() {
466 let config = ZeroCopyConfig {
467 price_decimals: 8,
468 quantity_decimals: 8,
469 symbol_map: FxHashMap::default(),
470 };
471 let adapter = ZeroCopyAdapter::new(config);
472 let (channels, _) = MarketDataChannels::new(100);
473
474 let small_buffer = vec![0u8; 4];
476 let result = adapter.parse_and_send(&small_buffer, &channels);
477 assert!(result.is_err());
478 assert!(
479 result
480 .unwrap_err()
481 .to_string()
482 .contains("Buffer too small for message header")
483 );
484
485 let header = MessageHeader {
487 length: 100, msg_type: 1001,
489 version: 1,
490 flags: 0,
491 };
492 let mut buffer = vec![0u8; 50]; buffer[..8].copy_from_slice(header.as_bytes());
494
495 let result = adapter.parse_and_send(&buffer, &channels);
496 assert!(result.is_err());
497 assert!(
498 result
499 .unwrap_err()
500 .to_string()
501 .contains("Incomplete message")
502 );
503 }
504
505 #[test]
506 fn test_unknown_symbol_handling() {
507 let config = ZeroCopyConfig {
508 price_decimals: 8,
509 quantity_decimals: 8,
510 symbol_map: FxHashMap::default(), };
512 let adapter = ZeroCopyAdapter::new(config);
513 let (channels, _) = MarketDataChannels::new(100);
514
515 let mut buffer = vec![0u8; 8 + std::mem::size_of::<ZeroCopyOrderBookUpdate>()];
517
518 let header = MessageHeader {
519 length: buffer.len() as u32,
520 msg_type: 1001,
521 version: 1,
522 flags: 0,
523 };
524 buffer[..8].copy_from_slice(header.as_bytes());
525
526 let mut update = ZeroCopyOrderBookUpdate::<10>::new_zeroed();
527 update.symbol_id = 999; buffer[8..].copy_from_slice(update.as_bytes());
529
530 let result = adapter.parse_and_send(&buffer, &channels);
531 assert!(result.is_err());
532 assert!(
533 result
534 .unwrap_err()
535 .to_string()
536 .contains("Unknown symbol ID: 999")
537 );
538 }
539
540 #[test]
541 fn test_channel_disconnected_handling() {
542 let config = ZeroCopyConfig {
543 price_decimals: 8,
544 quantity_decimals: 8,
545 symbol_map: FxHashMap::from_iter([(42, String::from("BTC-USD"))]),
546 };
547 let adapter = ZeroCopyAdapter::new(config);
548
549 let (channels, receivers) = MarketDataChannels::new(1);
551 drop(receivers);
552
553 let mut buffer = vec![0u8; 8 + std::mem::size_of::<ZeroCopyOrderBookUpdate>()];
555
556 let header = MessageHeader {
557 length: buffer.len() as u32,
558 msg_type: 1001,
559 version: 1,
560 flags: 0,
561 };
562 buffer[..8].copy_from_slice(header.as_bytes());
563
564 let mut update = ZeroCopyOrderBookUpdate::<10>::new_zeroed();
565 update.symbol_id = 42;
566 buffer[8..].copy_from_slice(update.as_bytes());
567
568 let result = adapter.parse_and_send(&buffer, &channels);
569 assert!(result.is_err());
570 assert!(
571 result
572 .unwrap_err()
573 .to_string()
574 .contains("Failed to send orderbook update")
575 );
576 }
577
578 #[test]
579 fn test_channel_full_behavior() {
580 let config = ZeroCopyConfig {
581 price_decimals: 8,
582 quantity_decimals: 8,
583 symbol_map: FxHashMap::from_iter([(42, String::from("BTC-USD"))]),
584 };
585 let adapter = ZeroCopyAdapter::new(config);
586
587 let (channels, receivers) = MarketDataChannels::new(1);
589
590 let mut buffer = vec![0u8; 8 + std::mem::size_of::<ZeroCopyOrderBookUpdate>()];
592
593 let header = MessageHeader {
594 length: buffer.len() as u32,
595 msg_type: 1001,
596 version: 1,
597 flags: 0,
598 };
599 buffer[..8].copy_from_slice(header.as_bytes());
600
601 let mut update = ZeroCopyOrderBookUpdate::<10>::new_zeroed();
602 update.symbol_id = 42;
603 update.sequence = 1;
604 buffer[8..].copy_from_slice(update.as_bytes());
605
606 let result = adapter.parse_and_send(&buffer, &channels);
607 assert!(result.is_ok());
608
609 update.sequence = 2;
611 buffer[8..].copy_from_slice(update.as_bytes());
612
613 let result = adapter.parse_and_send(&buffer, &channels);
614 assert!(result.is_err());
615 assert!(
616 result
617 .unwrap_err()
618 .to_string()
619 .contains("Failed to send orderbook update")
620 );
621
622 let _ = receivers.orderbook_rx.try_recv();
624
625 let result = adapter.parse_and_send(&buffer, &channels);
627 assert!(result.is_ok());
628 }
629
630 #[test]
631 fn test_price_quantity_conversion() {
632 let mut symbol_map = FxHashMap::default();
633 symbol_map.insert(42, String::from("BTC-USD"));
634
635 let config = ZeroCopyConfig {
636 price_decimals: 8,
637 quantity_decimals: 8,
638 symbol_map,
639 };
640 let adapter = ZeroCopyAdapter::new(config);
641 let (channels, receivers) = MarketDataChannels::new(100);
642
643 let mut buffer = vec![0u8; 8 + std::mem::size_of::<ZeroCopyOrderBookUpdate>()];
645
646 let header = MessageHeader {
647 length: buffer.len() as u32,
648 msg_type: 1001,
649 version: 1,
650 flags: 0,
651 };
652 buffer[..8].copy_from_slice(header.as_bytes());
653
654 let mut update = ZeroCopyOrderBookUpdate::<10>::new_zeroed();
655 update.symbol_id = 42;
656 update.sequence = 12345;
657 update.timestamp_ns = 1_000_000_000;
658 update.bid_count = 2;
659 update.ask_count = 2;
660
661 update.bids[0] = ZeroCopyPriceLevel {
663 price: 50000_00000000, quantity: 1_00000000, };
666 update.bids[1] = ZeroCopyPriceLevel {
667 price: 49999_50000000, quantity: 2_50000000, };
670 update.asks[0] = ZeroCopyPriceLevel {
671 price: 50001_00000000, quantity: 50000000, };
674 update.asks[1] = ZeroCopyPriceLevel {
675 price: 50002_12345678, quantity: 10_00000000, };
678
679 buffer[8..].copy_from_slice(update.as_bytes());
680
681 adapter.parse_and_send(&buffer, &channels).unwrap();
682
683 let orderbook = receivers.orderbook_rx.try_recv().unwrap();
684
685 assert_eq!(orderbook.bids[0].price.normalize().to_string(), "50000");
687 assert_eq!(orderbook.bids[0].quantity.normalize().to_string(), "1");
688 assert_eq!(orderbook.bids[1].price.normalize().to_string(), "49999.5");
689 assert_eq!(orderbook.bids[1].quantity.normalize().to_string(), "2.5");
690
691 assert_eq!(orderbook.asks[0].price.normalize().to_string(), "50001");
693 assert_eq!(orderbook.asks[0].quantity.normalize().to_string(), "0.5");
694 assert_eq!(
695 orderbook.asks[1].price.normalize().to_string(),
696 "50002.12345678"
697 );
698 assert_eq!(orderbook.asks[1].quantity.normalize().to_string(), "10");
699 }
700
701 #[test]
702 fn test_const_fn_constructor_limitation() {
703 let mut symbol_map = FxHashMap::default();
709 symbol_map.insert(42, String::from("BTC-USD"));
710
711 let config = ZeroCopyConfig {
712 price_decimals: 8,
713 quantity_decimals: 8,
714 symbol_map,
715 };
716 let _adapter = ZeroCopyAdapter::new(config);
717
718 let mut config = ZeroCopyConfig::new_empty(8, 8);
741 config.add_symbol(42, String::from("BTC-USD"));
742 let _adapter2 = ZeroCopyAdapter::new(config);
743 }
744}