1use crate::SmartString;
7use crate::error::{CommonError, Result};
8use crate::memory::hft_pools::{HftBufferHandle, OrderHandle, TradeHandle, global_hft_pools};
9use crate::zerocopy::{BorrowedValueExt, WebSocketJsonZeroCopy};
10use log::warn;
11use rust_decimal::Decimal;
12use simd_json::BorrowedValue;
13use smallvec::SmallVec;
14
15pub struct ZeroAllocOrderUpdate<'a> {
17 value: BorrowedValue<'a>,
19}
20
21impl<'a> ZeroAllocOrderUpdate<'a> {
22 pub fn parse(message_bytes: &'a mut [u8]) -> Result<Self> {
24 let value = WebSocketJsonZeroCopy::parse_to_borrowed(message_bytes)?;
25 Ok(Self { value })
26 }
27
28 #[inline]
30 pub fn message_type(&self) -> Option<&str> {
31 self.value.get_str("type")
32 }
33
34 #[inline]
36 pub fn order_id(&self) -> Option<&str> {
37 self.value.get_str("order_id")
38 }
39
40 #[inline]
42 pub fn client_order_id(&self) -> Option<&str> {
43 self.value
44 .get_str("client_order_id")
45 .or_else(|| self.value.get_str("client_oid"))
46 .or_else(|| self.value.get_str("clOrdId"))
47 }
48
49 #[inline]
51 pub fn symbol(&self) -> Option<&str> {
52 self.value
53 .get_str("symbol")
54 .or_else(|| self.value.get_str("product_id"))
55 .or_else(|| self.value.get_str("instrument"))
56 }
57
58 #[inline]
60 pub fn price_str(&self) -> Option<&str> {
61 self.value.get_str("price")
62 }
63
64 #[inline]
66 pub fn quantity_str(&self) -> Option<&str> {
67 self.value
68 .get_str("quantity")
69 .or_else(|| self.value.get_str("size"))
70 .or_else(|| self.value.get_str("amount"))
71 }
72
73 #[inline]
75 pub fn status(&self) -> Option<&str> {
76 self.value
77 .get_str("status")
78 .or_else(|| self.value.get_str("order_status"))
79 }
80
81 #[inline]
83 pub fn side(&self) -> Option<&str> {
84 self.value.get_str("side")
85 }
86
87 #[inline]
89 pub fn timestamp(&self) -> Option<u64> {
90 self.value
91 .get_u64("timestamp")
92 .or_else(|| self.value.get_u64("time"))
93 }
94
95 pub fn to_owned_values(&self) -> OrderUpdateOwned {
97 OrderUpdateOwned {
98 order_id: self.order_id().map(SmartString::from),
99 client_order_id: self.client_order_id().map(SmartString::from),
100 symbol: self.symbol().map(SmartString::from),
101 price: self.price_str().and_then(|s| {
102 Decimal::from_str_exact(s)
103 .map_err(|e| {
104 warn!("Failed to parse price '{s}': {e}");
105 e
106 })
107 .ok()
108 }),
109 quantity: self.quantity_str().and_then(|s| {
110 Decimal::from_str_exact(s)
111 .map_err(|e| {
112 warn!("Failed to parse quantity '{s}': {e}");
113 e
114 })
115 .ok()
116 }),
117 status: self.status().map(SmartString::from),
118 side: self.side().map(SmartString::from),
119 timestamp: self.timestamp(),
120 }
121 }
122}
123
124#[derive(Debug, Clone)]
126pub struct OrderUpdateOwned {
127 pub order_id: Option<SmartString>,
129 pub client_order_id: Option<SmartString>,
131 pub symbol: Option<SmartString>,
133 pub price: Option<Decimal>,
135 pub quantity: Option<Decimal>,
137 pub status: Option<SmartString>,
139 pub side: Option<SmartString>,
141 pub timestamp: Option<u64>,
143}
144
145pub struct ZeroAllocMarketData<'a> {
147 value: BorrowedValue<'a>,
149}
150
151impl<'a> ZeroAllocMarketData<'a> {
152 pub fn parse(message_bytes: &'a mut [u8]) -> Result<Self> {
154 let value = WebSocketJsonZeroCopy::parse_to_borrowed(message_bytes)?;
155 Ok(Self { value })
156 }
157
158 #[inline]
160 pub fn message_type(&self) -> Option<&str> {
161 self.value
162 .get_str("type")
163 .or_else(|| self.value.get_str("e")) }
165
166 #[inline]
168 pub fn symbol(&self) -> Option<&str> {
169 self.value
170 .get_str("symbol")
171 .or_else(|| self.value.get_str("s")) .or_else(|| self.value.get_str("product_id"))
173 }
174
175 #[inline]
177 pub fn event_time(&self) -> Option<u64> {
178 self.value
179 .get_u64("E") .or_else(|| self.value.get_u64("T")) .or_else(|| self.value.get_u64("timestamp"))
182 }
183
184 #[inline]
186 pub fn bid_price_str(&self) -> Option<&str> {
187 self.value
188 .get_str("b") .or_else(|| self.value.get_str("best_bid"))
190 }
191
192 #[inline]
194 pub fn ask_price_str(&self) -> Option<&str> {
195 self.value
196 .get_str("a") .or_else(|| self.value.get_str("best_ask"))
198 }
199
200 #[inline]
202 pub fn last_price_str(&self) -> Option<&str> {
203 self.value
204 .get_str("c") .or_else(|| self.value.get_str("p")) .or_else(|| self.value.get_str("price"))
207 }
208
209 #[inline]
211 pub fn volume_str(&self) -> Option<&str> {
212 self.value
213 .get_str("v") .or_else(|| self.value.get_str("q")) .or_else(|| self.value.get_str("volume"))
216 }
217
218 #[inline]
220 pub fn is_snapshot(&self) -> bool {
221 self.message_type()
222 .map(|t| t == "snapshot" || t == "depthUpdate")
223 .unwrap_or(false)
224 }
225}
226
227const MESSAGE_BUFFER_SIZE: usize = 4096;
229
230pub type MessageBuffer = SmallVec<[u8; MESSAGE_BUFFER_SIZE]>;
232
233pub struct ZeroAllocMessageProcessor {
235 buffer: MessageBuffer,
237 pool_buffer: Option<HftBufferHandle>,
239}
240
241impl Default for ZeroAllocMessageProcessor {
242 fn default() -> Self {
243 Self::new()
244 }
245}
246
247impl ZeroAllocMessageProcessor {
248 #[must_use]
250 pub fn new() -> Self {
251 Self {
252 buffer: MessageBuffer::new(),
253 pool_buffer: None,
254 }
255 }
256
257 #[inline]
264 fn process_json_bytes_internal(&mut self, data: &[u8]) -> Result<BorrowedValue<'_>> {
265 self.buffer.clear();
267
268 if data.len() <= MESSAGE_BUFFER_SIZE {
270 self.buffer.extend_from_slice(data);
271 WebSocketJsonZeroCopy::parse_to_borrowed(&mut self.buffer)
273 } else {
274 if self.pool_buffer.is_none() {
276 self.pool_buffer = Some(global_hft_pools().acquire_buffer());
277 }
278
279 let pool_buf = self.pool_buffer.as_mut().unwrap();
280 pool_buf.clear();
281 pool_buf.extend_from_slice(data);
282
283 WebSocketJsonZeroCopy::parse_to_borrowed(pool_buf.as_mut_slice())
284 }
285 }
286
287 #[inline]
291 pub fn parse_order_to_pooled(&mut self, data: &[u8]) -> Result<OrderHandle> {
292 let json_value = self.process_json_bytes_internal(data)?;
293 let mut order = global_hft_pools().acquire_order();
294
295 if let Some(order_id) = json_value.get_str("order_id") {
297 order.order_id = SmartString::from(order_id);
298 }
299
300 if let Some(client_order_id) = json_value
301 .get_str("client_order_id")
302 .or_else(|| json_value.get_str("client_oid"))
303 .or_else(|| json_value.get_str("clOrdId"))
304 {
305 order.client_order_id = SmartString::from(client_order_id);
306 }
307
308 if let Some(symbol) = json_value
309 .get_str("symbol")
310 .or_else(|| json_value.get_str("product_id"))
311 .or_else(|| json_value.get_str("instrument"))
312 {
313 order.symbol = SmartString::from(symbol);
314 }
315
316 if let Some(side) = json_value.get_str("side") {
317 order.side = SmartString::from(side);
318 }
319
320 if let Some(order_type) = json_value
321 .get_str("type")
322 .or_else(|| json_value.get_str("order_type"))
323 {
324 order.order_type = SmartString::from(order_type);
325 }
326
327 if let Some(status) = json_value
328 .get_str("status")
329 .or_else(|| json_value.get_str("order_status"))
330 {
331 order.status = SmartString::from(status);
332 }
333
334 if let Some(price_str) = json_value.get_str("price") {
336 match Decimal::from_str_exact(price_str) {
337 Ok(price) => order.price = price,
338 Err(e) => {
339 warn!("Failed to parse order price '{price_str}': {e}");
340 return Err(CommonError::Parse(
341 format!("Invalid price format: {price_str}").into(),
342 ));
343 }
344 }
345 }
346
347 if let Some(quantity_str) = json_value
348 .get_str("quantity")
349 .or_else(|| json_value.get_str("size"))
350 .or_else(|| json_value.get_str("amount"))
351 {
352 match Decimal::from_str_exact(quantity_str) {
353 Ok(quantity) => order.quantity = quantity,
354 Err(e) => {
355 warn!("Failed to parse order quantity '{quantity_str}': {e}");
356 return Err(CommonError::Parse(
357 format!("Invalid quantity format: {quantity_str}").into(),
358 ));
359 }
360 }
361 }
362
363 if let Some(filled_str) = json_value
364 .get_str("filled_quantity")
365 .or_else(|| json_value.get_str("filled_size"))
366 .or_else(|| json_value.get_str("executed_qty"))
367 {
368 match Decimal::from_str_exact(filled_str) {
369 Ok(filled) => order.filled_quantity = filled,
370 Err(e) => warn!("Failed to parse order filled quantity '{filled_str}': {e}"),
371 }
372 }
373
374 if let Some(ts) = json_value
376 .get_u64("timestamp")
377 .or_else(|| json_value.get_u64("time"))
378 {
379 order.timestamp = ts;
380 }
381
382 if let Some(ex_ts) = json_value
383 .get_u64("exchange_timestamp")
384 .or_else(|| json_value.get_u64("exchange_time"))
385 .or_else(|| json_value.get_u64("T"))
386 {
387 order.exchange_timestamp = ex_ts;
389 }
390
391 order.remaining_quantity = order.quantity - order.filled_quantity;
393
394 Ok(order)
395 }
396
397 #[inline]
401 pub fn parse_trade_to_pooled(&mut self, data: &[u8]) -> Result<TradeHandle> {
402 let json_value = self.process_json_bytes_internal(data)?;
403 let mut trade = global_hft_pools().acquire_trade();
404
405 if let Some(trade_id) = json_value
407 .get_str("trade_id")
408 .or_else(|| json_value.get_str("t"))
409 {
410 trade.trade_id = SmartString::from(trade_id);
412 }
413
414 if let Some(order_id) = json_value
415 .get_str("order_id")
416 .or_else(|| json_value.get_str("o"))
417 {
418 trade.order_id = SmartString::from(order_id);
419 }
420
421 if let Some(symbol) = json_value
422 .get_str("symbol")
423 .or_else(|| json_value.get_str("s")) .or_else(|| json_value.get_str("product_id"))
425 {
426 trade.symbol = SmartString::from(symbol);
427 }
428
429 if let Some(side) = json_value
430 .get_str("side")
431 .or_else(|| json_value.get_str("S"))
432 {
433 trade.side = SmartString::from(side);
435 }
436
437 if let Some(price_str) = json_value
439 .get_str("price")
440 .or_else(|| json_value.get_str("p"))
441 {
442 match Decimal::from_str_exact(price_str) {
444 Ok(price) => trade.price = price,
445 Err(e) => {
446 warn!("Failed to parse trade price '{price_str}': {e}");
447 return Err(CommonError::Parse(
448 format!("Invalid price format: {price_str}").into(),
449 ));
450 }
451 }
452 }
453
454 if let Some(quantity_str) = json_value
455 .get_str("quantity")
456 .or_else(|| json_value.get_str("q")) .or_else(|| json_value.get_str("size"))
458 {
459 match Decimal::from_str_exact(quantity_str) {
460 Ok(quantity) => trade.quantity = quantity,
461 Err(e) => {
462 warn!("Failed to parse trade quantity '{quantity_str}': {e}");
463 return Err(CommonError::Parse(
464 format!("Invalid quantity format: {quantity_str}").into(),
465 ));
466 }
467 }
468 }
469
470 if let Some(ts) = json_value
472 .get_u64("timestamp")
473 .or_else(|| json_value.get_u64("time"))
474 .or_else(|| json_value.get_u64("T"))
475 {
476 trade.timestamp = ts;
478 }
479
480 if let Some(ex_ts) = json_value
481 .get_u64("exchange_timestamp")
482 .or_else(|| json_value.get_u64("E"))
483 {
484 trade.exchange_timestamp = ex_ts;
486 }
487
488 if let Some(is_maker) = json_value
490 .get_bool("is_maker")
491 .or_else(|| json_value.get_bool("m"))
492 {
493 trade.is_maker = is_maker;
495 }
496
497 Ok(trade)
498 }
499
500 #[inline]
504 pub fn reset(&mut self) {
505 self.buffer.clear();
506 if let Some(ref mut pool_buf) = self.pool_buffer {
507 pool_buf.clear();
508 }
509 }
510
511 #[inline]
513 pub fn buffer_stats(&self) -> BufferStats {
514 BufferStats {
515 stack_buffer_len: self.buffer.len(),
516 stack_buffer_capacity: self.buffer.capacity(),
517 stack_buffer_spilled: self.buffer.spilled(),
518 pool_buffer_active: self.pool_buffer.is_some(),
519 pool_buffer_len: self.pool_buffer.as_ref().map(|b| b.len()).unwrap_or(0),
520 }
521 }
522
523 pub fn identify_message_type(&mut self, message_bytes: &[u8]) -> Result<MessageType> {
525 let value = self.process_json_bytes_internal(message_bytes)?;
526
527 if let Some(msg_type) = value.get_str("type") {
529 Ok(match msg_type {
530 "heartbeat" => MessageType::Heartbeat,
531 "error" => MessageType::Error,
532 "order" | "execution" | "fill" => MessageType::OrderUpdate,
533 "ticker" | "trade" | "quote" => MessageType::MarketData,
534 "snapshot" | "l2update" | "depthUpdate" => MessageType::OrderBook,
535 _ => MessageType::Unknown,
536 })
537 } else if let Some(event_type) = value.get_str("e") {
538 Ok(match event_type {
540 "executionReport" => MessageType::OrderUpdate,
541 "24hrTicker" | "trade" => MessageType::MarketData,
542 "depthUpdate" => MessageType::OrderBook,
543 _ => MessageType::Unknown,
544 })
545 } else {
546 Ok(MessageType::Unknown)
547 }
548 }
549
550 #[inline]
552 pub fn extract_error_message(&mut self, data: &[u8]) -> Result<Option<SmartString>> {
553 let value = self.process_json_bytes_internal(data)?;
554 Ok(value
555 .get_str("message")
556 .or_else(|| value.get_str("msg"))
557 .or_else(|| value.get_str("error"))
558 .map(SmartString::from))
559 }
560
561 #[inline]
563 pub fn extract_request_id(&mut self, data: &[u8]) -> Result<Option<SmartString>> {
564 let value = self.process_json_bytes_internal(data)?;
565 Ok(value
566 .get_str("id")
567 .or_else(|| value.get_str("request_id"))
568 .or_else(|| value.get_str("req_id"))
569 .map(SmartString::from))
570 }
571
572 #[inline]
574 pub fn extract_string_field(
575 &mut self,
576 data: &[u8],
577 field: &str,
578 ) -> Result<Option<SmartString>> {
579 let value = self.process_json_bytes_internal(data)?;
580 Ok(value.get_str(field).map(SmartString::from))
581 }
582
583 #[inline]
585 pub fn extract_decimal_field(&mut self, data: &[u8], field: &str) -> Result<Option<Decimal>> {
586 let value = self.process_json_bytes_internal(data)?;
587
588 if let Some(str_val) = value.get_str(field) {
589 match Decimal::from_str_exact(str_val) {
590 Ok(decimal) => return Ok(Some(decimal)),
591 Err(e) => {
592 warn!("Failed to parse decimal field '{field}' with value '{str_val}': {e}");
593 return Err(e.into());
594 }
595 }
596 } else if let Some(num_val) = value.get_f64(field) {
597 match Decimal::try_from(num_val) {
598 Ok(decimal) => return Ok(Some(decimal)),
599 Err(e) => {
600 warn!(
601 "Failed to convert f64 field '{field}' with value '{num_val}' to Decimal: {e}"
602 );
603 return Err(e.into());
604 }
605 }
606 }
607
608 Ok(None)
609 }
610
611 #[inline]
613 pub fn extract_u64_field(&mut self, data: &[u8], field: &str) -> Result<Option<u64>> {
614 let value = self.process_json_bytes_internal(data)?;
615 Ok(value.get_u64(field))
616 }
617
618 #[inline]
620 pub fn extract_bool_field(&mut self, data: &[u8], field: &str) -> Result<Option<bool>> {
621 let value = self.process_json_bytes_internal(data)?;
622 Ok(value.get_bool(field))
623 }
624}
625
626#[derive(Debug, Clone, Copy, PartialEq, Eq)]
628pub enum MessageType {
629 Heartbeat,
631 Error,
633 OrderUpdate,
635 MarketData,
637 OrderBook,
639 Unknown,
641}
642
643#[derive(Debug, Clone, Copy)]
645pub struct BufferStats {
646 pub stack_buffer_len: usize,
648 pub stack_buffer_capacity: usize,
650 pub stack_buffer_spilled: bool,
652 pub pool_buffer_active: bool,
654 pub pool_buffer_len: usize,
656}
657
658thread_local! {
660 static LOCAL_PROCESSOR: std::cell::RefCell<ZeroAllocMessageProcessor> =
661 std::cell::RefCell::new(ZeroAllocMessageProcessor::new());
662}
663
664pub fn with_zero_alloc_processor<F, R>(f: F) -> R
668where
669 F: FnOnce(&mut ZeroAllocMessageProcessor) -> R,
670{
671 LOCAL_PROCESSOR.with(|processor| f(&mut processor.borrow_mut()))
672}
673
674#[inline]
676pub fn parse_order_zero_alloc(data: &[u8]) -> Result<OrderHandle> {
677 with_zero_alloc_processor(|processor| processor.parse_order_to_pooled(data))
678}
679
680#[inline]
682pub fn parse_trade_zero_alloc(data: &[u8]) -> Result<TradeHandle> {
683 with_zero_alloc_processor(|processor| processor.parse_trade_to_pooled(data))
684}
685
686#[inline]
688pub fn extract_string_field_zero_alloc(data: &[u8], field: &str) -> Result<Option<SmartString>> {
689 with_zero_alloc_processor(|processor| processor.extract_string_field(data, field))
690}
691
692#[inline]
694pub fn identify_message_type_zero_alloc(data: &[u8]) -> Result<MessageType> {
695 with_zero_alloc_processor(|processor| processor.identify_message_type(data))
696}
697
698pub async fn example_zero_alloc_handler(message: &mut [u8]) -> Result<()> {
700 with_zero_alloc_processor(|processor| {
702 let msg_type = processor.identify_message_type(message)?;
704
705 match msg_type {
706 MessageType::OrderUpdate => {
707 let order_update = ZeroAllocOrderUpdate::parse(message)?;
709
710 if let Some(order_id) = order_update.order_id() {
712 println!("Order ID: {order_id}");
714 }
715
716 if order_update.status() == Some("filled") {
718 drop(order_update);
722 let _pooled_order = processor.parse_order_to_pooled(message)?;
723 }
725 }
726 MessageType::MarketData => {
727 let market_data = ZeroAllocMarketData::parse(message)?;
729
730 if let Some(symbol) = market_data.symbol()
732 && let Some(price) = market_data.last_price_str()
733 {
734 println!("{symbol}: {price}");
736 }
737
738 if market_data.message_type() == Some("trade") {
740 drop(market_data);
743 let _pooled_trade = processor.parse_trade_to_pooled(message)?;
744 }
746 }
747 _ => {
748 }
750 }
751
752 Ok(())
753 })
754}
755
756#[cfg(test)]
757mod tests {
758 use super::*;
759
760 #[test]
761 fn test_zero_alloc_order_update() {
762 let mut json = r#"{"type":"order","order_id":"123","symbol":"BTC-USDT","price":"50000.5","quantity":"0.1","status":"new","side":"buy","timestamp":1234567890}"#.to_string();
763 let bytes = unsafe { json.as_bytes_mut() };
764
765 let update = ZeroAllocOrderUpdate::parse(bytes).unwrap();
766 assert_eq!(update.message_type(), Some("order"));
767 assert_eq!(update.order_id(), Some("123"));
768 assert_eq!(update.symbol(), Some("BTC-USDT"));
769 assert_eq!(update.price_str(), Some("50000.5"));
770 assert_eq!(update.quantity_str(), Some("0.1"));
771 assert_eq!(update.status(), Some("new"));
772 assert_eq!(update.side(), Some("buy"));
773 assert_eq!(update.timestamp(), Some(1234567890));
774 }
775
776 #[test]
777 fn test_zero_alloc_market_data() {
778 let mut json = r#"{"e":"24hrTicker","s":"BTCUSDT","b":"49999.0","a":"50001.0","c":"50000.0","v":"1000.5","E":1234567890}"#.to_string();
779 let bytes = unsafe { json.as_bytes_mut() };
780
781 let data = ZeroAllocMarketData::parse(bytes).unwrap();
782 assert_eq!(data.message_type(), Some("24hrTicker"));
783 assert_eq!(data.symbol(), Some("BTCUSDT"));
784 assert_eq!(data.bid_price_str(), Some("49999.0"));
785 assert_eq!(data.ask_price_str(), Some("50001.0"));
786 assert_eq!(data.last_price_str(), Some("50000.0"));
787 assert_eq!(data.volume_str(), Some("1000.5"));
788 assert_eq!(data.event_time(), Some(1234567890));
789 }
790
791 #[test]
792 fn test_pooled_order_parsing() {
793 let order_json = r#"{
794 "order_id": "ORDER123",
795 "client_order_id": "CLIENT456",
796 "symbol": "BTC-USDT",
797 "side": "buy",
798 "type": "limit",
799 "status": "open",
800 "price": "50000.00",
801 "quantity": "0.001",
802 "filled_quantity": "0.0005",
803 "timestamp": 1640995200000,
804 "exchange_timestamp": 1640995200100
805 }"#;
806
807 let mut processor = ZeroAllocMessageProcessor::new();
808 let order = processor
809 .parse_order_to_pooled(order_json.as_bytes())
810 .unwrap();
811
812 assert_eq!(order.order_id.as_str(), "ORDER123");
813 assert_eq!(order.symbol.as_str(), "BTC-USDT");
814 assert_eq!(order.side.as_str(), "buy");
815 assert_eq!(order.price, Decimal::from_str_exact("50000.00").unwrap());
816 assert_eq!(order.quantity, Decimal::from_str_exact("0.001").unwrap());
817 assert_eq!(
818 order.filled_quantity,
819 Decimal::from_str_exact("0.0005").unwrap()
820 );
821 assert_eq!(
822 order.remaining_quantity,
823 Decimal::from_str_exact("0.0005").unwrap()
824 );
825 assert_eq!(order.timestamp, 1640995200000);
826 }
827
828 #[test]
829 fn test_pooled_trade_parsing() {
830 let trade_json = r#"{
831 "trade_id": "TRADE789",
832 "order_id": "ORDER123",
833 "symbol": "ETH-USDT",
834 "side": "sell",
835 "price": "3000.50",
836 "quantity": "0.1",
837 "timestamp": 1640995200000,
838 "exchange_timestamp": 1640995200050,
839 "is_maker": true
840 }"#;
841
842 let mut processor = ZeroAllocMessageProcessor::new();
843 let trade = processor
844 .parse_trade_to_pooled(trade_json.as_bytes())
845 .unwrap();
846
847 assert_eq!(trade.trade_id.as_str(), "TRADE789");
848 assert_eq!(trade.symbol.as_str(), "ETH-USDT");
849 assert_eq!(trade.side.as_str(), "sell");
850 assert_eq!(trade.price, Decimal::from_str_exact("3000.50").unwrap());
851 assert_eq!(trade.quantity, Decimal::from_str_exact("0.1").unwrap());
852 assert!(trade.is_maker);
853 }
854
855 #[test]
856 fn test_small_message_stack_allocation() {
857 let mut processor = ZeroAllocMessageProcessor::new();
858
859 let small_json = r#"{"id":1,"price":100.0}"#;
861 {
862 let _result = processor
863 .process_json_bytes_internal(small_json.as_bytes())
864 .unwrap();
865 }
867
868 let stats = processor.buffer_stats();
869 assert!(!stats.stack_buffer_spilled);
870 assert!(!stats.pool_buffer_active);
871 }
872
873 #[test]
874 fn test_large_message_pool_allocation() {
875 let mut processor = ZeroAllocMessageProcessor::new();
876
877 let large_json = format!(r#"{{"data":"{}"}}"#, "x".repeat(5000));
879 {
880 let _result = processor
881 .process_json_bytes_internal(large_json.as_bytes())
882 .unwrap();
883 }
885
886 let stats = processor.buffer_stats();
887 assert!(stats.pool_buffer_active);
888 }
889
890 #[test]
891 fn test_thread_local_processor() {
892 let order_json = r#"{
893 "order_id": "TL123",
894 "symbol": "BTC-USDT",
895 "side": "buy",
896 "price": "40000.00",
897 "quantity": "0.01"
898 }"#;
899
900 let order = parse_order_zero_alloc(order_json.as_bytes()).unwrap();
901 assert_eq!(order.order_id.as_str(), "TL123");
902 assert_eq!(order.symbol.as_str(), "BTC-USDT");
903 }
904
905 #[test]
906 fn test_buffer_reuse() {
907 let mut processor = ZeroAllocMessageProcessor::new();
908
909 let json1 = r#"{"id":1,"value":"test1"}"#;
911 {
912 let _result1 = processor
913 .process_json_bytes_internal(json1.as_bytes())
914 .unwrap();
915 }
917
918 let json2 = r#"{"id":2,"value":"test2"}"#;
920 {
921 let _result2 = processor
922 .process_json_bytes_internal(json2.as_bytes())
923 .unwrap();
924 }
926
927 let stats = processor.buffer_stats();
929 assert!(stats.stack_buffer_capacity >= json1.len().max(json2.len()));
930 }
931
932 #[test]
933 fn test_message_type_identification() {
934 let test_cases = vec![
935 (r#"{"type":"heartbeat"}"#, MessageType::Heartbeat),
936 (
937 r#"{"type":"error","message":"Invalid request"}"#,
938 MessageType::Error,
939 ),
940 (
941 r#"{"type":"order","order_id":"123"}"#,
942 MessageType::OrderUpdate,
943 ),
944 (r#"{"e":"executionReport"}"#, MessageType::OrderUpdate),
945 (r#"{"type":"ticker"}"#, MessageType::MarketData),
946 (r#"{"type":"snapshot"}"#, MessageType::OrderBook),
947 (r#"{"e":"depthUpdate"}"#, MessageType::OrderBook),
948 (r#"{"unknown":"field"}"#, MessageType::Unknown),
949 ];
950
951 let mut processor = ZeroAllocMessageProcessor::new();
952 for (json, expected) in test_cases {
953 let msg_type = processor.identify_message_type(json.as_bytes()).unwrap();
954 assert_eq!(msg_type, expected, "Failed for JSON: {json}");
955 }
956 }
957}