rusty_ems/exchanges/coinbase/
zero_alloc_coinbase.rs

1//! Zero-allocation message processing for Coinbase WebSocket
2//!
3//! This module demonstrates how to process Coinbase WebSocket messages
4//! with zero heap allocations using borrowed JSON values.
5
6use anyhow::Result;
7use flume::Sender;
8use log::{debug, error, warn};
9use parking_lot::RwLock;
10use quanta::Clock;
11use rust_decimal::Decimal;
12use rusty_common::utils::id_generation;
13use rusty_common::zerocopy::{BorrowedValueExt, WebSocketJsonZeroCopy};
14use rusty_common::{FxHashMap, SmartString};
15use simd_json::value::borrowed::Value as BorrowedValue;
16use std::sync::Arc;
17use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
18
19use crate::execution_engine::ExecutionReport;
20use rusty_model::enums::OrderStatus;
21use rusty_model::instruments::InstrumentId;
22use rusty_model::venues::Venue;
23
24/// Zero-allocation Coinbase message processor
25pub struct ZeroAllocCoinbaseProcessor {
26    /// Sequence tracker for reliable message ordering
27    sequence_tracker: Arc<RwLock<FxHashMap<SmartString, u64>>>,
28
29    /// Last heartbeat timestamp
30    last_heartbeat: Arc<AtomicU64>,
31
32    /// Connection state
33    is_authenticated: Arc<AtomicBool>,
34
35    /// Clock for timestamps
36    clock: Clock,
37}
38
39impl Default for ZeroAllocCoinbaseProcessor {
40    fn default() -> Self {
41        Self::new()
42    }
43}
44
45impl ZeroAllocCoinbaseProcessor {
46    /// Create a new processor
47    #[must_use]
48    pub fn new() -> Self {
49        Self {
50            sequence_tracker: Arc::new(RwLock::new(FxHashMap::default())),
51            last_heartbeat: Arc::new(AtomicU64::new(0)),
52            is_authenticated: Arc::new(AtomicBool::new(false)),
53            clock: Clock::new(),
54        }
55    }
56
57    /// Process a WebSocket message with zero allocations
58    pub async fn process_message(
59        &self,
60        message_bytes: &mut [u8],
61        report_sender: &Option<Sender<ExecutionReport>>,
62    ) -> Result<()> {
63        // Parse with zero allocations
64        let value = WebSocketJsonZeroCopy::parse_to_borrowed(message_bytes)?;
65
66        // Get message type without allocation
67        if let Some(msg_type) = value.get_str("type") {
68            match msg_type {
69                "subscriptions" => {
70                    self.handle_subscriptions(&value);
71                }
72                "heartbeat" => {
73                    self.handle_heartbeat(&value);
74                }
75                "ticker" => {
76                    self.handle_ticker(&value)?;
77                }
78                "snapshot" => {
79                    self.handle_snapshot(&value)?;
80                }
81                "l2update" => {
82                    self.handle_l2_update(&value)?;
83                }
84                "match" | "last_match" => {
85                    self.handle_match(&value)?;
86                }
87                "received" | "open" | "done" | "change" => {
88                    self.handle_order_update(&value, msg_type, report_sender)?;
89                }
90                "error" => {
91                    self.handle_error(&value);
92                }
93                _ => {
94                    debug!("Unhandled message type: {msg_type}");
95                }
96            }
97        }
98
99        Ok(())
100    }
101
102    /// Handle subscription confirmation
103    fn handle_subscriptions(&self, value: &BorrowedValue) {
104        debug!("Subscription confirmed");
105        self.is_authenticated.store(true, Ordering::Relaxed);
106    }
107
108    /// Handle heartbeat with sequence tracking
109    fn handle_heartbeat(&self, value: &BorrowedValue) {
110        // Update heartbeat timestamp
111        self.last_heartbeat
112            .store(self.clock.raw() / 1_000_000_000, Ordering::Relaxed);
113
114        // Track sequence without allocation
115        if let Some(sequence) = value.get_u64("sequence")
116            && let Some(product_id) = value.get_str("product_id")
117        {
118            let mut tracker = self.sequence_tracker.write();
119
120            // Only allocate if this is a new product
121            if let Some(last_seq) = tracker.get(product_id) {
122                if sequence > last_seq + 1 {
123                    warn!("Sequence gap for {product_id}: {last_seq} -> {sequence}");
124                }
125                // Update in-place without allocation
126                if let Some(entry) = tracker.get_mut(product_id) {
127                    *entry = sequence;
128                }
129            } else {
130                // First time seeing this product - allocate once
131                tracker.insert(SmartString::from(product_id), sequence);
132            }
133        }
134    }
135
136    /// Handle ticker updates with zero allocations
137    fn handle_ticker(&self, value: &BorrowedValue) -> Result<()> {
138        if let Some(product_id) = value.get_str("product_id")
139            && let Some(price) = value.get_str("price")
140        {
141            // Process without allocation - just log for demo
142            debug!("Ticker {product_id}: {price}");
143        }
144        Ok(())
145    }
146
147    /// Handle order book snapshot
148    fn handle_snapshot(&self, value: &BorrowedValue) -> Result<()> {
149        if let Some(product_id) = value.get_str("product_id") {
150            debug!("L2 snapshot for {product_id}");
151            // In real implementation, would update order book without allocations
152        }
153        Ok(())
154    }
155
156    /// Handle L2 order book update with sequence checking
157    fn handle_l2_update(&self, value: &BorrowedValue) -> Result<()> {
158        if let Some(sequence) = value.get_u64("sequence")
159            && let Some(product_id) = value.get_str("product_id")
160        {
161            let mut tracker = self.sequence_tracker.write();
162
163            // Check sequence without allocation
164            if let Some(&last_seq) = tracker.get(product_id) {
165                if sequence <= last_seq {
166                    // Out of order - ignore
167                    return Ok(());
168                }
169                if sequence > last_seq + 1 {
170                    warn!("L2 gap for {product_id}: {last_seq} -> {sequence}");
171                }
172            }
173
174            // Update sequence (may allocate for new products)
175            tracker
176                .entry(SmartString::from(product_id))
177                .and_modify(|e| *e = sequence)
178                .or_insert(sequence);
179
180            // Process L2 update without allocation
181            debug!("L2 update for {product_id} seq {sequence}");
182        }
183        Ok(())
184    }
185
186    /// Handle trade match
187    fn handle_match(&self, value: &BorrowedValue) -> Result<()> {
188        if let Some(product_id) = value.get_str("product_id")
189            && let Some(price) = value.get_str("price")
190            && let Some(size) = value.get_str("size")
191        {
192            debug!("Match {product_id}: {size} @ {price}");
193        }
194        Ok(())
195    }
196
197    /// Handle order update from user channel
198    fn handle_order_update(
199        &self,
200        value: &BorrowedValue,
201        msg_type: &str,
202        report_sender: &Option<Sender<ExecutionReport>>,
203    ) -> Result<()> {
204        // Extract fields without allocation
205        let order_id = value.get_str("order_id").unwrap_or("");
206        let client_oid = value.get_str("client_oid").unwrap_or("");
207        let product_id = value.get_str("product_id").unwrap_or("");
208
209        // Determine execution type and status
210        let (exec_type, order_status) = match msg_type {
211            "received" => ("New", OrderStatus::New),
212            "open" => ("New", OrderStatus::New),
213            "done" => match value.get_str("reason").unwrap_or("") {
214                "filled" => ("Trade", OrderStatus::Filled),
215                "canceled" => ("Canceled", OrderStatus::Cancelled),
216                _ => ("Rejected", OrderStatus::Rejected),
217            },
218            "change" => ("Replaced", OrderStatus::New),
219            _ => {
220                log::warn!("Unknown Coinbase WebSocket message type in zero-alloc handler");
221                ("Unknown", OrderStatus::Unknown)
222            }
223        };
224
225        // Only allocate when we need to send a report
226        if let Some(sender) = report_sender {
227            // Parse numeric values only when needed
228            let price = value
229                .get_str("price")
230                .and_then(|s| Decimal::from_str_exact(s).ok())
231                .unwrap_or(Decimal::ZERO);
232
233            let quantity = value
234                .get_str("size")
235                .and_then(|s| Decimal::from_str_exact(s).ok())
236                .unwrap_or(Decimal::ZERO);
237
238            let leaves_qty = value
239                .get_str("remaining_size")
240                .and_then(|s| Decimal::from_str_exact(s).ok())
241                .unwrap_or(Decimal::ZERO);
242
243            // Create execution report
244            let report = ExecutionReport {
245                id: id_generation::generate_uuid_id(),
246                order_id: SmartString::from(order_id),
247                exchange_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
248                system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
249                instrument_id: InstrumentId::new(SmartString::from(product_id), Venue::Coinbase),
250                status: order_status,
251                filled_quantity: if msg_type == "match" {
252                    quantity
253                } else {
254                    Decimal::ZERO
255                },
256                remaining_quantity: leaves_qty,
257                execution_price: if msg_type == "match" {
258                    Some(price)
259                } else {
260                    None
261                },
262                reject_reason: if order_status == OrderStatus::Rejected {
263                    value.get_str("reject_reason").map(SmartString::from)
264                } else {
265                    None
266                },
267                exchange_execution_id: value.get_str("order_id").map(SmartString::from),
268                is_final: matches!(
269                    order_status,
270                    OrderStatus::Filled | OrderStatus::Cancelled | OrderStatus::Rejected
271                ),
272            };
273
274            // Send report
275            if let Err(e) = sender.send(report) {
276                error!("Failed to send execution report: {e}");
277            }
278        }
279
280        Ok(())
281    }
282
283    /// Handle error messages
284    fn handle_error(&self, value: &BorrowedValue) {
285        let message = value.get_str("message").unwrap_or("Unknown error");
286        error!("WebSocket error: {message}");
287    }
288}
289
290/// Benchmark comparison: zero-alloc vs traditional parsing
291#[cfg(test)]
292mod benchmarks {
293    use super::*;
294    use std::time::Instant;
295
296    #[test]
297    fn benchmark_zero_alloc_vs_traditional() {
298        let json = r#"{"type":"match","trade_id":123456,"sequence":789,"product_id":"BTC-USD","price":"50000.50","size":"0.001","side":"buy","time":"2024-01-01T00:00:00.000Z"}"#;
299
300        // Zero-allocation parsing
301        let start = Instant::now();
302        for _ in 0..10000 {
303            let mut bytes = json.as_bytes().to_vec();
304            let value = WebSocketJsonZeroCopy::parse_to_borrowed(&mut bytes).unwrap();
305            let _ = value.get_str("product_id");
306            let _ = value.get_str("price");
307            let _ = value.get_str("size");
308        }
309        let zero_alloc_time = start.elapsed();
310
311        // Traditional parsing with allocations
312        let start = Instant::now();
313        for _ in 0..10000 {
314            let mut json_copy = json.to_string();
315            let parsed: simd_json::OwnedValue =
316                unsafe { simd_json::from_str(&mut json_copy).unwrap() };
317            if let simd_json::OwnedValue::String(ref _s) = parsed["product_id"] {}
318            if let simd_json::OwnedValue::String(ref _s) = parsed["price"] {}
319            if let simd_json::OwnedValue::String(ref _s) = parsed["size"] {}
320        }
321        let traditional_time = start.elapsed();
322
323        println!("Zero-alloc: {zero_alloc_time:?}, Traditional: {traditional_time:?}");
324        println!(
325            "Speedup: {:.2}x",
326            traditional_time.as_nanos() as f64 / zero_alloc_time.as_nanos() as f64
327        );
328
329        // Zero-alloc should be significantly faster
330        assert!(zero_alloc_time < traditional_time);
331    }
332}