rusty_feeder/optimization/
zero_alloc_parser.rs1use dashmap::DashMap;
82use parking_lot::Mutex;
83use simd_json::prelude::*;
84use simd_json::{BorrowedValue, OwnedValue, StaticNode};
85use std::sync::Arc;
86use std::sync::atomic::{AtomicU64, Ordering};
87
/// Coarse category of an exchange message, chosen from its JSON `"type"` field.
///
/// `Eq` and `Hash` are derived in addition to `PartialEq` so the enum can be
/// used directly as a key in hash-based collections.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum MessageType {
    /// `"type"` is `"match"` or `"trade"`.
    Trade,
    /// `"type"` is `"l2update"` or `"level2"`.
    Level2Update,
    /// `"type"` is `"ticker"`.
    Ticker,
    /// `"type"` is `"heartbeat"` or `"ping"`.
    Heartbeat,
    /// `"type"` is `"subscriptions"`.
    Subscription,
    /// `"type"` is `"error"`.
    Error,
    /// `"type"` is missing, not a string, or not one of the recognised names.
    Unknown,
}
106
107impl MessageType {
108 #[inline(always)]
110 pub fn from_borrowed_value(value: &BorrowedValue) -> Self {
111 match value.get("type").and_then(|v| v.as_str()) {
112 Some("match") | Some("trade") => Self::Trade,
113 Some("l2update") | Some("level2") => Self::Level2Update,
114 Some("ticker") => Self::Ticker,
115 Some("heartbeat") | Some("ping") => Self::Heartbeat,
116 Some("subscriptions") => Self::Subscription,
117 Some("error") => Self::Error,
118 _ => Self::Unknown,
119 }
120 }
121
122 #[inline(always)]
124 pub fn from_owned_value(value: &OwnedValue) -> Self {
125 match value.get("type").and_then(|v| v.as_str()) {
126 Some("match") | Some("trade") => Self::Trade,
127 Some("l2update") | Some("level2") => Self::Level2Update,
128 Some("ticker") => Self::Ticker,
129 Some("heartbeat") | Some("ping") => Self::Heartbeat,
130 Some("subscriptions") => Self::Subscription,
131 Some("error") => Self::Error,
132 _ => Self::Unknown,
133 }
134 }
135}
136
137pub struct ZeroAllocParser {
153 type_cache: Arc<DashMap<u64, MessageType>>,
155
156 buffer_pool: Arc<Mutex<Vec<Vec<u8>>>>,
158
159 max_buffer_size: usize,
161
162 stats: Arc<AtomicParserStats>,
164}
165
/// Plain-value snapshot of the parser's activity counters.
///
/// `PartialEq`/`Eq` are derived so two snapshots can be compared directly
/// (for example in assertions or change detection).
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct ParserStats {
    /// Number of messages parsed successfully.
    pub total_parsed: u64,

    /// Message-type lookups answered from the type cache.
    pub cache_hits: u64,

    /// Message-type lookups that had to classify the message afresh.
    pub cache_misses: u64,

    /// Successful closure-based (borrowed-value) parses.
    pub zero_copy_operations: u64,

    /// Times a scratch buffer was taken from the pool instead of allocated.
    pub buffer_reuses: u64,

    /// `total_parse_time_ns / total_parsed` as of the most recent successful parse.
    pub avg_parse_time_ns: u64,

    /// Accumulated duration of all successful parses, in nanoseconds.
    pub total_parse_time_ns: u64,
}
190
/// Atomic counterpart of `ParserStats`, updatable through a shared reference.
///
/// `Default` is derived rather than hand-written: `AtomicU64::default()` is
/// zero, so every counter starts at `0` and a newly added field can never be
/// forgotten in a manual initialiser.
#[derive(Debug, Default)]
pub struct AtomicParserStats {
    /// Number of messages parsed successfully.
    pub total_parsed: AtomicU64,

    /// Message-type lookups answered from the type cache.
    pub cache_hits: AtomicU64,

    /// Message-type lookups that had to classify the message afresh.
    pub cache_misses: AtomicU64,

    /// Successful closure-based (borrowed-value) parses.
    pub zero_copy_operations: AtomicU64,

    /// Times a scratch buffer was taken from the pool instead of allocated.
    pub buffer_reuses: AtomicU64,

    /// Average duration of a successful parse, in nanoseconds.
    pub avg_parse_time_ns: AtomicU64,

