rusty_ems/exchanges/coinbase/
zero_alloc_coinbase.rs1use anyhow::Result;
7use flume::Sender;
8use log::{debug, error, warn};
9use parking_lot::RwLock;
10use quanta::Clock;
11use rust_decimal::Decimal;
12use rusty_common::utils::id_generation;
13use rusty_common::zerocopy::{BorrowedValueExt, WebSocketJsonZeroCopy};
14use rusty_common::{FxHashMap, SmartString};
15use simd_json::value::borrowed::Value as BorrowedValue;
16use std::sync::Arc;
17use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
18
19use crate::execution_engine::ExecutionReport;
20use rusty_model::enums::OrderStatus;
21use rusty_model::instruments::InstrumentId;
22use rusty_model::venues::Venue;
23
24pub struct ZeroAllocCoinbaseProcessor {
26 sequence_tracker: Arc<RwLock<FxHashMap<SmartString, u64>>>,
28
29 last_heartbeat: Arc<AtomicU64>,
31
32 is_authenticated: Arc<AtomicBool>,
34
35 clock: Clock,
37}
38
39impl Default for ZeroAllocCoinbaseProcessor {
40 fn default() -> Self {
41 Self::new()
42 }
43}
44
45impl ZeroAllocCoinbaseProcessor {
46 #[must_use]
48 pub fn new() -> Self {
49 Self {
50 sequence_tracker: Arc::new(RwLock::new(FxHashMap::default())),
51 last_heartbeat: Arc::new(AtomicU64::new(0)),
52 is_authenticated: Arc::new(AtomicBool::new(false)),
53 clock: Clock::new(),
54 }
55 }
56
57 pub async fn process_message(
59 &self,
60 message_bytes: &mut [u8],
61 report_sender: &Option<Sender<ExecutionReport>>,
62 ) -> Result<()> {
63 let value = WebSocketJsonZeroCopy::parse_to_borrowed(message_bytes)?;
65
66 if let Some(msg_type) = value.get_str("type") {
68 match msg_type {
69 "subscriptions" => {
70 self.handle_subscriptions(&value);
71 }
72 "heartbeat" => {
73 self.handle_heartbeat(&value);
74 }
75 "ticker" => {
76 self.handle_ticker(&value)?;
77 }
78 "snapshot" => {
79 self.handle_snapshot(&value)?;
80 }
81 "l2update" => {
82 self.handle_l2_update(&value)?;
83 }
84 "match" | "last_match" => {
85 self.handle_match(&value)?;
86 }
87 "received" | "open" | "done" | "change" => {
88 self.handle_order_update(&value, msg_type, report_sender)?;
89 }
90 "error" => {
91 self.handle_error(&value);
92 }
93 _ => {
94 debug!("Unhandled message type: {msg_type}");
95 }
96 }
97 }
98
99 Ok(())
100 }
101
102 fn handle_subscriptions(&self, value: &BorrowedValue) {
104 debug!("Subscription confirmed");
105 self.is_authenticated.store(true, Ordering::Relaxed);
106 }
107
108 fn handle_heartbeat(&self, value: &BorrowedValue) {
110 self.last_heartbeat
112 .store(self.clock.raw() / 1_000_000_000, Ordering::Relaxed);
113
114 if let Some(sequence) = value.get_u64("sequence")
116 && let Some(product_id) = value.get_str("product_id")
117 {
118 let mut tracker = self.sequence_tracker.write();
119
120 if let Some(last_seq) = tracker.get(product_id) {
122 if sequence > last_seq + 1 {
123 warn!("Sequence gap for {product_id}: {last_seq} -> {sequence}");
124 }
125 if let Some(entry) = tracker.get_mut(product_id) {
127 *entry = sequence;
128 }
129 } else {
130 tracker.insert(SmartString::from(product_id), sequence);
132 }
133 }
134 }
135
136 fn handle_ticker(&self, value: &BorrowedValue) -> Result<()> {
138 if let Some(product_id) = value.get_str("product_id")
139 && let Some(price) = value.get_str("price")
140 {
141 debug!("Ticker {product_id}: {price}");
143 }
144 Ok(())
145 }
146
147 fn handle_snapshot(&self, value: &BorrowedValue) -> Result<()> {
149 if let Some(product_id) = value.get_str("product_id") {
150 debug!("L2 snapshot for {product_id}");
151 }
153 Ok(())
154 }
155
156 fn handle_l2_update(&self, value: &BorrowedValue) -> Result<()> {
158 if let Some(sequence) = value.get_u64("sequence")
159 && let Some(product_id) = value.get_str("product_id")
160 {
161 let mut tracker = self.sequence_tracker.write();
162
163 if let Some(&last_seq) = tracker.get(product_id) {
165 if sequence <= last_seq {
166 return Ok(());
168 }
169 if sequence > last_seq + 1 {
170 warn!("L2 gap for {product_id}: {last_seq} -> {sequence}");
171 }
172 }
173
174 tracker
176 .entry(SmartString::from(product_id))
177 .and_modify(|e| *e = sequence)
178 .or_insert(sequence);
179
180 debug!("L2 update for {product_id} seq {sequence}");
182 }
183 Ok(())
184 }
185
186 fn handle_match(&self, value: &BorrowedValue) -> Result<()> {
188 if let Some(product_id) = value.get_str("product_id")
189 && let Some(price) = value.get_str("price")
190 && let Some(size) = value.get_str("size")
191 {
192 debug!("Match {product_id}: {size} @ {price}");
193 }
194 Ok(())
195 }
196
197 fn handle_order_update(
199 &self,
200 value: &BorrowedValue,
201 msg_type: &str,
202 report_sender: &Option<Sender<ExecutionReport>>,
203 ) -> Result<()> {
204 let order_id = value.get_str("order_id").unwrap_or("");
206 let client_oid = value.get_str("client_oid").unwrap_or("");
207 let product_id = value.get_str("product_id").unwrap_or("");
208
209 let (exec_type, order_status) = match msg_type {
211 "received" => ("New", OrderStatus::New),
212 "open" => ("New", OrderStatus::New),
213 "done" => match value.get_str("reason").unwrap_or("") {
214 "filled" => ("Trade", OrderStatus::Filled),
215 "canceled" => ("Canceled", OrderStatus::Cancelled),
216 _ => ("Rejected", OrderStatus::Rejected),
217 },
218 "change" => ("Replaced", OrderStatus::New),
219 _ => {
220 log::warn!("Unknown Coinbase WebSocket message type in zero-alloc handler");
221 ("Unknown", OrderStatus::Unknown)
222 }
223 };
224
225 if let Some(sender) = report_sender {
227 let price = value
229 .get_str("price")
230 .and_then(|s| Decimal::from_str_exact(s).ok())
231 .unwrap_or(Decimal::ZERO);
232
233 let quantity = value
234 .get_str("size")
235 .and_then(|s| Decimal::from_str_exact(s).ok())
236 .unwrap_or(Decimal::ZERO);
237
238 let leaves_qty = value
239 .get_str("remaining_size")
240 .and_then(|s| Decimal::from_str_exact(s).ok())
241 .unwrap_or(Decimal::ZERO);
242
243 let report = ExecutionReport {
245 id: id_generation::generate_uuid_id(),
246 order_id: SmartString::from(order_id),
247 exchange_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
248 system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
249 instrument_id: InstrumentId::new(SmartString::from(product_id), Venue::Coinbase),
250 status: order_status,
251 filled_quantity: if msg_type == "match" {
252 quantity
253 } else {
254 Decimal::ZERO
255 },
256 remaining_quantity: leaves_qty,
257 execution_price: if msg_type == "match" {
258 Some(price)
259 } else {
260 None
261 },
262 reject_reason: if order_status == OrderStatus::Rejected {
263 value.get_str("reject_reason").map(SmartString::from)
264 } else {
265 None
266 },
267 exchange_execution_id: value.get_str("order_id").map(SmartString::from),
268 is_final: matches!(
269 order_status,
270 OrderStatus::Filled | OrderStatus::Cancelled | OrderStatus::Rejected
271 ),
272 };
273
274 if let Err(e) = sender.send(report) {
276 error!("Failed to send execution report: {e}");
277 }
278 }
279
280 Ok(())
281 }
282
283 fn handle_error(&self, value: &BorrowedValue) {
285 let message = value.get_str("message").unwrap_or("Unknown error");
286 error!("WebSocket error: {message}");
287 }
288}
289
#[cfg(test)]
mod benchmarks {
    use super::*;
    use std::hint::black_box;
    use std::time::Instant;

