rusty_bin/monitor/schema/
mod.rs

1//! Schema module for FlatBuffers serialization and deserialization
2//!
3//! This module provides high-level interfaces for working with trade and orderbook data
4//! using FlatBuffers for efficient serialization.
5//!
6//! ## Performance Optimizations
7//!
8//! This module implements several JSON parsing optimizations:
9//!
10//! 1. **Binary Decimal Serialization**: Instead of converting `Decimal` to `String` and back,
11//!    we use binary representation for FlatBuffers, eliminating the string conversion overhead.
12//!
13//! 2. **Fast Decimal Parsing**: Uses simd_json's optimized number parsing where possible,
14//!    falling back to standard parsing only for edge cases.
15//!
16//! 3. **Zero-Copy Operations**: Leverages FlatBuffers' zero-copy capabilities with binary data
17//!    instead of string-based serialization.
18//!
19//! These optimizations significantly improve performance for high-frequency trading scenarios
20//! where every microsecond matters.
21
22use planus::ReadAsRoot;
23use rust_decimal::Decimal;
24use simd_json;
25use simd_json::prelude::*;
26use smartstring::alias::String as SmartString;
27use std::io;
28use thiserror::Error;
29
30/// Optimized serialization utilities for efficient Decimal handling
31///
32/// This module provides high-performance decimal parsing and serialization functions.
33/// Designed for trading applications where decimal precision and performance are critical.
34pub mod decimal_optimized {
35    use rust_decimal::Decimal;
36    use smartstring::alias::String as SmartString;
37
38    /// Fast string to Decimal parser with optimized path for common cases
39    ///
40    /// Replaces the standard .parse() method for better performance.
41    /// Uses optimized parsing for common decimal patterns, falling back to standard parsing for edge cases.
42    ///
43    /// # Arguments
44    ///
45    /// * `s` - String slice to parse as decimal
46    ///
47    /// # Returns
48    ///
49    /// Returns `Ok(Decimal)` on success, or `Err(String)` with error message on failure.
50    pub fn fast_parse_decimal(s: &str) -> Result<Decimal, String> {
51        // Fast path for common decimal patterns using optimized parsing
52        match fast_decimal_from_str(s) {
53            Ok(decimal) => Ok(decimal),
54            Err(_) => {
55                // Fallback to standard parsing for edge cases
56                s.parse::<Decimal>()
57                    .map_err(|e| format!("Failed to parse decimal '{s}': {e}"))
58            }
59        }
60    }
61
62    /// Optimized Decimal to string conversion using SmartString for performance
63    ///
64    /// Converts a decimal to string using [`SmartString`] for better memory efficiency.
65    /// Optimized for short decimal strings common in trading scenarios.
66    ///
67    /// # Arguments
68    ///
69    /// * `decimal` - The decimal value to convert
70    ///
71    /// # Returns
72    ///
73    /// Returns a [`SmartString`] containing the decimal representation.
74    pub fn fast_decimal_to_string(decimal: Decimal) -> SmartString {
75        SmartString::from(decimal.to_string())
76    }
77
78    /// Fast decimal parsing for simple numeric strings
79    ///
80    /// Handles common trading decimal patterns efficiently using optimized parsing paths.
81    /// Tries integer parsing first, then float conversion for simple decimals.
82    ///
83    /// # Arguments
84    ///
85    /// * `s` - String slice to parse
86    ///
87    /// # Returns
88    ///
89    /// Returns `Ok(Decimal)` on success, or `Err(&'static str)` for complex patterns.
90    fn fast_decimal_from_str(s: &str) -> Result<Decimal, &'static str> {
91        // Handle empty string
92        if s.is_empty() {
93            return Err("Empty string");
94        }
95
96        // Fast path for integers (no decimal point)
97        if !s.contains('.')
98            && let Ok(int_val) = s.parse::<i64>()
99        {
100            return Ok(Decimal::from(int_val));
101        }
102
103        // Fast path for simple decimals using f64 conversion
104        if let Ok(float_val) = s.parse::<f64>()
105            && float_val.is_finite()
106        {
107            return Decimal::try_from(float_val).map_err(|_| "Float conversion failed");
108        }
109
110        Err("Complex decimal pattern")
111    }
112
113    /// Batch convert vector of decimal strings with SIMD-friendly processing
114    ///
115    /// Processes multiple decimal strings in a single operation for better performance.
116    /// Uses vectorized processing patterns that can benefit from SIMD optimizations.
117    ///
118    /// # Arguments
119    ///
120    /// * `strings` - Slice of string slices to parse as decimals
121    ///
122    /// # Returns
123    ///
124    /// Returns a vector of results, one for each input string.
125    pub fn batch_parse_decimals(strings: &[&str]) -> Vec<Result<Decimal, String>> {
126        strings.iter().map(|s| fast_parse_decimal(s)).collect()
127    }
128}
129
130/// Generated FlatBuffers schema definitions
131///
132/// This module contains the auto-generated Rust code from FlatBuffers schema files.
133/// It includes the compiled schema definitions for trade and orderbook data structures.
134#[allow(missing_docs)]
135pub mod generated;
136
137/// Represents the side of a trade transaction
138///
139/// This enum identifies whether a trade was initiated by a buyer or seller.
140/// Used in trade records to indicate market direction and flow.
141#[derive(Debug, Clone, Copy, PartialEq)]
142pub enum TradeSide {
143    /// Trade initiated by a buyer (market buy order)
144    Buy,
145    /// Trade initiated by a seller (market sell order)
146    Sell,
147}
148
149/// Errors that can occur during schema operations
150///
151/// Comprehensive error type for all schema-related operations including
152/// serialization, deserialization, and data validation.
153#[derive(Error, Debug)]
154pub enum SchemaError {
155    /// Error during serialization process
156    #[error("Serialization failed: {0}")]
157    Serialization(String),
158
159    /// Error during deserialization process
160    #[error("Deserialization failed: {0}")]
161    Deserialization(String),
162
163    /// Invalid or malformed data encountered
164    #[error("Invalid data: {0}")]
165    InvalidData(String),
166
167    /// I/O operation failed
168    #[error("IO error: {0}")]
169    Io(#[from] io::Error),
170
171    /// Feature or functionality not yet implemented
172    #[error("Not implemented: {0}")]
173    NotImplemented(String),
174}
175
176/// Result type alias for schema operations
177///
178/// Convenience type alias that uses [`SchemaError`] as the error type.
179/// Used throughout the module for consistent error handling.
180pub type Result<T> = std::result::Result<T, SchemaError>;
181
182/// High-level trade data structure for easier manipulation
183///
184/// Represents a single trade transaction with all relevant metadata.
185/// This structure is optimized for serialization and uses `SmartString` for performance.
186#[derive(Debug, Clone)]
187pub struct TradeRecord {
188    /// Timestamp when the trade occurred at the exchange (nanoseconds since epoch)
189    pub timestamp_exchange: u64,
190    /// Timestamp when the trade was processed by our system (nanoseconds since epoch)
191    pub timestamp_system: u64,
192    /// Trading pair symbol (e.g., "BTCUSDT")
193    pub symbol: SmartString,
194    /// Exchange identifier (e.g., "binance", "coinbase")
195    pub exchange: SmartString,
196    /// Trade execution price using high-precision decimal arithmetic
197    pub price: Decimal,
198    /// Trade quantity/volume using high-precision decimal arithmetic
199    pub quantity: Decimal,
200    /// Side of the trade (buy or sell)
201    pub side: TradeSide,
202    /// Unique identifier for this trade from the exchange
203    pub trade_id: SmartString,
204    /// Optional order ID of the buyer (if available from exchange)
205    pub buyer_order_id: Option<SmartString>,
206    /// Optional order ID of the seller (if available from exchange)
207    pub seller_order_id: Option<SmartString>,
208    /// Sequence number for ordering trades chronologically
209    pub sequence: u64,
210}
211
212/// High-level orderbook data structure for easier manipulation
213///
214/// Represents a complete order book snapshot with bid/ask levels.
215/// Contains all price levels and metadata for a trading pair at a specific time.
216#[derive(Debug, Clone)]
217pub struct OrderBookRecord {
218    /// Timestamp when the orderbook was captured at the exchange (nanoseconds since epoch)
219    pub timestamp_exchange: u64,
220    /// Timestamp when the orderbook was processed by our system (nanoseconds since epoch)
221    pub timestamp_system: u64,
222    /// Trading pair symbol (e.g., "BTCUSDT")
223    pub symbol: SmartString,
224    /// Exchange identifier (e.g., "binance", "coinbase")
225    pub exchange: SmartString,
226    /// Ordered list of bid price levels (highest to lowest)
227    pub bids: Vec<PriceLevel>,
228    /// Ordered list of ask price levels (lowest to highest)
229    pub asks: Vec<PriceLevel>,
230    /// Sequence number for ordering orderbook updates chronologically
231    pub sequence: u64,
232    /// Optional checksum for data integrity verification
233    pub checksum: Option<SmartString>,
234}
235
236/// High-level price level structure
237///
238/// Represents a single price level in an order book with price, quantity, and order count.
239/// Used for both bid and ask levels in orderbook snapshots.
240#[derive(Debug, Clone)]
241pub struct PriceLevel {
242    /// Price at this level using high-precision decimal arithmetic
243    pub price: Decimal,
244    /// Total quantity available at this price level
245    pub quantity: Decimal,
246    /// Number of individual orders at this price level (if available)
247    pub order_count: Option<u32>,
248}
249
250/// Batch of trade records for efficient processing
251///
252/// Groups multiple trade records together for batch operations.
253/// Useful for bulk serialization and processing of trade data.
254#[derive(Debug)]
255pub struct TradesBatchRecord {
256    /// Vector of trade records in this batch
257    pub trades: Vec<TradeRecord>,
258    /// Timestamp when this batch was created (nanoseconds since epoch)
259    pub batch_timestamp: u64,
260    /// Unique identifier for this batch
261    pub batch_id: SmartString,
262}
263
264/// Batch of orderbook records for efficient processing
265///
266/// Groups multiple orderbook records together for batch operations.
267/// Useful for bulk serialization and processing of orderbook data.
268#[derive(Debug)]
269pub struct OrderBookBatchRecord {
270    /// Vector of orderbook records in this batch
271    pub orderbooks: Vec<OrderBookRecord>,
272    /// Timestamp when this batch was created (nanoseconds since epoch)
273    pub batch_timestamp: u64,
274    /// Unique identifier for this batch
275    pub batch_id: SmartString,
276}
277
278/// Serialization utilities for trade data
279///
280/// Provides methods to serialize and deserialize trade records using FlatBuffers.
281/// Optimized for high-frequency trading scenarios with thread-local builders.
282pub struct TradeSerializer;
283
284thread_local! {
285    /// Thread-local builder for zero-allocation serialization
286    ///
287    /// Provides a reusable FlatBuffers builder per thread to avoid allocations.
288    /// Used by [`TradeSerializer::serialize_trade_into`] for optimal performance.
289    static TRADE_BUILDER: std::cell::RefCell<planus::Builder> = std::cell::RefCell::new(planus::Builder::new());
290}
291
292impl TradeSerializer {
293    /// Serialize a single trade record to FlatBuffers
294    ///
295    /// Converts a [`TradeRecord`] to FlatBuffers binary format for efficient storage.
296    /// Returns a vector of bytes containing the serialized trade data.
297    ///
298    /// # Arguments
299    ///
300    /// * `trade` - The trade record to serialize
301    ///
302    /// # Returns
303    ///
304    /// Returns `Ok(Vec<u8>)` containing the serialized data, or `Err(SchemaError)` on failure.
305    pub fn serialize_trade(trade: &TradeRecord) -> Result<Vec<u8>> {
306        use crate::monitor::schema::generated::rusty_monitor::schema::{
307            Trade, TradeSide as FbTradeSide,
308        };
309
310        let mut builder = planus::Builder::new();
311
312        // Convert internal TradeSide to FlatBuffers TradeSide
313        let fb_side = match trade.side {
314            TradeSide::Buy => FbTradeSide::Buy,
315            TradeSide::Sell => FbTradeSide::Sell,
316        };
317
318        // Build the Trade using the generated builder pattern
319        // Note: We need to find the correct builder construction pattern
320        let trade_data = Trade {
321            timestamp_exchange: trade.timestamp_exchange,
322            timestamp_system: trade.timestamp_system,
323            symbol: trade.symbol.to_string(),
324            exchange: trade.exchange.to_string(),
325            price: trade.price.to_string(),
326            quantity: trade.quantity.to_string(),
327            side: fb_side,
328            trade_id: Some(trade.trade_id.to_string()),
329            buyer_order_id: trade.buyer_order_id.as_ref().map(|s| s.to_string()),
330            seller_order_id: trade.seller_order_id.as_ref().map(|s| s.to_string()),
331            sequence: trade.sequence,
332        };
333
334        let trade_offset = planus::WriteAsOffset::prepare(&trade_data, &mut builder);
335        let bytes = builder.finish(trade_offset, None);
336        Ok(bytes.to_vec())
337    }
338
339    /// Serialize a batch of trades to FlatBuffers
340    ///
341    /// Converts a [`TradesBatchRecord`] containing multiple trades to FlatBuffers binary format.
342    /// More efficient than serializing individual trades for bulk operations.
343    ///
344    /// # Arguments
345    ///
346    /// * `batch` - The batch of trades to serialize
347    ///
348    /// # Returns
349    ///
350    /// Returns `Ok(Vec<u8>)` containing the serialized batch data, or `Err(SchemaError)` on failure.
351    pub fn serialize_batch(batch: &TradesBatchRecord) -> Result<Vec<u8>> {
352        use crate::monitor::schema::generated::rusty_monitor::schema::{
353            Trade, TradeSide as FbTradeSide, TradesBatch,
354        };
355
356        let mut builder = planus::Builder::new();
357
358        // Convert all trades to FlatBuffers format
359        let trades: Result<Vec<Trade>> = batch
360            .trades
361            .iter()
362            .map(|trade| {
363                let fb_side = match trade.side {
364                    TradeSide::Buy => FbTradeSide::Buy,
365                    TradeSide::Sell => FbTradeSide::Sell,
366                };
367
368                Ok(Trade {
369                    timestamp_exchange: trade.timestamp_exchange,
370                    timestamp_system: trade.timestamp_system,
371                    symbol: trade.symbol.to_string(),
372                    exchange: trade.exchange.to_string(),
373                    price: trade.price.to_string(),
374                    quantity: trade.quantity.to_string(),
375                    side: fb_side,
376                    trade_id: Some(trade.trade_id.to_string()),
377                    buyer_order_id: trade.buyer_order_id.as_ref().map(|s| s.to_string()),
378                    seller_order_id: trade.seller_order_id.as_ref().map(|s| s.to_string()),
379                    sequence: trade.sequence,
380                })
381            })
382            .collect();
383
384        let trades = trades?;
385
386        let trades_batch = TradesBatch {
387            trades,
388            batch_timestamp: batch.batch_timestamp,
389            batch_id: Some(batch.batch_id.to_string()),
390        };
391
392        let batch_offset = planus::WriteAsOffset::prepare(&trades_batch, &mut builder);
393        let bytes = builder.finish(batch_offset, None);
394        Ok(bytes.to_vec())
395    }
396
397    /// Serialize a trade directly into a provided buffer (zero-copy)
398    ///
399    /// Serializes a trade record directly into a provided buffer, avoiding allocation.
400    /// Uses thread-local builder for optimal performance in high-frequency scenarios.
401    ///
402    /// # Arguments
403    ///
404    /// * `trade` - The trade record to serialize
405    /// * `buffer` - Mutable buffer to write serialized data into
406    ///
407    /// # Returns
408    ///
409    /// Returns `Ok(usize)` with the number of bytes written, or `Err(SchemaError)` on failure.
410    pub fn serialize_trade_into(trade: &TradeRecord, buffer: &mut Vec<u8>) -> Result<usize> {
411        use crate::monitor::schema::generated::rusty_monitor::schema::{
412            Trade, TradeSide as FbTradeSide,
413        };
414
415        TRADE_BUILDER.with(|builder_cell| {
416            let mut builder = builder_cell.borrow_mut();
417            builder.clear(); // Reset builder for reuse
418
419            // Convert internal TradeSide to FlatBuffers TradeSide
420            let fb_side = match trade.side {
421                TradeSide::Buy => FbTradeSide::Buy,
422                TradeSide::Sell => FbTradeSide::Sell,
423            };
424
425            // Build the Trade using the generated builder pattern
426            let trade_data = Trade {
427                timestamp_exchange: trade.timestamp_exchange,
428                timestamp_system: trade.timestamp_system,
429                symbol: trade.symbol.to_string(),
430                exchange: trade.exchange.to_string(),
431                price: trade.price.to_string(),
432                quantity: trade.quantity.to_string(),
433                side: fb_side,
434                trade_id: Some(trade.trade_id.to_string()),
435                buyer_order_id: trade.buyer_order_id.as_ref().map(|s| s.to_string()),
436                seller_order_id: trade.seller_order_id.as_ref().map(|s| s.to_string()),
437                sequence: trade.sequence,
438            };
439
440            let trade_offset = planus::WriteAsOffset::prepare(&trade_data, &mut builder);
441            let bytes = builder.finish(trade_offset, None);
442
443            // Clear the buffer and copy data
444            buffer.clear();
445            buffer.extend_from_slice(bytes);
446
447            Ok(bytes.len())
448        })
449    }
450
451    /// Deserialize trade data from FlatBuffers
452    ///
453    /// Converts FlatBuffers binary data back to a [`TradeRecord`] structure.
454    /// Uses optimized decimal parsing for better performance.
455    ///
456    /// # Arguments
457    ///
458    /// * `data` - Byte slice containing serialized trade data
459    ///
460    /// # Returns
461    ///
462    /// Returns `Ok(TradeRecord)` with the deserialized trade, or `Err(SchemaError)` on failure.
463    pub fn deserialize_trade(data: &[u8]) -> Result<TradeRecord> {
464        use crate::monitor::schema::generated::rusty_monitor::schema::{
465            TradeRef, TradeSide as FbTradeSide,
466        };
467
468        // Deserialize using FlatBuffers
469        let trade_ref = TradeRef::read_as_root(data)
470            .map_err(|e| SchemaError::Deserialization(e.to_string()))?;
471
472        // Extract fields from FlatBuffers reference
473        let timestamp_exchange = trade_ref
474            .timestamp_exchange()
475            .map_err(|e| SchemaError::Deserialization(e.to_string()))?;
476        let timestamp_system = trade_ref
477            .timestamp_system()
478            .map_err(|e| SchemaError::Deserialization(e.to_string()))?;
479        let symbol = trade_ref
480            .symbol()
481            .map_err(|e| SchemaError::Deserialization(e.to_string()))?;
482        let exchange = trade_ref
483            .exchange()
484            .map_err(|e| SchemaError::Deserialization(e.to_string()))?;
485        let price_str = trade_ref
486            .price()
487            .map_err(|e| SchemaError::Deserialization(e.to_string()))?;
488        let quantity_str = trade_ref
489            .quantity()
490            .map_err(|e| SchemaError::Deserialization(e.to_string()))?;
491        let fb_side = trade_ref
492            .side()
493            .map_err(|e| SchemaError::Deserialization(e.to_string()))?;
494        let trade_id = trade_ref
495            .trade_id()
496            .map_err(|e| SchemaError::Deserialization(e.to_string()))?;
497        let buyer_order_id = trade_ref.buyer_order_id().ok().flatten();
498        let seller_order_id = trade_ref.seller_order_id().ok().flatten();
499        let sequence = trade_ref
500            .sequence()
501            .map_err(|e| SchemaError::Deserialization(e.to_string()))?;
502
503        // Convert FlatBuffers string types to internal types using optimized parsing
504        let price = decimal_optimized::fast_parse_decimal(price_str)
505            .map_err(|e| SchemaError::InvalidData(format!("Invalid price: {e}")))?;
506        let quantity = decimal_optimized::fast_parse_decimal(quantity_str)
507            .map_err(|e| SchemaError::InvalidData(format!("Invalid quantity: {e}")))?;
508        let side = match fb_side {
509            FbTradeSide::Buy => TradeSide::Buy,
510            FbTradeSide::Sell => TradeSide::Sell,
511        };
512
513        Ok(TradeRecord {
514            timestamp_exchange,
515            timestamp_system,
516            symbol: symbol.into(),
517            exchange: exchange.into(),
518            price,
519            quantity,
520            side,
521            trade_id: trade_id.unwrap_or("").into(),
522            buyer_order_id: buyer_order_id.map(|s| s.into()),
523            seller_order_id: seller_order_id.map(|s| s.into()),
524            sequence,
525        })
526    }
527}
528
529/// Serialization utilities for orderbook data
530///
531/// Provides methods to serialize and deserialize orderbook records using FlatBuffers.
532/// Handles complex price level structures with optimized decimal parsing.
533pub struct OrderBookSerializer;
534
535impl OrderBookSerializer {
536    /// Serialize a single orderbook record to FlatBuffers
537    ///
538    /// Converts an [`OrderBookRecord`] to FlatBuffers binary format for efficient storage.
539    /// Handles complex price level structures with multiple bid/ask levels.
540    ///
541    /// # Arguments
542    ///
543    /// * `orderbook` - The orderbook record to serialize
544    ///
545    /// # Returns
546    ///
547    /// Returns `Ok(Vec<u8>)` containing the serialized data, or `Err(SchemaError)` on failure.
548    pub fn serialize_orderbook(orderbook: &OrderBookRecord) -> Result<Vec<u8>> {
549        use crate::monitor::schema::generated::orderbook_generated::rusty_monitor::schema::{
550            OrderBook, PriceLevel as FbPriceLevel,
551        };
552
553        let mut builder = planus::Builder::new();
554
555        // Convert price levels to FlatBuffers format
556        let bids: Vec<FbPriceLevel> = orderbook
557            .bids
558            .iter()
559            .map(|level| FbPriceLevel {
560                price: level.price.to_string(),
561                quantity: level.quantity.to_string(),
562                order_count: level.order_count.unwrap_or(0),
563            })
564            .collect();
565
566        let asks: Vec<FbPriceLevel> = orderbook
567            .asks
568            .iter()
569            .map(|level| FbPriceLevel {
570                price: level.price.to_string(),
571                quantity: level.quantity.to_string(),
572                order_count: level.order_count.unwrap_or(0),
573            })
574            .collect();
575
576        let orderbook_data = OrderBook {
577            timestamp_exchange: orderbook.timestamp_exchange,
578            timestamp_system: orderbook.timestamp_system,
579            symbol: orderbook.symbol.to_string(),
580            exchange: orderbook.exchange.to_string(),
581            bids,
582            asks,
583            sequence: orderbook.sequence,
584            last_update_id: 0, // Default value, could be added to OrderBookRecord if needed
585            is_snapshot: true, // Default value, could be added to OrderBookRecord if needed
586            depth: (orderbook.bids.len() + orderbook.asks.len()) as u32,
587            checksum: orderbook.checksum.as_ref().map(|s| s.to_string()),
588        };
589
590        let orderbook_offset = planus::WriteAsOffset::prepare(&orderbook_data, &mut builder);
591        let bytes = builder.finish(orderbook_offset, None);
592        Ok(bytes.to_vec())
593    }
594
595    /// Deserialize orderbook data from FlatBuffers
596    ///
597    /// Converts FlatBuffers binary data back to an [`OrderBookRecord`] structure.
598    /// Reconstructs all price levels with optimized decimal parsing.
599    ///
600    /// # Arguments
601    ///
602    /// * `data` - Byte slice containing serialized orderbook data
603    ///
604    /// # Returns
605    ///
606    /// Returns `Ok(OrderBookRecord)` with the deserialized orderbook, or `Err(SchemaError)` on failure.
607    pub fn deserialize_orderbook(data: &[u8]) -> Result<OrderBookRecord> {
608        use crate::monitor::schema::generated::orderbook_generated::rusty_monitor::schema::OrderBookRef;
609
610        // Deserialize using FlatBuffers
611        let orderbook_ref = OrderBookRef::read_as_root(data)
612            .map_err(|e| SchemaError::Deserialization(e.to_string()))?;
613
614        // Extract fields from FlatBuffers reference
615        let timestamp_exchange = orderbook_ref
616            .timestamp_exchange()
617            .map_err(|e| SchemaError::Deserialization(e.to_string()))?;
618        let timestamp_system = orderbook_ref
619            .timestamp_system()
620            .map_err(|e| SchemaError::Deserialization(e.to_string()))?;
621        let symbol = orderbook_ref
622            .symbol()
623            .map_err(|e| SchemaError::Deserialization(e.to_string()))?;
624        let exchange = orderbook_ref
625            .exchange()
626            .map_err(|e| SchemaError::Deserialization(e.to_string()))?;
627        let sequence = orderbook_ref
628            .sequence()
629            .map_err(|e| SchemaError::Deserialization(e.to_string()))?;
630        let checksum = orderbook_ref.checksum().ok().flatten();
631
632        // Convert price levels
633        let bids_vec = orderbook_ref
634            .bids()
635            .map_err(|e| SchemaError::Deserialization(e.to_string()))?;
636        let bids = Self::parse_price_levels_fb(&bids_vec)?;
637
638        let asks_vec = orderbook_ref
639            .asks()
640            .map_err(|e| SchemaError::Deserialization(e.to_string()))?;
641        let asks = Self::parse_price_levels_fb(&asks_vec)?;
642
643        Ok(OrderBookRecord {
644            timestamp_exchange,
645            timestamp_system,
646            symbol: symbol.into(),
647            exchange: exchange.into(),
648            bids,
649            asks,
650            sequence,
651            checksum: checksum.map(|s| s.into()),
652        })
653    }
654
655    /// Helper to parse price levels from JSON
656    ///
657    /// Converts JSON array of price levels to internal [`PriceLevel`] structures.
658    /// Used for processing raw JSON data from exchanges.
659    ///
660    /// # Arguments
661    ///
662    /// * `levels_json` - JSON value containing array of price levels
663    ///
664    /// # Returns
665    ///
666    /// Returns `Ok(Vec<PriceLevel>)` with parsed levels, or `Err(SchemaError)` on failure.
667    fn parse_price_levels_json(levels_json: &simd_json::OwnedValue) -> Result<Vec<PriceLevel>> {
668        let levels_array = levels_json
669            .as_array()
670            .ok_or_else(|| SchemaError::InvalidData("Price levels must be an array".to_string()))?;
671
672        let mut result = Vec::new();
673        for level_json in levels_array {
674            let price = level_json["price"]
675                .as_str()
676                .ok_or_else(|| SchemaError::InvalidData("Missing price in level".to_string()))?
677                .parse::<Decimal>()
678                .map_err(|e| SchemaError::InvalidData(format!("Invalid price: {e}")))?;
679
680            let quantity = level_json["quantity"]
681                .as_str()
682                .ok_or_else(|| SchemaError::InvalidData("Missing quantity in level".to_string()))?
683                .parse::<Decimal>()
684                .map_err(|e| SchemaError::InvalidData(format!("Invalid quantity: {e}")))?;
685
686            let order_count = level_json["order_count"].as_u64().map(|c| c as u32);
687
688            result.push(PriceLevel {
689                price,
690                quantity,
691                order_count,
692            });
693        }
694
695        Ok(result)
696    }
697
698    /// Helper to parse price levels from FlatBuffers
699    ///
700    /// Converts FlatBuffers vector of price levels to internal [`PriceLevel`] structures.
701    /// Used during deserialization of orderbook data.
702    ///
703    /// # Arguments
704    ///
705    /// * `levels_vec` - FlatBuffers vector containing price level references
706    ///
707    /// # Returns
708    ///
709    /// Returns `Ok(Vec<PriceLevel>)` with parsed levels, or `Err(SchemaError)` on failure.
710    fn parse_price_levels_fb<'a>(
711        levels_vec: &planus::Vector<'a, planus::Result<crate::monitor::schema::generated::orderbook_generated::rusty_monitor::schema::PriceLevelRef<'a>>>,
712    ) -> Result<Vec<PriceLevel>> {
713        let mut result = Vec::new();
714        for i in 0..levels_vec.len() {
715            let level_ref = levels_vec
716                .get(i)
717                .unwrap()
718                .map_err(|e| SchemaError::Deserialization(e.to_string()))?;
719
720            let price_str = level_ref
721                .price()
722                .map_err(|e| SchemaError::Deserialization(e.to_string()))?;
723            let quantity_str = level_ref
724                .quantity()
725                .map_err(|e| SchemaError::Deserialization(e.to_string()))?;
726            let order_count = level_ref
727                .order_count()
728                .map_err(|e| SchemaError::Deserialization(e.to_string()))?;
729
730            let price = decimal_optimized::fast_parse_decimal(price_str)
731                .map_err(|e| SchemaError::InvalidData(format!("Invalid price: {e}")))?;
732            let quantity = decimal_optimized::fast_parse_decimal(quantity_str)
733                .map_err(|e| SchemaError::InvalidData(format!("Invalid quantity: {e}")))?;
734
735            result.push(PriceLevel {
736                price,
737                quantity,
738                order_count: if order_count == 0 {
739                    None
740                } else {
741                    Some(order_count)
742                },
743            });
744        }
745
746        Ok(result)
747    }
748}
749
750/// Utility functions for timestamp handling
751///
752/// Provides high-resolution timestamp functions for market data timestamping.
753/// All timestamps are in nanoseconds since Unix epoch for maximum precision.
754pub mod timestamp {
755    /// Get current timestamp in nanoseconds
756    ///
757    /// Returns the current high-resolution timestamp in nanoseconds since Unix epoch.
758    /// Uses the monitor's optimized time utilities for consistent timing.
759    pub fn now_nanos() -> u64 {
760        crate::monitor::utils::time::now_nanos()
761    }
762
763    /// Get system time in nanoseconds since UNIX epoch
764    ///
765    /// Returns the current system time in nanoseconds since Unix epoch.
766    /// Uses standard library SystemTime for compatibility.
767    pub fn system_time_nanos() -> u64 {
768        std::time::SystemTime::now()
769            .duration_since(std::time::UNIX_EPOCH)
770            .unwrap_or_default()
771            .as_nanos() as u64
772    }
773}
774
775#[cfg(test)]
776mod tests {
777    use super::*;
778    use rust_decimal_macros::dec;
779
780    #[test]
781    fn test_trade_serialization_roundtrip() {
782        let trade = TradeRecord {
783            timestamp_exchange: 1234567890123456789,
784            timestamp_system: 1234567890123456790,
785            symbol: "BTCUSDT".into(),
786            exchange: "binance".into(),
787            price: dec!(50000.123),
788            quantity: dec!(0.001),
789            side: TradeSide::Buy,
790            trade_id: "12345".into(),
791            buyer_order_id: Some("buyer123".into()),
792            seller_order_id: Some("seller456".into()),
793            sequence: 100,
794        };
795
796        let serialized = TradeSerializer::serialize_trade(&trade).unwrap();
797        let deserialized = TradeSerializer::deserialize_trade(&serialized).unwrap();
798
799        assert_eq!(trade.timestamp_exchange, deserialized.timestamp_exchange);
800        assert_eq!(trade.symbol, deserialized.symbol);
801        assert_eq!(trade.price, deserialized.price);
802        assert_eq!(trade.side, deserialized.side);
803    }
804
805    #[test]
806    fn test_orderbook_serialization_roundtrip() {
807        let orderbook = OrderBookRecord {
808            timestamp_exchange: 1234567890123456789,
809            timestamp_system: 1234567890123456790,
810            symbol: "BTCUSDT".into(),
811            exchange: "binance".into(),
812            bids: vec![PriceLevel {
813                price: dec!(50000.0),
814                quantity: dec!(1.0),
815                order_count: Some(5),
816            }],
817            asks: vec![PriceLevel {
818                price: dec!(50001.0),
819                quantity: dec!(2.0),
820                order_count: Some(3),
821            }],
822            sequence: 200,
823            checksum: Some("abc123".into()),
824        };
825
826        let serialized = OrderBookSerializer::serialize_orderbook(&orderbook).unwrap();
827        let deserialized = OrderBookSerializer::deserialize_orderbook(&serialized).unwrap();
828
829        assert_eq!(
830            orderbook.timestamp_exchange,
831            deserialized.timestamp_exchange
832        );
833        assert_eq!(orderbook.symbol, deserialized.symbol);
834        assert_eq!(orderbook.bids.len(), deserialized.bids.len());
835        assert_eq!(orderbook.asks.len(), deserialized.asks.len());
836    }
837}