    /// Accumulated duration of all successful parses, in nanoseconds.
    pub total_parse_time_ns: AtomicU64,
}
229
230impl AtomicParserStats {
231 pub fn to_stats(&self) -> ParserStats {
233 ParserStats {
234 total_parsed: self.total_parsed.load(Ordering::Relaxed),
235 cache_hits: self.cache_hits.load(Ordering::Relaxed),
236 cache_misses: self.cache_misses.load(Ordering::Relaxed),
237 zero_copy_operations: self.zero_copy_operations.load(Ordering::Relaxed),
238 buffer_reuses: self.buffer_reuses.load(Ordering::Relaxed),
239 avg_parse_time_ns: self.avg_parse_time_ns.load(Ordering::Relaxed),
240 total_parse_time_ns: self.total_parse_time_ns.load(Ordering::Relaxed),
241 }
242 }
243
244 pub fn reset(&self) {
246 self.total_parsed.store(0, Ordering::Relaxed);
247 self.cache_hits.store(0, Ordering::Relaxed);
248 self.cache_misses.store(0, Ordering::Relaxed);
249 self.zero_copy_operations.store(0, Ordering::Relaxed);
250 self.buffer_reuses.store(0, Ordering::Relaxed);
251 self.avg_parse_time_ns.store(0, Ordering::Relaxed);
252 self.total_parse_time_ns.store(0, Ordering::Relaxed);
253 }
254}
255
256impl ZeroAllocParser {
257 #[must_use]
259 pub fn new() -> Self {
260 Self::with_capacity(1024, 128 * 1024) }
262
263 #[must_use]
265 pub fn with_capacity(initial_buffers: usize, max_buffer_size: usize) -> Self {
266 let mut buffer_pool = Vec::with_capacity(initial_buffers);
267
268 for _ in 0..initial_buffers.min(16) {
270 buffer_pool.push(Vec::with_capacity(4096)); }
272
273 Self {
274 type_cache: Arc::new(DashMap::new()),
275 buffer_pool: Arc::new(Mutex::new(buffer_pool)),
276 max_buffer_size,
277 stats: Arc::new(AtomicParserStats::default()),
278 }
279 }
280
281 #[inline]
286 pub fn parse_message(&self, text: &str) -> Result<(MessageType, OwnedValue), &'static str> {
287 let start_time = quanta::Instant::now();
288
289 let mut buffer = self.get_buffer();
291
292 if buffer.capacity() < text.len() {
294 buffer.reserve(text.len() - buffer.capacity());
295 }
296
297 buffer.clear();
299 buffer.extend_from_slice(text.as_bytes());
300
301 match simd_json::to_owned_value(&mut buffer) {
303 Ok(json) => {
304 let msg_type = self.get_cached_message_type_owned(&json);
305
306 let elapsed = start_time.elapsed().as_nanos() as u64;
307 let total_parsed = self.stats.total_parsed.fetch_add(1, Ordering::Relaxed) + 1;
308 let total_time = self
309 .stats
310 .total_parse_time_ns
311 .fetch_add(elapsed, Ordering::Relaxed)
312 + elapsed;
313 self.stats
314 .avg_parse_time_ns
315 .store(total_time / total_parsed, Ordering::Relaxed);
316
317 self.return_buffer(buffer);
319
320 Ok((msg_type, json))
321 }
322 Err(_) => {
323 self.return_buffer(buffer);
325 Err("Invalid JSON")
326 }
327 }
328 }
329
330 #[inline]
340 pub fn parse_message_with_closure<T, F>(&self, text: &str, f: F) -> Result<T, &'static str>
341 where
342 F: FnOnce(MessageType, &BorrowedValue) -> T,
343 {
344 let start_time = quanta::Instant::now();
345
346 let mut buffer = self.get_buffer();
348
349 if buffer.capacity() < text.len() {
351 buffer.reserve(text.len() - buffer.capacity());
352 }
353
354 buffer.clear();
356 buffer.extend_from_slice(text.as_bytes());
357
358 let result = {
360 match simd_json::to_borrowed_value(&mut buffer) {
361 Ok(json) => {
362 let msg_type = self.get_cached_message_type(&json);
363
364 let result = f(msg_type, &json);
366
367 let elapsed = start_time.elapsed().as_nanos() as u64;
369 let total_parsed = self.stats.total_parsed.fetch_add(1, Ordering::Relaxed) + 1;
370 let total_time = self
371 .stats
372 .total_parse_time_ns
373 .fetch_add(elapsed, Ordering::Relaxed)
374 + elapsed;
375 self.stats
376 .avg_parse_time_ns
377 .store(total_time / total_parsed, Ordering::Relaxed);
378 self.stats
379 .zero_copy_operations
380 .fetch_add(1, Ordering::Relaxed);
381
382 Ok(result)
383 }
384 Err(_) => Err("Invalid JSON"),
385 }
386 };
387
388 self.return_buffer(buffer);
390
391 result
392 }
393
394 fn get_cached_message_type(&self, json: &BorrowedValue) -> MessageType {
396 let type_hash = if let Some(type_str) = json.get("type").and_then(|v| v.as_str()) {
398 self.hash_str(type_str)
399 } else {
400 0
401 };
402
403 if let Some(cached_type) = self.type_cache.get(&type_hash) {
405 self.stats.cache_hits.fetch_add(1, Ordering::Relaxed);
406 return *cached_type;
407 }
408
409 let msg_type = MessageType::from_borrowed_value(json);
411
412 if type_hash != 0 {
413 self.type_cache.insert(type_hash, msg_type);
414 }
415
416 self.stats.cache_misses.fetch_add(1, Ordering::Relaxed);
417
418 msg_type
419 }
420
421 fn get_cached_message_type_owned(&self, json: &OwnedValue) -> MessageType {
423 let type_hash = if let Some(type_str) = json.get("type").and_then(|v| v.as_str()) {
425 self.hash_str(type_str)
426 } else {
427 0
428 };
429
430 if let Some(cached_type) = self.type_cache.get(&type_hash) {
432 self.stats.cache_hits.fetch_add(1, Ordering::Relaxed);
433 return *cached_type;
434 }
435
436 let msg_type = MessageType::from_owned_value(json);
438
439 if type_hash != 0 {
440 self.type_cache.insert(type_hash, msg_type);
441 }
442
443 self.stats.cache_misses.fetch_add(1, Ordering::Relaxed);
444
445 msg_type
446 }
447
448 fn hash_str(&self, s: &str) -> u64 {
450 let mut hash = 5381_u64;
451 for byte in s.bytes() {
452 hash = ((hash << 5).wrapping_add(hash)).wrapping_add(u64::from(byte));
453 }
454 hash
455 }
456
457 fn get_buffer(&self) -> Vec<u8> {
459 let mut pool = self.buffer_pool.lock();
460
461 if let Some(buffer) = pool.pop() {
462 self.stats.buffer_reuses.fetch_add(1, Ordering::Relaxed);
463 buffer
464 } else {
465 Vec::with_capacity(4096)
466 }
467 }
468
469 fn return_buffer(&self, mut buffer: Vec<u8>) {
471 if buffer.capacity() <= self.max_buffer_size {
473 buffer.clear(); let mut pool = self.buffer_pool.lock();
476 if pool.len() < 64 {
477 pool.push(buffer);
479 }
480 }
481 }
483
484 #[must_use]
486 pub fn stats(&self) -> ParserStats {
487 self.stats.to_stats()
488 }
489
490 pub fn reset_stats(&self) {
492 self.stats.reset();
493 }
494
495 #[inline]
497 pub fn extract_symbol<'a>(&self, json: &'a OwnedValue) -> Option<&'a str> {
498 json.get("product_id")
499 .or_else(|| json.get("symbol"))
500 .or_else(|| json.get("s"))
501 .and_then(|v| v.as_str())
502 }
503
504 #[inline]
506 pub fn extract_timestamp(&self, json: &OwnedValue) -> Option<u64> {
507 json.get("time")
508 .or_else(|| json.get("timestamp"))
509 .or_else(|| json.get("T"))
510 .and_then(|v| match v {
511 OwnedValue::String(s) => {
512 s.parse::<u64>().ok()
514 }
515 OwnedValue::Static(StaticNode::I64(i)) => Some(*i as u64),
516 OwnedValue::Static(StaticNode::U64(u)) => Some(*u),
517 OwnedValue::Static(StaticNode::F64(f)) => Some(*f as u64),
518 _ => None,
519 })
520 }
521
522 #[inline]
524 pub fn extract_price<'a>(&self, json: &'a OwnedValue) -> Option<&'a str> {
525 json.get("price")
526 .or_else(|| json.get("p"))
527 .and_then(|v| v.as_str())
528 }
529
530 #[inline]
532 pub fn extract_quantity<'a>(&self, json: &'a OwnedValue) -> Option<&'a str> {
533 json.get("size")
534 .or_else(|| json.get("quantity"))
535 .or_else(|| json.get("q"))
536 .and_then(|v| v.as_str())
537 }
538}
539
540impl Default for ZeroAllocParser {
541 fn default() -> Self {
542 Self::new()
543 }
544}
545
546thread_local! {
547 static PARSER: std::cell::RefCell<ZeroAllocParser> =
548 std::cell::RefCell::new(ZeroAllocParser::new());
549}
550
551#[inline]
553pub fn parse_message_fast(text: &str) -> Result<(MessageType, OwnedValue), &'static str> {
554 PARSER.with(|parser| parser.borrow().parse_message(text))
555}
556
557#[inline]
559pub fn parse_message_fast_with_closure<T, F>(text: &str, f: F) -> Result<T, &'static str>
560where
561 F: FnOnce(MessageType, &BorrowedValue) -> T,
562{
563 PARSER.with(|parser| parser.borrow().parse_message_with_closure(text, f))
564}
565
566#[inline]
568pub fn extract_trade_fields(json: &OwnedValue) -> Option<(&str, &str, &str, Option<u64>)> {
569 PARSER.with(|parser| {
570 let parser = parser.borrow();
571 let symbol = parser.extract_symbol(json)?;
572 let price = parser.extract_price(json)?;
573 let quantity = parser.extract_quantity(json)?;
574 let timestamp = parser.extract_timestamp(json);
575
576 Some((symbol, price, quantity, timestamp))
577 })
578}
579
580#[must_use]
582pub fn parser_stats() -> ParserStats {
583 PARSER.with(|parser| parser.borrow().stats())
584}
585
#[cfg(test)]
mod tests {
    use super::*;

