rusty_common/
zero_alloc_message.rs

1//! Zero-allocation message processing for WebSocket exchanges
2//!
3//! This module provides concrete implementations showing how to process
4//! WebSocket messages from exchanges with zero allocations using borrowed values.
5
6use crate::SmartString;
7use crate::error::{CommonError, Result};
8use crate::memory::hft_pools::{HftBufferHandle, OrderHandle, TradeHandle, global_hft_pools};
9use crate::zerocopy::{BorrowedValueExt, WebSocketJsonZeroCopy};
10use log::warn;
11use rust_decimal::Decimal;
12use simd_json::BorrowedValue;
13use smallvec::SmallVec;
14
/// Borrowed, read-only view over one parsed order-update message.
///
/// Holds the `BorrowedValue` produced by [`ZeroAllocOrderUpdate::parse`]. The
/// `'a` lifetime is the lifetime of the byte buffer handed to `parse`, so the
/// view cannot outlive that buffer.
pub struct ZeroAllocOrderUpdate<'a> {
    /// The parsed JSON value, borrowing from the caller's message buffer.
    value: BorrowedValue<'a>,
}
20
21impl<'a> ZeroAllocOrderUpdate<'a> {
22    /// Parse an order update message with zero allocations
23    pub fn parse(message_bytes: &'a mut [u8]) -> Result<Self> {
24        let value = WebSocketJsonZeroCopy::parse_to_borrowed(message_bytes)?;
25        Ok(Self { value })
26    }
27
28    /// Get the message type without allocation
29    #[inline]
30    pub fn message_type(&self) -> Option<&str> {
31        self.value.get_str("type")
32    }
33
34    /// Get the order ID without allocation
35    #[inline]
36    pub fn order_id(&self) -> Option<&str> {
37        self.value.get_str("order_id")
38    }
39
40    /// Get the client order ID without allocation
41    #[inline]
42    pub fn client_order_id(&self) -> Option<&str> {
43        self.value
44            .get_str("client_order_id")
45            .or_else(|| self.value.get_str("client_oid"))
46            .or_else(|| self.value.get_str("clOrdId"))
47    }
48
49    /// Get the symbol without allocation
50    #[inline]
51    pub fn symbol(&self) -> Option<&str> {
52        self.value
53            .get_str("symbol")
54            .or_else(|| self.value.get_str("product_id"))
55            .or_else(|| self.value.get_str("instrument"))
56    }
57
58    /// Get the price as string without allocation
59    #[inline]
60    pub fn price_str(&self) -> Option<&str> {
61        self.value.get_str("price")
62    }
63
64    /// Get the quantity as string without allocation
65    #[inline]
66    pub fn quantity_str(&self) -> Option<&str> {
67        self.value
68            .get_str("quantity")
69            .or_else(|| self.value.get_str("size"))
70            .or_else(|| self.value.get_str("amount"))
71    }
72
73    /// Get the order status without allocation
74    #[inline]
75    pub fn status(&self) -> Option<&str> {
76        self.value
77            .get_str("status")
78            .or_else(|| self.value.get_str("order_status"))
79    }
80
81    /// Get the side without allocation
82    #[inline]
83    pub fn side(&self) -> Option<&str> {
84        self.value.get_str("side")
85    }
86
87    /// Get the timestamp as u64 without allocation
88    #[inline]
89    pub fn timestamp(&self) -> Option<u64> {
90        self.value
91            .get_u64("timestamp")
92            .or_else(|| self.value.get_u64("time"))
93    }
94
95    /// Convert to owned values only when necessary
96    pub fn to_owned_values(&self) -> OrderUpdateOwned {
97        OrderUpdateOwned {
98            order_id: self.order_id().map(SmartString::from),
99            client_order_id: self.client_order_id().map(SmartString::from),
100            symbol: self.symbol().map(SmartString::from),
101            price: self.price_str().and_then(|s| {
102                Decimal::from_str_exact(s)
103                    .map_err(|e| {
104                        warn!("Failed to parse price '{s}': {e}");
105                        e
106                    })
107                    .ok()
108            }),
109            quantity: self.quantity_str().and_then(|s| {
110                Decimal::from_str_exact(s)
111                    .map_err(|e| {
112                        warn!("Failed to parse quantity '{s}': {e}");
113                        e
114                    })
115                    .ok()
116            }),
117            status: self.status().map(SmartString::from),
118            side: self.side().map(SmartString::from),
119            timestamp: self.timestamp(),
120        }
121    }
122}
123
/// Owned copy of an order update, produced by
/// `ZeroAllocOrderUpdate::to_owned_values`.
///
/// Every field is optional: a field is `None` when the source message did not
/// contain it, or (for `price` / `quantity`) when its text could not be parsed
/// as a `Decimal`.
#[derive(Debug, Clone)]
pub struct OrderUpdateOwned {
    /// The order ID.
    pub order_id: Option<SmartString>,
    /// The client order ID.
    pub client_order_id: Option<SmartString>,
    /// The trading symbol.
    pub symbol: Option<SmartString>,
    /// The order price, parsed from its string form.
    pub price: Option<Decimal>,
    /// The order quantity, parsed from its string form.
    pub quantity: Option<Decimal>,
    /// The order status.
    pub status: Option<SmartString>,
    /// The order side.
    pub side: Option<SmartString>,
    /// The timestamp of the order, as read from the message (unit not
    /// established here).
    pub timestamp: Option<u64>,
}
144
/// Borrowed, read-only view over one parsed market-data message.
///
/// Holds the `BorrowedValue` produced by [`ZeroAllocMarketData::parse`]. The
/// `'a` lifetime is the lifetime of the byte buffer handed to `parse`, so the
/// view cannot outlive that buffer.
pub struct ZeroAllocMarketData<'a> {
    /// The parsed JSON value, borrowing from the caller's message buffer.
    value: BorrowedValue<'a>,
}
150
151impl<'a> ZeroAllocMarketData<'a> {
152    /// Parse a market data message with zero allocations
153    pub fn parse(message_bytes: &'a mut [u8]) -> Result<Self> {
154        let value = WebSocketJsonZeroCopy::parse_to_borrowed(message_bytes)?;
155        Ok(Self { value })
156    }
157
158    /// Get the message type without allocation
159    #[inline]
160    pub fn message_type(&self) -> Option<&str> {
161        self.value
162            .get_str("type")
163            .or_else(|| self.value.get_str("e")) // Binance uses "e" for event type
164    }
165
166    /// Get the symbol without allocation
167    #[inline]
168    pub fn symbol(&self) -> Option<&str> {
169        self.value
170            .get_str("symbol")
171            .or_else(|| self.value.get_str("s")) // Binance uses "s" for symbol
172            .or_else(|| self.value.get_str("product_id"))
173    }
174
175    /// Get the event time without allocation
176    #[inline]
177    pub fn event_time(&self) -> Option<u64> {
178        self.value
179            .get_u64("E") // Binance event time
180            .or_else(|| self.value.get_u64("T")) // Trade time
181            .or_else(|| self.value.get_u64("timestamp"))
182    }
183
184    /// Get bid price for ticker updates
185    #[inline]
186    pub fn bid_price_str(&self) -> Option<&str> {
187        self.value
188            .get_str("b") // Binance bid price
189            .or_else(|| self.value.get_str("best_bid"))
190    }
191
192    /// Get ask price for ticker updates
193    #[inline]
194    pub fn ask_price_str(&self) -> Option<&str> {
195        self.value
196            .get_str("a") // Binance ask price
197            .or_else(|| self.value.get_str("best_ask"))
198    }
199
200    /// Get last price for ticker updates
201    #[inline]
202    pub fn last_price_str(&self) -> Option<&str> {
203        self.value
204            .get_str("c") // Binance close/last price
205            .or_else(|| self.value.get_str("p")) // Price
206            .or_else(|| self.value.get_str("price"))
207    }
208
209    /// Get volume for ticker updates
210    #[inline]
211    pub fn volume_str(&self) -> Option<&str> {
212        self.value
213            .get_str("v") // Binance volume
214            .or_else(|| self.value.get_str("q")) // Quote volume
215            .or_else(|| self.value.get_str("volume"))
216    }
217
218    /// Check if this is a snapshot or update
219    #[inline]
220    pub fn is_snapshot(&self) -> bool {
221        self.message_type()
222            .map(|t| t == "snapshot" || t == "depthUpdate")
223            .unwrap_or(false)
224    }
225}
226
/// Inline capacity of [`MessageBuffer`], in bytes (4 KiB).
///
/// `ZeroAllocMessageProcessor` copies messages of at most this many bytes into
/// its inline buffer; longer messages are routed to a pooled buffer instead.
const MESSAGE_BUFFER_SIZE: usize = 4096;
229
/// Byte buffer with `MESSAGE_BUFFER_SIZE` bytes of inline storage.
///
/// Being a `SmallVec`, it keeps its contents inline (no heap allocation) until
/// more than `MESSAGE_BUFFER_SIZE` bytes are pushed into it.
pub type MessageBuffer = SmallVec<[u8; MESSAGE_BUFFER_SIZE]>;
232
/// WebSocket message processor that reuses its parse buffers between messages.
///
/// Each incoming message is copied into one of the two buffers below and parsed
/// there, so the caller's bytes are only read, never modified.
pub struct ZeroAllocMessageProcessor {
    /// Reusable buffer for messages of at most `MESSAGE_BUFFER_SIZE` bytes;
    /// such messages fit in the buffer's inline storage.
    buffer: MessageBuffer,
    /// Pool buffer handle for larger messages; acquired lazily on the first
    /// large message and kept for reuse afterwards.
    pool_buffer: Option<HftBufferHandle>,
}
240
241impl Default for ZeroAllocMessageProcessor {
242    fn default() -> Self {
243        Self::new()
244    }
245}
246
247impl ZeroAllocMessageProcessor {
248    /// Create a new zero-allocation message processor
249    #[must_use]
250    pub fn new() -> Self {
251        Self {
252            buffer: MessageBuffer::new(),
253            pool_buffer: None,
254        }
255    }
256
257    /// Process JSON message with zero heap allocations for small messages
258    ///
259    /// Uses SmallVec for messages under 4KB (stack allocation),
260    /// falls back to pooled buffers for larger messages.
261    /// Note: This borrows from the internal buffer, so cannot call other methods
262    /// on the processor while the returned value is alive.
263    #[inline]
264    fn process_json_bytes_internal(&mut self, data: &[u8]) -> Result<BorrowedValue<'_>> {
265        // Clear previous buffer content
266        self.buffer.clear();
267
268        // For small messages, use stack-allocated SmallVec
269        if data.len() <= MESSAGE_BUFFER_SIZE {
270            self.buffer.extend_from_slice(data);
271            // Parse in-place using borrowed parsing
272            WebSocketJsonZeroCopy::parse_to_borrowed(&mut self.buffer)
273        } else {
274            // For large messages, use pooled buffer
275            if self.pool_buffer.is_none() {
276                self.pool_buffer = Some(global_hft_pools().acquire_buffer());
277            }
278
279            let pool_buf = self.pool_buffer.as_mut().unwrap();
280            pool_buf.clear();
281            pool_buf.extend_from_slice(data);
282
283            WebSocketJsonZeroCopy::parse_to_borrowed(pool_buf.as_mut_slice())
284        }
285    }
286
287    /// Parse order message with pooled object recycling
288    ///
289    /// Returns a pooled order handle that automatically returns to pool on drop.
290    #[inline]
291    pub fn parse_order_to_pooled(&mut self, data: &[u8]) -> Result<OrderHandle> {
292        let json_value = self.process_json_bytes_internal(data)?;
293        let mut order = global_hft_pools().acquire_order();
294
295        // Parse order fields using borrowed JSON values (no string allocations)
296        if let Some(order_id) = json_value.get_str("order_id") {
297            order.order_id = SmartString::from(order_id);
298        }
299
300        if let Some(client_order_id) = json_value
301            .get_str("client_order_id")
302            .or_else(|| json_value.get_str("client_oid"))
303            .or_else(|| json_value.get_str("clOrdId"))
304        {
305            order.client_order_id = SmartString::from(client_order_id);
306        }
307
308        if let Some(symbol) = json_value
309            .get_str("symbol")
310            .or_else(|| json_value.get_str("product_id"))
311            .or_else(|| json_value.get_str("instrument"))
312        {
313            order.symbol = SmartString::from(symbol);
314        }
315
316        if let Some(side) = json_value.get_str("side") {
317            order.side = SmartString::from(side);
318        }
319
320        if let Some(order_type) = json_value
321            .get_str("type")
322            .or_else(|| json_value.get_str("order_type"))
323        {
324            order.order_type = SmartString::from(order_type);
325        }
326
327        if let Some(status) = json_value
328            .get_str("status")
329            .or_else(|| json_value.get_str("order_status"))
330        {
331            order.status = SmartString::from(status);
332        }
333
334        // Parse decimal fields safely - return error if critical fields are invalid
335        if let Some(price_str) = json_value.get_str("price") {
336            match Decimal::from_str_exact(price_str) {
337                Ok(price) => order.price = price,
338                Err(e) => {
339                    warn!("Failed to parse order price '{price_str}': {e}");
340                    return Err(CommonError::Parse(
341                        format!("Invalid price format: {price_str}").into(),
342                    ));
343                }
344            }
345        }
346
347        if let Some(quantity_str) = json_value
348            .get_str("quantity")
349            .or_else(|| json_value.get_str("size"))
350            .or_else(|| json_value.get_str("amount"))
351        {
352            match Decimal::from_str_exact(quantity_str) {
353                Ok(quantity) => order.quantity = quantity,
354                Err(e) => {
355                    warn!("Failed to parse order quantity '{quantity_str}': {e}");
356                    return Err(CommonError::Parse(
357                        format!("Invalid quantity format: {quantity_str}").into(),
358                    ));
359                }
360            }
361        }
362
363        if let Some(filled_str) = json_value
364            .get_str("filled_quantity")
365            .or_else(|| json_value.get_str("filled_size"))
366            .or_else(|| json_value.get_str("executed_qty"))
367        {
368            match Decimal::from_str_exact(filled_str) {
369                Ok(filled) => order.filled_quantity = filled,
370                Err(e) => warn!("Failed to parse order filled quantity '{filled_str}': {e}"),
371            }
372        }
373
374        // Parse timestamps
375        if let Some(ts) = json_value
376            .get_u64("timestamp")
377            .or_else(|| json_value.get_u64("time"))
378        {
379            order.timestamp = ts;
380        }
381
382        if let Some(ex_ts) = json_value
383            .get_u64("exchange_timestamp")
384            .or_else(|| json_value.get_u64("exchange_time"))
385            .or_else(|| json_value.get_u64("T"))
386        {
387            // Binance timestamp
388            order.exchange_timestamp = ex_ts;
389        }
390
391        // Calculate remaining quantity
392        order.remaining_quantity = order.quantity - order.filled_quantity;
393
394        Ok(order)
395    }
396
397    /// Parse trade message with pooled object recycling
398    ///
399    /// Returns a pooled trade handle that automatically returns to pool on drop.
400    #[inline]
401    pub fn parse_trade_to_pooled(&mut self, data: &[u8]) -> Result<TradeHandle> {
402        let json_value = self.process_json_bytes_internal(data)?;
403        let mut trade = global_hft_pools().acquire_trade();
404
405        // Parse trade fields using borrowed JSON values (no string allocations)
406        if let Some(trade_id) = json_value
407            .get_str("trade_id")
408            .or_else(|| json_value.get_str("t"))
409        {
410            // Binance trade ID
411            trade.trade_id = SmartString::from(trade_id);
412        }
413
414        if let Some(order_id) = json_value
415            .get_str("order_id")
416            .or_else(|| json_value.get_str("o"))
417        {
418            trade.order_id = SmartString::from(order_id);
419        }
420
421        if let Some(symbol) = json_value
422            .get_str("symbol")
423            .or_else(|| json_value.get_str("s")) // Binance symbol
424            .or_else(|| json_value.get_str("product_id"))
425        {
426            trade.symbol = SmartString::from(symbol);
427        }
428
429        if let Some(side) = json_value
430            .get_str("side")
431            .or_else(|| json_value.get_str("S"))
432        {
433            // Binance side
434            trade.side = SmartString::from(side);
435        }
436
437        // Parse decimal fields safely - return error if critical fields are invalid
438        if let Some(price_str) = json_value
439            .get_str("price")
440            .or_else(|| json_value.get_str("p"))
441        {
442            // Binance price
443            match Decimal::from_str_exact(price_str) {
444                Ok(price) => trade.price = price,
445                Err(e) => {
446                    warn!("Failed to parse trade price '{price_str}': {e}");
447                    return Err(CommonError::Parse(
448                        format!("Invalid price format: {price_str}").into(),
449                    ));
450                }
451            }
452        }
453
454        if let Some(quantity_str) = json_value
455            .get_str("quantity")
456            .or_else(|| json_value.get_str("q")) // Binance quantity
457            .or_else(|| json_value.get_str("size"))
458        {
459            match Decimal::from_str_exact(quantity_str) {
460                Ok(quantity) => trade.quantity = quantity,
461                Err(e) => {
462                    warn!("Failed to parse trade quantity '{quantity_str}': {e}");
463                    return Err(CommonError::Parse(
464                        format!("Invalid quantity format: {quantity_str}").into(),
465                    ));
466                }
467            }
468        }
469
470        // Parse timestamps
471        if let Some(ts) = json_value
472            .get_u64("timestamp")
473            .or_else(|| json_value.get_u64("time"))
474            .or_else(|| json_value.get_u64("T"))
475        {
476            // Binance trade time
477            trade.timestamp = ts;
478        }
479
480        if let Some(ex_ts) = json_value
481            .get_u64("exchange_timestamp")
482            .or_else(|| json_value.get_u64("E"))
483        {
484            // Binance event time
485            trade.exchange_timestamp = ex_ts;
486        }
487
488        // Parse maker flag
489        if let Some(is_maker) = json_value
490            .get_bool("is_maker")
491            .or_else(|| json_value.get_bool("m"))
492        {
493            // Binance maker flag
494            trade.is_maker = is_maker;
495        }
496
497        Ok(trade)
498    }
499
500    /// Reset the processor for reuse
501    ///
502    /// Clears internal buffers while keeping allocated capacity.
503    #[inline]
504    pub fn reset(&mut self) {
505        self.buffer.clear();
506        if let Some(ref mut pool_buf) = self.pool_buffer {
507            pool_buf.clear();
508        }
509    }
510
511    /// Get buffer statistics for monitoring
512    #[inline]
513    pub fn buffer_stats(&self) -> BufferStats {
514        BufferStats {
515            stack_buffer_len: self.buffer.len(),
516            stack_buffer_capacity: self.buffer.capacity(),
517            stack_buffer_spilled: self.buffer.spilled(),
518            pool_buffer_active: self.pool_buffer.is_some(),
519            pool_buffer_len: self.pool_buffer.as_ref().map(|b| b.len()).unwrap_or(0),
520        }
521    }
522
523    /// Process a message and determine its type without allocation
524    pub fn identify_message_type(&mut self, message_bytes: &[u8]) -> Result<MessageType> {
525        let value = self.process_json_bytes_internal(message_bytes)?;
526
527        // Check common type fields
528        if let Some(msg_type) = value.get_str("type") {
529            Ok(match msg_type {
530                "heartbeat" => MessageType::Heartbeat,
531                "error" => MessageType::Error,
532                "order" | "execution" | "fill" => MessageType::OrderUpdate,
533                "ticker" | "trade" | "quote" => MessageType::MarketData,
534                "snapshot" | "l2update" | "depthUpdate" => MessageType::OrderBook,
535                _ => MessageType::Unknown,
536            })
537        } else if let Some(event_type) = value.get_str("e") {
538            // Binance format
539            Ok(match event_type {
540                "executionReport" => MessageType::OrderUpdate,
541                "24hrTicker" | "trade" => MessageType::MarketData,
542                "depthUpdate" => MessageType::OrderBook,
543                _ => MessageType::Unknown,
544            })
545        } else {
546            Ok(MessageType::Unknown)
547        }
548    }
549
550    /// Extract error message without allocation
551    #[inline]
552    pub fn extract_error_message(&mut self, data: &[u8]) -> Result<Option<SmartString>> {
553        let value = self.process_json_bytes_internal(data)?;
554        Ok(value
555            .get_str("message")
556            .or_else(|| value.get_str("msg"))
557            .or_else(|| value.get_str("error"))
558            .map(SmartString::from))
559    }
560
561    /// Extract request ID without allocation
562    #[inline]
563    pub fn extract_request_id(&mut self, data: &[u8]) -> Result<Option<SmartString>> {
564        let value = self.process_json_bytes_internal(data)?;
565        Ok(value
566            .get_str("id")
567            .or_else(|| value.get_str("request_id"))
568            .or_else(|| value.get_str("req_id"))
569            .map(SmartString::from))
570    }
571
572    /// Extract string field without allocation using borrowed parsing
573    #[inline]
574    pub fn extract_string_field(
575        &mut self,
576        data: &[u8],
577        field: &str,
578    ) -> Result<Option<SmartString>> {
579        let value = self.process_json_bytes_internal(data)?;
580        Ok(value.get_str(field).map(SmartString::from))
581    }
582
583    /// Extract decimal field without string allocation
584    #[inline]
585    pub fn extract_decimal_field(&mut self, data: &[u8], field: &str) -> Result<Option<Decimal>> {
586        let value = self.process_json_bytes_internal(data)?;
587
588        if let Some(str_val) = value.get_str(field) {
589            match Decimal::from_str_exact(str_val) {
590                Ok(decimal) => return Ok(Some(decimal)),
591                Err(e) => {
592                    warn!("Failed to parse decimal field '{field}' with value '{str_val}': {e}");
593                    return Err(e.into());
594                }
595            }
596        } else if let Some(num_val) = value.get_f64(field) {
597            match Decimal::try_from(num_val) {
598                Ok(decimal) => return Ok(Some(decimal)),
599                Err(e) => {
600                    warn!(
601                        "Failed to convert f64 field '{field}' with value '{num_val}' to Decimal: {e}"
602                    );
603                    return Err(e.into());
604                }
605            }
606        }
607
608        Ok(None)
609    }
610
611    /// Extract u64 field without allocation
612    #[inline]
613    pub fn extract_u64_field(&mut self, data: &[u8], field: &str) -> Result<Option<u64>> {
614        let value = self.process_json_bytes_internal(data)?;
615        Ok(value.get_u64(field))
616    }
617
618    /// Extract boolean field without allocation
619    #[inline]
620    pub fn extract_bool_field(&mut self, data: &[u8], field: &str) -> Result<Option<bool>> {
621        let value = self.process_json_bytes_internal(data)?;
622        Ok(value.get_bool(field))
623    }
624}
625
/// Coarse classification of an incoming message, as produced by
/// `ZeroAllocMessageProcessor::identify_message_type`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MessageType {
    /// A heartbeat message.
    Heartbeat,
    /// An error message.
    Error,
    /// An order update message (order, execution or fill).
    OrderUpdate,
    /// A market data message (ticker, trade or quote).
    MarketData,
    /// An order book message (snapshot or incremental depth update).
    OrderBook,
    /// A message whose type key is missing or not recognised.
    Unknown,
}
642
/// Point-in-time buffer usage of a `ZeroAllocMessageProcessor`, returned by
/// its `buffer_stats` method for monitoring.
#[derive(Debug, Clone, Copy)]
pub struct BufferStats {
    /// Number of bytes currently held in the inline (`SmallVec`) buffer.
    pub stack_buffer_len: usize,
    /// Current capacity of the inline buffer, in bytes.
    pub stack_buffer_capacity: usize,
    /// Whether the inline buffer has spilled to the heap.
    pub stack_buffer_spilled: bool,
    /// Whether a pool buffer has been acquired.
    pub pool_buffer_active: bool,
    /// Number of bytes currently held in the pool buffer (0 when none is held).
    pub pool_buffer_len: usize,
}
657
// Per-thread message processor used by `with_zero_alloc_processor` and the
// `*_zero_alloc` convenience functions. It is wrapped in a `RefCell` so it can
// be borrowed mutably through the thread-local key.
thread_local! {
    static LOCAL_PROCESSOR: std::cell::RefCell<ZeroAllocMessageProcessor> =
        std::cell::RefCell::new(ZeroAllocMessageProcessor::new());
}
663
664/// Access thread-local zero-allocation message processor
665///
666/// This avoids the overhead of creating new processors for each message.
667pub fn with_zero_alloc_processor<F, R>(f: F) -> R
668where
669    F: FnOnce(&mut ZeroAllocMessageProcessor) -> R,
670{
671    LOCAL_PROCESSOR.with(|processor| f(&mut processor.borrow_mut()))
672}
673
674/// Convenience function for zero-allocation order parsing
675#[inline]
676pub fn parse_order_zero_alloc(data: &[u8]) -> Result<OrderHandle> {
677    with_zero_alloc_processor(|processor| processor.parse_order_to_pooled(data))
678}
679
680/// Convenience function for zero-allocation trade parsing
681#[inline]
682pub fn parse_trade_zero_alloc(data: &[u8]) -> Result<TradeHandle> {
683    with_zero_alloc_processor(|processor| processor.parse_trade_to_pooled(data))
684}
685
686/// Convenience function for zero-allocation JSON field extraction
687#[inline]
688pub fn extract_string_field_zero_alloc(data: &[u8], field: &str) -> Result<Option<SmartString>> {
689    with_zero_alloc_processor(|processor| processor.extract_string_field(data, field))
690}
691
692/// Convenience function for zero-allocation message type identification
693#[inline]
694pub fn identify_message_type_zero_alloc(data: &[u8]) -> Result<MessageType> {
695    with_zero_alloc_processor(|processor| processor.identify_message_type(data))
696}
697
698/// Example usage in WebSocket handler
699pub async fn example_zero_alloc_handler(message: &mut [u8]) -> Result<()> {
700    // Use thread-local processor for best performance
701    with_zero_alloc_processor(|processor| {
702        // Identify message type with zero allocations
703        let msg_type = processor.identify_message_type(message)?;
704
705        match msg_type {
706            MessageType::OrderUpdate => {
707                // Process order update with zero allocations and pooled objects
708                let order_update = ZeroAllocOrderUpdate::parse(message)?;
709
710                // Access fields without allocation
711                if let Some(order_id) = order_update.order_id() {
712                    // Use borrowed string directly
713                    println!("Order ID: {order_id}");
714                }
715
716                // Use pooled objects for storage when needed
717                if order_update.status() == Some("filled") {
718                    // Note: Cannot use both order_update and processor simultaneously
719                    // due to lifetime constraints. In practice, choose one approach.
720                    // For example, drop order_update and parse with pooled object:
721                    drop(order_update);
722                    let _pooled_order = processor.parse_order_to_pooled(message)?;
723                    // Process with pooled object...
724                }
725            }
726            MessageType::MarketData => {
727                // Process market data with zero allocations
728                let market_data = ZeroAllocMarketData::parse(message)?;
729
730                // Use borrowed values directly
731                if let Some(symbol) = market_data.symbol()
732                    && let Some(price) = market_data.last_price_str()
733                {
734                    // Process without allocation
735                    println!("{symbol}: {price}");
736                }
737
738                // Or parse as trade with pooled object if needed
739                if market_data.message_type() == Some("trade") {
740                    // Note: Cannot use both market_data and processor simultaneously
741                    // due to lifetime constraints. Drop market_data first:
742                    drop(market_data);
743                    let _pooled_trade = processor.parse_trade_to_pooled(message)?;
744                    // Process with pooled object...
745                }
746            }
747            _ => {
748                // Handle other message types
749            }
750        }
751
752        Ok(())
753    })
754}
755
756#[cfg(test)]
757mod tests {
758    use super::*;
759
760    #[test]
761    fn test_zero_alloc_order_update() {
762        let mut json = r#"{"type":"order","order_id":"123","symbol":"BTC-USDT","price":"50000.5","quantity":"0.1","status":"new","side":"buy","timestamp":1234567890}"#.to_string();
763        let bytes = unsafe { json.as_bytes_mut() };
764
765        let update = ZeroAllocOrderUpdate::parse(bytes).unwrap();
766        assert_eq!(update.message_type(), Some("order"));
767        assert_eq!(update.order_id(), Some("123"));
768        assert_eq!(update.symbol(), Some("BTC-USDT"));
769        assert_eq!(update.price_str(), Some("50000.5"));
770        assert_eq!(update.quantity_str(), Some("0.1"));
771        assert_eq!(update.status(), Some("new"));
772        assert_eq!(update.side(), Some("buy"));
773        assert_eq!(update.timestamp(), Some(1234567890));
774    }
775
776    #[test]
777    fn test_zero_alloc_market_data() {
778        let mut json = r#"{"e":"24hrTicker","s":"BTCUSDT","b":"49999.0","a":"50001.0","c":"50000.0","v":"1000.5","E":1234567890}"#.to_string();
779        let bytes = unsafe { json.as_bytes_mut() };
780
781        let data = ZeroAllocMarketData::parse(bytes).unwrap();
782        assert_eq!(data.message_type(), Some("24hrTicker"));
783        assert_eq!(data.symbol(), Some("BTCUSDT"));
784        assert_eq!(data.bid_price_str(), Some("49999.0"));
785        assert_eq!(data.ask_price_str(), Some("50001.0"));
786        assert_eq!(data.last_price_str(), Some("50000.0"));
787        assert_eq!(data.volume_str(), Some("1000.5"));
788        assert_eq!(data.event_time(), Some(1234567890));
789    }
790
791    #[test]
792    fn test_pooled_order_parsing() {
793        let order_json = r#"{
794            "order_id": "ORDER123",
795            "client_order_id": "CLIENT456",
796            "symbol": "BTC-USDT",
797            "side": "buy",
798            "type": "limit",
799            "status": "open",
800            "price": "50000.00",
801            "quantity": "0.001",
802            "filled_quantity": "0.0005",
803            "timestamp": 1640995200000,
804            "exchange_timestamp": 1640995200100
805        }"#;
806
807        let mut processor = ZeroAllocMessageProcessor::new();
808        let order = processor
809            .parse_order_to_pooled(order_json.as_bytes())
810            .unwrap();
811
812        assert_eq!(order.order_id.as_str(), "ORDER123");
813        assert_eq!(order.symbol.as_str(), "BTC-USDT");
814        assert_eq!(order.side.as_str(), "buy");
815        assert_eq!(order.price, Decimal::from_str_exact("50000.00").unwrap());
816        assert_eq!(order.quantity, Decimal::from_str_exact("0.001").unwrap());
817        assert_eq!(
818            order.filled_quantity,
819            Decimal::from_str_exact("0.0005").unwrap()
820        );
821        assert_eq!(
822            order.remaining_quantity,
823            Decimal::from_str_exact("0.0005").unwrap()
824        );
825        assert_eq!(order.timestamp, 1640995200000);
826    }
827
828    #[test]
829    fn test_pooled_trade_parsing() {
830        let trade_json = r#"{
831            "trade_id": "TRADE789",
832            "order_id": "ORDER123",
833            "symbol": "ETH-USDT",
834            "side": "sell",
835            "price": "3000.50",
836            "quantity": "0.1",
837            "timestamp": 1640995200000,
838            "exchange_timestamp": 1640995200050,
839            "is_maker": true
840        }"#;
841
842        let mut processor = ZeroAllocMessageProcessor::new();
843        let trade = processor
844            .parse_trade_to_pooled(trade_json.as_bytes())
845            .unwrap();
846
847        assert_eq!(trade.trade_id.as_str(), "TRADE789");
848        assert_eq!(trade.symbol.as_str(), "ETH-USDT");
849        assert_eq!(trade.side.as_str(), "sell");
850        assert_eq!(trade.price, Decimal::from_str_exact("3000.50").unwrap());
851        assert_eq!(trade.quantity, Decimal::from_str_exact("0.1").unwrap());
852        assert!(trade.is_maker);
853    }
854
855    #[test]
856    fn test_small_message_stack_allocation() {
857        let mut processor = ZeroAllocMessageProcessor::new();
858
859        // Small JSON message should stay on stack
860        let small_json = r#"{"id":1,"price":100.0}"#;
861        {
862            let _result = processor
863                .process_json_bytes_internal(small_json.as_bytes())
864                .unwrap();
865            // _result drops here, releasing the borrow
866        }
867
868        let stats = processor.buffer_stats();
869        assert!(!stats.stack_buffer_spilled);
870        assert!(!stats.pool_buffer_active);
871    }
872
873    #[test]
874    fn test_large_message_pool_allocation() {
875        let mut processor = ZeroAllocMessageProcessor::new();
876
877        // Large JSON message should use pool buffer
878        let large_json = format!(r#"{{"data":"{}"}}"#, "x".repeat(5000));
879        {
880            let _result = processor
881                .process_json_bytes_internal(large_json.as_bytes())
882                .unwrap();
883            // _result drops here, releasing the borrow
884        }
885
886        let stats = processor.buffer_stats();
887        assert!(stats.pool_buffer_active);
888    }
889
890    #[test]
891    fn test_thread_local_processor() {
892        let order_json = r#"{
893            "order_id": "TL123",
894            "symbol": "BTC-USDT",
895            "side": "buy",
896            "price": "40000.00",
897            "quantity": "0.01"
898        }"#;
899
900        let order = parse_order_zero_alloc(order_json.as_bytes()).unwrap();
901        assert_eq!(order.order_id.as_str(), "TL123");
902        assert_eq!(order.symbol.as_str(), "BTC-USDT");
903    }
904
905    #[test]
906    fn test_buffer_reuse() {
907        let mut processor = ZeroAllocMessageProcessor::new();
908
909        // Process first message
910        let json1 = r#"{"id":1,"value":"test1"}"#;
911        {
912            let _result1 = processor
913                .process_json_bytes_internal(json1.as_bytes())
914                .unwrap();
915            // _result1 drops here
916        }
917
918        // Process second message - should reuse buffer
919        let json2 = r#"{"id":2,"value":"test2"}"#;
920        {
921            let _result2 = processor
922                .process_json_bytes_internal(json2.as_bytes())
923                .unwrap();
924            // _result2 drops here
925        }
926
927        // Buffer should maintain capacity from previous use
928        let stats = processor.buffer_stats();
929        assert!(stats.stack_buffer_capacity >= json1.len().max(json2.len()));
930    }
931
932    #[test]
933    fn test_message_type_identification() {
934        let test_cases = vec![
935            (r#"{"type":"heartbeat"}"#, MessageType::Heartbeat),
936            (
937                r#"{"type":"error","message":"Invalid request"}"#,
938                MessageType::Error,
939            ),
940            (
941                r#"{"type":"order","order_id":"123"}"#,
942                MessageType::OrderUpdate,
943            ),
944            (r#"{"e":"executionReport"}"#, MessageType::OrderUpdate),
945            (r#"{"type":"ticker"}"#, MessageType::MarketData),
946            (r#"{"type":"snapshot"}"#, MessageType::OrderBook),
947            (r#"{"e":"depthUpdate"}"#, MessageType::OrderBook),
948            (r#"{"unknown":"field"}"#, MessageType::Unknown),
949        ];
950
951        let mut processor = ZeroAllocMessageProcessor::new();
952        for (json, expected) in test_cases {
953            let msg_type = processor.identify_message_type(json.as_bytes()).unwrap();
954            assert_eq!(msg_type, expected, "Failed for JSON: {json}");
955        }
956    }
957}