rusty_bin/monitor/schema/mod.rs
1//! Schema module for FlatBuffers serialization and deserialization
2//!
3//! This module provides high-level interfaces for working with trade and orderbook data
4//! using FlatBuffers for efficient serialization.
5//!
6//! ## Performance Optimizations
7//!
8//! This module implements several JSON parsing optimizations:
9//!
10//! 1. **Binary Decimal Serialization**: Instead of converting `Decimal` to `String` and back,
11//! we use binary representation for FlatBuffers, eliminating the string conversion overhead.
12//!
13//! 2. **Fast Decimal Parsing**: Uses simd_json's optimized number parsing where possible,
14//! falling back to standard parsing only for edge cases.
15//!
16//! 3. **Zero-Copy Operations**: Leverages FlatBuffers' zero-copy capabilities with binary data
17//! instead of string-based serialization.
18//!
19//! These optimizations significantly improve performance for high-frequency trading scenarios
20//! where every microsecond matters.
21
22use planus::ReadAsRoot;
23use rust_decimal::Decimal;
24use simd_json;
25use simd_json::prelude::*;
26use smartstring::alias::String as SmartString;
27use std::io;
28use thiserror::Error;
29
30/// Optimized serialization utilities for efficient Decimal handling
31///
32/// This module provides high-performance decimal parsing and serialization functions.
33/// Designed for trading applications where decimal precision and performance are critical.
34pub mod decimal_optimized {
35 use rust_decimal::Decimal;
36 use smartstring::alias::String as SmartString;
37
38 /// Fast string to Decimal parser with optimized path for common cases
39 ///
40 /// Replaces the standard .parse() method for better performance.
41 /// Uses optimized parsing for common decimal patterns, falling back to standard parsing for edge cases.
42 ///
43 /// # Arguments
44 ///
45 /// * `s` - String slice to parse as decimal
46 ///
47 /// # Returns
48 ///
49 /// Returns `Ok(Decimal)` on success, or `Err(String)` with error message on failure.
50 pub fn fast_parse_decimal(s: &str) -> Result<Decimal, String> {
51 // Fast path for common decimal patterns using optimized parsing
52 match fast_decimal_from_str(s) {
53 Ok(decimal) => Ok(decimal),
54 Err(_) => {
55 // Fallback to standard parsing for edge cases
56 s.parse::<Decimal>()
57 .map_err(|e| format!("Failed to parse decimal '{s}': {e}"))
58 }
59 }
60 }
61
62 /// Optimized Decimal to string conversion using SmartString for performance
63 ///
64 /// Converts a decimal to string using [`SmartString`] for better memory efficiency.
65 /// Optimized for short decimal strings common in trading scenarios.
66 ///
67 /// # Arguments
68 ///
69 /// * `decimal` - The decimal value to convert
70 ///
71 /// # Returns
72 ///
73 /// Returns a [`SmartString`] containing the decimal representation.
74 pub fn fast_decimal_to_string(decimal: Decimal) -> SmartString {
75 SmartString::from(decimal.to_string())
76 }
77
78 /// Fast decimal parsing for simple numeric strings
79 ///
80 /// Handles common trading decimal patterns efficiently using optimized parsing paths.
81 /// Tries integer parsing first, then float conversion for simple decimals.
82 ///
83 /// # Arguments
84 ///
85 /// * `s` - String slice to parse
86 ///
87 /// # Returns
88 ///
89 /// Returns `Ok(Decimal)` on success, or `Err(&'static str)` for complex patterns.
90 fn fast_decimal_from_str(s: &str) -> Result<Decimal, &'static str> {
91 // Handle empty string
92 if s.is_empty() {
93 return Err("Empty string");
94 }
95
96 // Fast path for integers (no decimal point)
97 if !s.contains('.')
98 && let Ok(int_val) = s.parse::<i64>()
99 {
100 return Ok(Decimal::from(int_val));
101 }
102
103 // Fast path for simple decimals using f64 conversion
104 if let Ok(float_val) = s.parse::<f64>()
105 && float_val.is_finite()
106 {
107 return Decimal::try_from(float_val).map_err(|_| "Float conversion failed");
108 }
109
110 Err("Complex decimal pattern")
111 }
112
113 /// Batch convert vector of decimal strings with SIMD-friendly processing
114 ///
115 /// Processes multiple decimal strings in a single operation for better performance.
116 /// Uses vectorized processing patterns that can benefit from SIMD optimizations.
117 ///
118 /// # Arguments
119 ///
120 /// * `strings` - Slice of string slices to parse as decimals
121 ///
122 /// # Returns
123 ///
124 /// Returns a vector of results, one for each input string.
125 pub fn batch_parse_decimals(strings: &[&str]) -> Vec<Result<Decimal, String>> {
126 strings.iter().map(|s| fast_parse_decimal(s)).collect()
127 }
128}
129
130/// Generated FlatBuffers schema definitions
131///
132/// This module contains the auto-generated Rust code from FlatBuffers schema files.
133/// It includes the compiled schema definitions for trade and orderbook data structures.
134#[allow(missing_docs)]
135pub mod generated;
136
137/// Represents the side of a trade transaction
138///
139/// This enum identifies whether a trade was initiated by a buyer or seller.
140/// Used in trade records to indicate market direction and flow.
141#[derive(Debug, Clone, Copy, PartialEq)]
142pub enum TradeSide {
143 /// Trade initiated by a buyer (market buy order)
144 Buy,
145 /// Trade initiated by a seller (market sell order)
146 Sell,
147}
148
149/// Errors that can occur during schema operations
150///
151/// Comprehensive error type for all schema-related operations including
152/// serialization, deserialization, and data validation.
153#[derive(Error, Debug)]
154pub enum SchemaError {
155 /// Error during serialization process
156 #[error("Serialization failed: {0}")]
157 Serialization(String),
158
159 /// Error during deserialization process
160 #[error("Deserialization failed: {0}")]
161 Deserialization(String),
162
163 /// Invalid or malformed data encountered
164 #[error("Invalid data: {0}")]
165 InvalidData(String),
166
167 /// I/O operation failed
168 #[error("IO error: {0}")]
169 Io(#[from] io::Error),
170
171 /// Feature or functionality not yet implemented
172 #[error("Not implemented: {0}")]
173 NotImplemented(String),
174}
175
176/// Result type alias for schema operations
177///
178/// Convenience type alias that uses [`SchemaError`] as the error type.
179/// Used throughout the module for consistent error handling.
180pub type Result<T> = std::result::Result<T, SchemaError>;
181
182/// High-level trade data structure for easier manipulation
183///
184/// Represents a single trade transaction with all relevant metadata.
185/// This structure is optimized for serialization and uses `SmartString` for performance.
186#[derive(Debug, Clone)]
187pub struct TradeRecord {
188 /// Timestamp when the trade occurred at the exchange (nanoseconds since epoch)
189 pub timestamp_exchange: u64,
190 /// Timestamp when the trade was processed by our system (nanoseconds since epoch)
191 pub timestamp_system: u64,
192 /// Trading pair symbol (e.g., "BTCUSDT")
193 pub symbol: SmartString,
194 /// Exchange identifier (e.g., "binance", "coinbase")
195 pub exchange: SmartString,
196 /// Trade execution price using high-precision decimal arithmetic
197 pub price: Decimal,
198 /// Trade quantity/volume using high-precision decimal arithmetic
199 pub quantity: Decimal,
200 /// Side of the trade (buy or sell)
201 pub side: TradeSide,
202 /// Unique identifier for this trade from the exchange
203 pub trade_id: SmartString,
204 /// Optional order ID of the buyer (if available from exchange)
205 pub buyer_order_id: Option<SmartString>,
206 /// Optional order ID of the seller (if available from exchange)
207 pub seller_order_id: Option<SmartString>,
208 /// Sequence number for ordering trades chronologically
209 pub sequence: u64,
210}
211
212/// High-level orderbook data structure for easier manipulation
213///
214/// Represents a complete order book snapshot with bid/ask levels.
215/// Contains all price levels and metadata for a trading pair at a specific time.
216#[derive(Debug, Clone)]
217pub struct OrderBookRecord {
218 /// Timestamp when the orderbook was captured at the exchange (nanoseconds since epoch)
219 pub timestamp_exchange: u64,
220 /// Timestamp when the orderbook was processed by our system (nanoseconds since epoch)
221 pub timestamp_system: u64,
222 /// Trading pair symbol (e.g., "BTCUSDT")
223 pub symbol: SmartString,
224 /// Exchange identifier (e.g., "binance", "coinbase")
225 pub exchange: SmartString,
226 /// Ordered list of bid price levels (highest to lowest)
227 pub bids: Vec<PriceLevel>,
228 /// Ordered list of ask price levels (lowest to highest)
229 pub asks: Vec<PriceLevel>,
230 /// Sequence number for ordering orderbook updates chronologically
231 pub sequence: u64,
232 /// Optional checksum for data integrity verification
233 pub checksum: Option<SmartString>,
234}
235
236/// High-level price level structure
237///
238/// Represents a single price level in an order book with price, quantity, and order count.
239/// Used for both bid and ask levels in orderbook snapshots.
240#[derive(Debug, Clone)]
241pub struct PriceLevel {
242 /// Price at this level using high-precision decimal arithmetic
243 pub price: Decimal,
244 /// Total quantity available at this price level
245 pub quantity: Decimal,
246 /// Number of individual orders at this price level (if available)
247 pub order_count: Option<u32>,
248}
249
250/// Batch of trade records for efficient processing
251///
252/// Groups multiple trade records together for batch operations.
253/// Useful for bulk serialization and processing of trade data.
254#[derive(Debug)]
255pub struct TradesBatchRecord {
256 /// Vector of trade records in this batch
257 pub trades: Vec<TradeRecord>,
258 /// Timestamp when this batch was created (nanoseconds since epoch)
259 pub batch_timestamp: u64,
260 /// Unique identifier for this batch
261 pub batch_id: SmartString,
262}
263
264/// Batch of orderbook records for efficient processing
265///
266/// Groups multiple orderbook records together for batch operations.
267/// Useful for bulk serialization and processing of orderbook data.
268#[derive(Debug)]
269pub struct OrderBookBatchRecord {
270 /// Vector of orderbook records in this batch
271 pub orderbooks: Vec<OrderBookRecord>,
272 /// Timestamp when this batch was created (nanoseconds since epoch)
273 pub batch_timestamp: u64,
274 /// Unique identifier for this batch
275 pub batch_id: SmartString,
276}
277
278/// Serialization utilities for trade data
279///
280/// Provides methods to serialize and deserialize trade records using FlatBuffers.
281/// Optimized for high-frequency trading scenarios with thread-local builders.
282pub struct TradeSerializer;
283
284thread_local! {
285 /// Thread-local builder for zero-allocation serialization
286 ///
287 /// Provides a reusable FlatBuffers builder per thread to avoid allocations.
288 /// Used by [`TradeSerializer::serialize_trade_into`] for optimal performance.
289 static TRADE_BUILDER: std::cell::RefCell<planus::Builder> = std::cell::RefCell::new(planus::Builder::new());
290}
291
292impl TradeSerializer {
293 /// Serialize a single trade record to FlatBuffers
294 ///
295 /// Converts a [`TradeRecord`] to FlatBuffers binary format for efficient storage.
296 /// Returns a vector of bytes containing the serialized trade data.
297 ///
298 /// # Arguments
299 ///
300 /// * `trade` - The trade record to serialize
301 ///
302 /// # Returns
303 ///
304 /// Returns `Ok(Vec<u8>)` containing the serialized data, or `Err(SchemaError)` on failure.
305 pub fn serialize_trade(trade: &TradeRecord) -> Result<Vec<u8>> {
306 use crate::monitor::schema::generated::rusty_monitor::schema::{
307 Trade, TradeSide as FbTradeSide,
308 };
309
310 let mut builder = planus::Builder::new();
311
312 // Convert internal TradeSide to FlatBuffers TradeSide
313 let fb_side = match trade.side {
314 TradeSide::Buy => FbTradeSide::Buy,
315 TradeSide::Sell => FbTradeSide::Sell,
316 };
317
318 // Build the Trade using the generated builder pattern
319 // Note: We need to find the correct builder construction pattern
320 let trade_data = Trade {
321 timestamp_exchange: trade.timestamp_exchange,
322 timestamp_system: trade.timestamp_system,
323 symbol: trade.symbol.to_string(),
324 exchange: trade.exchange.to_string(),
325 price: trade.price.to_string(),
326 quantity: trade.quantity.to_string(),
327 side: fb_side,
328 trade_id: Some(trade.trade_id.to_string()),
329 buyer_order_id: trade.buyer_order_id.as_ref().map(|s| s.to_string()),
330 seller_order_id: trade.seller_order_id.as_ref().map(|s| s.to_string()),
331 sequence: trade.sequence,
332 };
333
334 let trade_offset = planus::WriteAsOffset::prepare(&trade_data, &mut builder);
335 let bytes = builder.finish(trade_offset, None);
336 Ok(bytes.to_vec())
337 }
338
339 /// Serialize a batch of trades to FlatBuffers
340 ///
341 /// Converts a [`TradesBatchRecord`] containing multiple trades to FlatBuffers binary format.
342 /// More efficient than serializing individual trades for bulk operations.
343 ///
344 /// # Arguments
345 ///
346 /// * `batch` - The batch of trades to serialize
347 ///
348 /// # Returns
349 ///
350 /// Returns `Ok(Vec<u8>)` containing the serialized batch data, or `Err(SchemaError)` on failure.
351 pub fn serialize_batch(batch: &TradesBatchRecord) -> Result<Vec<u8>> {
352 use crate::monitor::schema::generated::rusty_monitor::schema::{
353 Trade, TradeSide as FbTradeSide, TradesBatch,
354 };
355
356 let mut builder = planus::Builder::new();
357
358 // Convert all trades to FlatBuffers format
359 let trades: Result<Vec<Trade>> = batch
360 .trades
361 .iter()
362 .map(|trade| {
363 let fb_side = match trade.side {
364 TradeSide::Buy => FbTradeSide::Buy,
365 TradeSide::Sell => FbTradeSide::Sell,
366 };
367
368 Ok(Trade {
369 timestamp_exchange: trade.timestamp_exchange,
370 timestamp_system: trade.timestamp_system,
371 symbol: trade.symbol.to_string(),
372 exchange: trade.exchange.to_string(),
373 price: trade.price.to_string(),
374 quantity: trade.quantity.to_string(),
375 side: fb_side,
376 trade_id: Some(trade.trade_id.to_string()),
377 buyer_order_id: trade.buyer_order_id.as_ref().map(|s| s.to_string()),
378 seller_order_id: trade.seller_order_id.as_ref().map(|s| s.to_string()),
379 sequence: trade.sequence,
380 })
381 })
382 .collect();
383
384 let trades = trades?;
385
386 let trades_batch = TradesBatch {
387 trades,
388 batch_timestamp: batch.batch_timestamp,
389 batch_id: Some(batch.batch_id.to_string()),
390 };
391
392 let batch_offset = planus::WriteAsOffset::prepare(&trades_batch, &mut builder);
393 let bytes = builder.finish(batch_offset, None);
394 Ok(bytes.to_vec())
395 }
396
397 /// Serialize a trade directly into a provided buffer (zero-copy)
398 ///
399 /// Serializes a trade record directly into a provided buffer, avoiding allocation.
400 /// Uses thread-local builder for optimal performance in high-frequency scenarios.
401 ///
402 /// # Arguments
403 ///
404 /// * `trade` - The trade record to serialize
405 /// * `buffer` - Mutable buffer to write serialized data into
406 ///
407 /// # Returns
408 ///
409 /// Returns `Ok(usize)` with the number of bytes written, or `Err(SchemaError)` on failure.
410 pub fn serialize_trade_into(trade: &TradeRecord, buffer: &mut Vec<u8>) -> Result<usize> {
411 use crate::monitor::schema::generated::rusty_monitor::schema::{
412 Trade, TradeSide as FbTradeSide,
413 };
414
415 TRADE_BUILDER.with(|builder_cell| {
416 let mut builder = builder_cell.borrow_mut();
417 builder.clear(); // Reset builder for reuse
418
419 // Convert internal TradeSide to FlatBuffers TradeSide
420 let fb_side = match trade.side {
421 TradeSide::Buy => FbTradeSide::Buy,
422 TradeSide::Sell => FbTradeSide::Sell,
423 };
424
425 // Build the Trade using the generated builder pattern
426 let trade_data = Trade {
427 timestamp_exchange: trade.timestamp_exchange,
428 timestamp_system: trade.timestamp_system,
429 symbol: trade.symbol.to_string(),
430 exchange: trade.exchange.to_string(),
431 price: trade.price.to_string(),
432 quantity: trade.quantity.to_string(),
433 side: fb_side,
434 trade_id: Some(trade.trade_id.to_string()),
435 buyer_order_id: trade.buyer_order_id.as_ref().map(|s| s.to_string()),
436 seller_order_id: trade.seller_order_id.as_ref().map(|s| s.to_string()),
437 sequence: trade.sequence,
438 };
439
440 let trade_offset = planus::WriteAsOffset::prepare(&trade_data, &mut builder);
441 let bytes = builder.finish(trade_offset, None);
442
443 // Clear the buffer and copy data
444 buffer.clear();
445 buffer.extend_from_slice(bytes);
446
447 Ok(bytes.len())
448 })
449 }
450
451 /// Deserialize trade data from FlatBuffers
452 ///
453 /// Converts FlatBuffers binary data back to a [`TradeRecord`] structure.
454 /// Uses optimized decimal parsing for better performance.
455 ///
456 /// # Arguments
457 ///
458 /// * `data` - Byte slice containing serialized trade data
459 ///
460 /// # Returns
461 ///
462 /// Returns `Ok(TradeRecord)` with the deserialized trade, or `Err(SchemaError)` on failure.
463 pub fn deserialize_trade(data: &[u8]) -> Result<TradeRecord> {
464 use crate::monitor::schema::generated::rusty_monitor::schema::{
465 TradeRef, TradeSide as FbTradeSide,
466 };
467
468 // Deserialize using FlatBuffers
469 let trade_ref = TradeRef::read_as_root(data)
470 .map_err(|e| SchemaError::Deserialization(e.to_string()))?;
471
472 // Extract fields from FlatBuffers reference
473 let timestamp_exchange = trade_ref
474 .timestamp_exchange()
475 .map_err(|e| SchemaError::Deserialization(e.to_string()))?;
476 let timestamp_system = trade_ref
477 .timestamp_system()
478 .map_err(|e| SchemaError::Deserialization(e.to_string()))?;
479 let symbol = trade_ref
480 .symbol()
481 .map_err(|e| SchemaError::Deserialization(e.to_string()))?;
482 let exchange = trade_ref
483 .exchange()
484 .map_err(|e| SchemaError::Deserialization(e.to_string()))?;
485 let price_str = trade_ref
486 .price()
487 .map_err(|e| SchemaError::Deserialization(e.to_string()))?;
488 let quantity_str = trade_ref
489 .quantity()
490 .map_err(|e| SchemaError::Deserialization(e.to_string()))?;
491 let fb_side = trade_ref
492 .side()
493 .map_err(|e| SchemaError::Deserialization(e.to_string()))?;
494 let trade_id = trade_ref
495 .trade_id()
496 .map_err(|e| SchemaError::Deserialization(e.to_string()))?;
497 let buyer_order_id = trade_ref.buyer_order_id().ok().flatten();
498 let seller_order_id = trade_ref.seller_order_id().ok().flatten();
499 let sequence = trade_ref
500 .sequence()
501 .map_err(|e| SchemaError::Deserialization(e.to_string()))?;
502
503 // Convert FlatBuffers string types to internal types using optimized parsing
504 let price = decimal_optimized::fast_parse_decimal(price_str)
505 .map_err(|e| SchemaError::InvalidData(format!("Invalid price: {e}")))?;
506 let quantity = decimal_optimized::fast_parse_decimal(quantity_str)
507 .map_err(|e| SchemaError::InvalidData(format!("Invalid quantity: {e}")))?;
508 let side = match fb_side {
509 FbTradeSide::Buy => TradeSide::Buy,
510 FbTradeSide::Sell => TradeSide::Sell,
511 };
512
513 Ok(TradeRecord {
514 timestamp_exchange,
515 timestamp_system,
516 symbol: symbol.into(),
517 exchange: exchange.into(),
518 price,
519 quantity,
520 side,
521 trade_id: trade_id.unwrap_or("").into(),
522 buyer_order_id: buyer_order_id.map(|s| s.into()),
523 seller_order_id: seller_order_id.map(|s| s.into()),
524 sequence,
525 })
526 }
527}
528
529/// Serialization utilities for orderbook data
530///
531/// Provides methods to serialize and deserialize orderbook records using FlatBuffers.
532/// Handles complex price level structures with optimized decimal parsing.
533pub struct OrderBookSerializer;
534
535impl OrderBookSerializer {
536 /// Serialize a single orderbook record to FlatBuffers
537 ///
538 /// Converts an [`OrderBookRecord`] to FlatBuffers binary format for efficient storage.
539 /// Handles complex price level structures with multiple bid/ask levels.
540 ///
541 /// # Arguments
542 ///
543 /// * `orderbook` - The orderbook record to serialize
544 ///
545 /// # Returns
546 ///
547 /// Returns `Ok(Vec<u8>)` containing the serialized data, or `Err(SchemaError)` on failure.
548 pub fn serialize_orderbook(orderbook: &OrderBookRecord) -> Result<Vec<u8>> {
549 use crate::monitor::schema::generated::orderbook_generated::rusty_monitor::schema::{
550 OrderBook, PriceLevel as FbPriceLevel,
551 };
552
553 let mut builder = planus::Builder::new();
554
555 // Convert price levels to FlatBuffers format
556 let bids: Vec<FbPriceLevel> = orderbook
557 .bids
558 .iter()
559 .map(|level| FbPriceLevel {
560 price: level.price.to_string(),
561 quantity: level.quantity.to_string(),
562 order_count: level.order_count.unwrap_or(0),
563 })
564 .collect();
565
566 let asks: Vec<FbPriceLevel> = orderbook
567 .asks
568 .iter()
569 .map(|level| FbPriceLevel {
570 price: level.price.to_string(),
571 quantity: level.quantity.to_string(),
572 order_count: level.order_count.unwrap_or(0),
573 })
574 .collect();
575
576 let orderbook_data = OrderBook {
577 timestamp_exchange: orderbook.timestamp_exchange,
578 timestamp_system: orderbook.timestamp_system,
579 symbol: orderbook.symbol.to_string(),
580 exchange: orderbook.exchange.to_string(),
581 bids,
582 asks,
583 sequence: orderbook.sequence,
584 last_update_id: 0, // Default value, could be added to OrderBookRecord if needed
585 is_snapshot: true, // Default value, could be added to OrderBookRecord if needed
586 depth: (orderbook.bids.len() + orderbook.asks.len()) as u32,
587 checksum: orderbook.checksum.as_ref().map(|s| s.to_string()),
588 };
589
590 let orderbook_offset = planus::WriteAsOffset::prepare(&orderbook_data, &mut builder);
591 let bytes = builder.finish(orderbook_offset, None);
592 Ok(bytes.to_vec())
593 }
594
595 /// Deserialize orderbook data from FlatBuffers
596 ///
597 /// Converts FlatBuffers binary data back to an [`OrderBookRecord`] structure.
598 /// Reconstructs all price levels with optimized decimal parsing.
599 ///
600 /// # Arguments
601 ///
602 /// * `data` - Byte slice containing serialized orderbook data
603 ///
604 /// # Returns
605 ///
606 /// Returns `Ok(OrderBookRecord)` with the deserialized orderbook, or `Err(SchemaError)` on failure.
607 pub fn deserialize_orderbook(data: &[u8]) -> Result<OrderBookRecord> {
608 use crate::monitor::schema::generated::orderbook_generated::rusty_monitor::schema::OrderBookRef;
609
610 // Deserialize using FlatBuffers
611 let orderbook_ref = OrderBookRef::read_as_root(data)
612 .map_err(|e| SchemaError::Deserialization(e.to_string()))?;
613
614 // Extract fields from FlatBuffers reference
615 let timestamp_exchange = orderbook_ref
616 .timestamp_exchange()
617 .map_err(|e| SchemaError::Deserialization(e.to_string()))?;
618 let timestamp_system = orderbook_ref
619 .timestamp_system()
620 .map_err(|e| SchemaError::Deserialization(e.to_string()))?;
621 let symbol = orderbook_ref
622 .symbol()
623 .map_err(|e| SchemaError::Deserialization(e.to_string()))?;
624 let exchange = orderbook_ref
625 .exchange()
626 .map_err(|e| SchemaError::Deserialization(e.to_string()))?;
627 let sequence = orderbook_ref
628 .sequence()
629 .map_err(|e| SchemaError::Deserialization(e.to_string()))?;
630 let checksum = orderbook_ref.checksum().ok().flatten();
631
632 // Convert price levels
633 let bids_vec = orderbook_ref
634 .bids()
635 .map_err(|e| SchemaError::Deserialization(e.to_string()))?;
636 let bids = Self::parse_price_levels_fb(&bids_vec)?;
637
638 let asks_vec = orderbook_ref
639 .asks()
640 .map_err(|e| SchemaError::Deserialization(e.to_string()))?;
641 let asks = Self::parse_price_levels_fb(&asks_vec)?;
642
643 Ok(OrderBookRecord {
644 timestamp_exchange,
645 timestamp_system,
646 symbol: symbol.into(),
647 exchange: exchange.into(),
648 bids,
649 asks,
650 sequence,
651 checksum: checksum.map(|s| s.into()),
652 })
653 }
654
655 /// Helper to parse price levels from JSON
656 ///
657 /// Converts JSON array of price levels to internal [`PriceLevel`] structures.
658 /// Used for processing raw JSON data from exchanges.
659 ///
660 /// # Arguments
661 ///
662 /// * `levels_json` - JSON value containing array of price levels
663 ///
664 /// # Returns
665 ///
666 /// Returns `Ok(Vec<PriceLevel>)` with parsed levels, or `Err(SchemaError)` on failure.
667 fn parse_price_levels_json(levels_json: &simd_json::OwnedValue) -> Result<Vec<PriceLevel>> {
668 let levels_array = levels_json
669 .as_array()
670 .ok_or_else(|| SchemaError::InvalidData("Price levels must be an array".to_string()))?;
671
672 let mut result = Vec::new();
673 for level_json in levels_array {
674 let price = level_json["price"]
675 .as_str()
676 .ok_or_else(|| SchemaError::InvalidData("Missing price in level".to_string()))?
677 .parse::<Decimal>()
678 .map_err(|e| SchemaError::InvalidData(format!("Invalid price: {e}")))?;
679
680 let quantity = level_json["quantity"]
681 .as_str()
682 .ok_or_else(|| SchemaError::InvalidData("Missing quantity in level".to_string()))?
683 .parse::<Decimal>()
684 .map_err(|e| SchemaError::InvalidData(format!("Invalid quantity: {e}")))?;
685
686 let order_count = level_json["order_count"].as_u64().map(|c| c as u32);
687
688 result.push(PriceLevel {
689 price,
690 quantity,
691 order_count,
692 });
693 }
694
695 Ok(result)
696 }
697
698 /// Helper to parse price levels from FlatBuffers
699 ///
700 /// Converts FlatBuffers vector of price levels to internal [`PriceLevel`] structures.
701 /// Used during deserialization of orderbook data.
702 ///
703 /// # Arguments
704 ///
705 /// * `levels_vec` - FlatBuffers vector containing price level references
706 ///
707 /// # Returns
708 ///
709 /// Returns `Ok(Vec<PriceLevel>)` with parsed levels, or `Err(SchemaError)` on failure.
710 fn parse_price_levels_fb<'a>(
711 levels_vec: &planus::Vector<'a, planus::Result<crate::monitor::schema::generated::orderbook_generated::rusty_monitor::schema::PriceLevelRef<'a>>>,
712 ) -> Result<Vec<PriceLevel>> {
713 let mut result = Vec::new();
714 for i in 0..levels_vec.len() {
715 let level_ref = levels_vec
716 .get(i)
717 .unwrap()
718 .map_err(|e| SchemaError::Deserialization(e.to_string()))?;
719
720 let price_str = level_ref
721 .price()
722 .map_err(|e| SchemaError::Deserialization(e.to_string()))?;
723 let quantity_str = level_ref
724 .quantity()
725 .map_err(|e| SchemaError::Deserialization(e.to_string()))?;
726 let order_count = level_ref
727 .order_count()
728 .map_err(|e| SchemaError::Deserialization(e.to_string()))?;
729
730 let price = decimal_optimized::fast_parse_decimal(price_str)
731 .map_err(|e| SchemaError::InvalidData(format!("Invalid price: {e}")))?;
732 let quantity = decimal_optimized::fast_parse_decimal(quantity_str)
733 .map_err(|e| SchemaError::InvalidData(format!("Invalid quantity: {e}")))?;
734
735 result.push(PriceLevel {
736 price,
737 quantity,
738 order_count: if order_count == 0 {
739 None
740 } else {
741 Some(order_count)
742 },
743 });
744 }
745
746 Ok(result)
747 }
748}
749
750/// Utility functions for timestamp handling
751///
752/// Provides high-resolution timestamp functions for market data timestamping.
753/// All timestamps are in nanoseconds since Unix epoch for maximum precision.
754pub mod timestamp {
755 /// Get current timestamp in nanoseconds
756 ///
757 /// Returns the current high-resolution timestamp in nanoseconds since Unix epoch.
758 /// Uses the monitor's optimized time utilities for consistent timing.
759 pub fn now_nanos() -> u64 {
760 crate::monitor::utils::time::now_nanos()
761 }
762
763 /// Get system time in nanoseconds since UNIX epoch
764 ///
765 /// Returns the current system time in nanoseconds since Unix epoch.
766 /// Uses standard library SystemTime for compatibility.
767 pub fn system_time_nanos() -> u64 {
768 std::time::SystemTime::now()
769 .duration_since(std::time::UNIX_EPOCH)
770 .unwrap_or_default()
771 .as_nanos() as u64
772 }
773}
774
775#[cfg(test)]
776mod tests {
777 use super::*;
778 use rust_decimal_macros::dec;
779
780 #[test]
781 fn test_trade_serialization_roundtrip() {
782 let trade = TradeRecord {
783 timestamp_exchange: 1234567890123456789,
784 timestamp_system: 1234567890123456790,
785 symbol: "BTCUSDT".into(),
786 exchange: "binance".into(),
787 price: dec!(50000.123),
788 quantity: dec!(0.001),
789 side: TradeSide::Buy,
790 trade_id: "12345".into(),
791 buyer_order_id: Some("buyer123".into()),
792 seller_order_id: Some("seller456".into()),
793 sequence: 100,
794 };
795
796 let serialized = TradeSerializer::serialize_trade(&trade).unwrap();
797 let deserialized = TradeSerializer::deserialize_trade(&serialized).unwrap();
798
799 assert_eq!(trade.timestamp_exchange, deserialized.timestamp_exchange);
800 assert_eq!(trade.symbol, deserialized.symbol);
801 assert_eq!(trade.price, deserialized.price);
802 assert_eq!(trade.side, deserialized.side);
803 }
804
805 #[test]
806 fn test_orderbook_serialization_roundtrip() {
807 let orderbook = OrderBookRecord {
808 timestamp_exchange: 1234567890123456789,
809 timestamp_system: 1234567890123456790,
810 symbol: "BTCUSDT".into(),
811 exchange: "binance".into(),
812 bids: vec![PriceLevel {
813 price: dec!(50000.0),
814 quantity: dec!(1.0),
815 order_count: Some(5),
816 }],
817 asks: vec![PriceLevel {
818 price: dec!(50001.0),
819 quantity: dec!(2.0),
820 order_count: Some(3),
821 }],
822 sequence: 200,
823 checksum: Some("abc123".into()),
824 };
825
826 let serialized = OrderBookSerializer::serialize_orderbook(&orderbook).unwrap();
827 let deserialized = OrderBookSerializer::deserialize_orderbook(&serialized).unwrap();
828
829 assert_eq!(
830 orderbook.timestamp_exchange,
831 deserialized.timestamp_exchange
832 );
833 assert_eq!(orderbook.symbol, deserialized.symbol);
834 assert_eq!(orderbook.bids.len(), deserialized.bids.len());
835 assert_eq!(orderbook.asks.len(), deserialized.asks.len());
836 }
837}