    /// A new parser starts with zeroed statistics.
    #[test]
    fn test_zero_alloc_parser_creation() {
        let snapshot = ZeroAllocParser::new().stats();
        assert_eq!(snapshot.total_parsed, 0);
    }

    /// A `"match"` message is classified as a trade and its fields stay readable.
    #[test]
    fn test_message_type_parsing() {
        let trade_json =
            r#"{"type": "match", "product_id": "BTC-USD", "price": "50000", "size": "1.0"}"#;

        let parsed = parse_message_fast(trade_json);
        assert!(parsed.is_ok());

        let (kind, value) = parsed.unwrap();
        assert_eq!(kind, MessageType::Trade);

        let product = value.get("product_id").unwrap().as_str().unwrap();
        assert_eq!(product, "BTC-USD");
    }

    /// Symbol, price and quantity are pulled out of a trade message.
    #[test]
    fn test_field_extraction() {
        let trade_json =
            r#"{"type": "match", "product_id": "BTC-USD", "price": "50000.50", "size": "1.25"}"#;

        let (_, value) = parse_message_fast(trade_json).unwrap();
        let (symbol, price, quantity, _timestamp) = extract_trade_fields(&value).unwrap();

        assert_eq!(symbol, "BTC-USD");
        assert_eq!(price, "50000.50");
        assert_eq!(quantity, "1.25");
    }