    /// Compares borrowed parsing against owned `simd_json` parsing on a
    /// sample `match` frame.
    ///
    /// Timings are printed for information only. The test asserts that the
    /// borrowed parser extracts the expected fields rather than asserting on
    /// wall-clock ordering, which is unreliable in unoptimised builds and on
    /// loaded CI machines.
    #[test]
    fn benchmark_zero_alloc_vs_traditional() {
        const ITERATIONS: usize = 10_000;

        let json = r#"{"type":"match","trade_id":123456,"sequence":789,"product_id":"BTC-USD","price":"50000.50","size":"0.001","side":"buy","time":"2024-01-01T00:00:00.000Z"}"#;

        // Functional check, done once outside the timed loops.
        {
            let mut bytes = json.as_bytes().to_vec();
            let value = WebSocketJsonZeroCopy::parse_to_borrowed(&mut bytes).unwrap();
            assert_eq!(value.get_str("product_id"), Some("BTC-USD"));
            assert_eq!(value.get_str("price"), Some("50000.50"));
            assert_eq!(value.get_str("size"), Some("0.001"));
        }

        let start = Instant::now();
        for _ in 0..ITERATIONS {
            let mut bytes = json.as_bytes().to_vec();
            let value = WebSocketJsonZeroCopy::parse_to_borrowed(&mut bytes).unwrap();
            // `black_box` keeps the optimiser from discarding the lookups.
            black_box(value.get_str("product_id"));
            black_box(value.get_str("price"));
            black_box(value.get_str("size"));
        }
        let zero_alloc_time = start.elapsed();

        let start = Instant::now();
        for _ in 0..ITERATIONS {
            let mut json_copy = json.to_string();
            // SAFETY: `simd_json::from_str` may rewrite the buffer in place and
            // leave it as invalid UTF-8. `json_copy` is a private scratch copy
            // that is never read as a `str` again before being dropped.
            let parsed: simd_json::OwnedValue =
                unsafe { simd_json::from_str(&mut json_copy) }.unwrap();
            black_box(&parsed["product_id"]);
            black_box(&parsed["price"]);
            black_box(&parsed["size"]);
        }
        let traditional_time = start.elapsed();

        println!("Zero-alloc: {zero_alloc_time:?}, Traditional: {traditional_time:?}");
        println!(
            "Speedup: {:.2}x",
            traditional_time.as_nanos() as f64 / zero_alloc_time.as_nanos() as f64
        );
    }
}