    /// Parsing the same message type twice produces at least one cache hit.
    #[test]
    fn test_caching_performance() {
        let parser = ZeroAllocParser::new();
        let message = r#"{"type": "match", "product_id": "BTC-USD", "price": "50000"}"#;

        // First parse populates the cache, second parse should hit it.
        for _ in 0..2 {
            let _ = parser.parse_message(message).unwrap();
        }

        assert!(parser.stats().cache_hits > 0);
    }

    /// Consecutive parses take their scratch buffers from the pool.
    #[test]
    fn test_buffer_reuse() {
        let parser = ZeroAllocParser::new();
        let message1 = r#"{"type": "match", "product_id": "BTC-USD"}"#;
        let message2 = r#"{"type": "ticker", "product_id": "ETH-USD"}"#;

        let _ = parser.parse_message(message1).unwrap();
        let _ = parser.parse_message(message2).unwrap();

        assert!(parser.stats().buffer_reuses > 0);
    }

    /// The closure-based API exposes the borrowed value and counts the operation.
    #[test]
    fn test_closure_based_parsing() {
        let parser = ZeroAllocParser::new();
        let message = r#"{"type": "match", "product_id": "BTC-USD", "price": "50000.0"}"#;

        let (kind, product_id, price) = parser
            .parse_message_with_closure(message, |msg_type, json| {
                let product_id = json.get("product_id").unwrap().as_str().unwrap();
                let price = json.get("price").unwrap().as_str().unwrap();
                (msg_type, product_id.to_string(), price.to_string())
            })
            .unwrap();

        assert_eq!(kind, MessageType::Trade);
        assert_eq!(product_id, "BTC-USD");
        assert_eq!(price, "50000.0");

        assert!(parser.stats().zero_copy_operations > 0);
    }

    /// The thread-local closure helper classifies and exposes the message.
    #[test]
    fn test_thread_local_closure_parsing() {
        let message = r#"{"type": "ticker", "product_id": "ETH-USD", "price": "3000.0"}"#;

        let (kind, product_id) = parse_message_fast_with_closure(message, |msg_type, json| {
            let product_id = json.get("product_id").unwrap().as_str().unwrap();
            (msg_type, product_id.to_string())
        })
        .unwrap();

        assert_eq!(kind, MessageType::Ticker);
        assert_eq!(product_id, "ETH-USD");
    }
}