1use anyhow::{Result, bail};
12use async_trait::async_trait;
13use flume::Sender;
14use futures::{SinkExt, StreamExt};
15use log::{debug, error, info, warn};
16use parking_lot::RwLock;
17use quanta::Clock;
18use rust_decimal::Decimal;
19use rusty_common::SmartString;
20use rusty_common::auth::exchanges::coinbase::CoinbaseAuth;
21use rusty_common::collections::FxHashMap;
22use rusty_common::time;
23use rusty_common::utils::id_generation;
24use rusty_common::websocket::connector::{WebSocketSink, WebSocketStream};
25use rusty_common::websocket::{Message, WebSocketConfig};
26use rusty_model::{
27 data::orderbook::{OrderBook, PriceLevel},
28 enums::{OrderSide, OrderStatus, OrderType},
29 instruments::InstrumentId,
30 trading_order::Order,
31 venues::Venue,
32};
33use simd_json::prelude::{ValueAsArray, ValueAsScalar, ValueObjectAccess};
34use simd_json::{json, value::owned::Value as JsonValue};
35use smallvec::SmallVec;
36use std::collections::VecDeque;
37use std::sync::Arc;
38use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
39use std::time::Duration;
40use tokio::sync::RwLock as AsyncRwLock;
41use tokio::task::JoinHandle;
42use tokio::time::interval;
43use yawc;
44
45use crate::execution_engine::{Exchange, ExecutionReport};
47
/// Result of parsing an `l2update` message: the product id together with its
/// `(side, price, quantity)` changes, as returned by `parse_level2_update`.
type Level2UpdateResult = Result<(SmartString, Vec<(SmartString, Decimal, Decimal)>)>;

/// Endpoint returned by `get_ws_url` when neither sandbox nor direct mode is set.
const COINBASE_WS_URL: &str = "wss://ws-feed.exchange.coinbase.com";
/// Endpoint returned by `get_ws_url` for direct mode outside the sandbox.
const COINBASE_WS_DIRECT_URL: &str = "wss://ws-direct.exchange.coinbase.com";
/// Endpoint returned by `get_ws_url` for sandbox mode without direct access.
const COINBASE_WS_SANDBOX_URL: &str = "wss://ws-feed-public.sandbox.exchange.coinbase.com";
/// Endpoint returned by `get_ws_url` when both sandbox and direct mode are set.
const COINBASE_WS_DIRECT_SANDBOX_URL: &str = "wss://ws-direct.sandbox.exchange.coinbase.com";

/// Base heartbeat period in seconds. The heartbeat monitor polls at twice this
/// period and reports a timeout once more than three periods have elapsed.
const HEARTBEAT_INTERVAL_SECONDS: u64 = 1;
/// NOTE(review): not referenced in the visible code — presumably the pause
/// between reconnect attempts; confirm.
const RECONNECT_DELAY_SECONDS: u64 = 5;
/// NOTE(review): not referenced in the visible code — presumably the maximum
/// number of reconnect attempts; confirm.
const MAX_RECONNECT_ATTEMPTS: u32 = 10;
/// NOTE(review): not referenced in the visible code — presumably how long to
/// wait for a subscription confirmation; confirm.
const SUBSCRIBE_TIMEOUT_SECONDS: u64 = 5;
/// NOTE(review): not referenced in the visible code — presumably an upper bound
/// (10 MiB) on the size of an inbound WebSocket message; confirm.
const MAX_MESSAGE_SIZE: usize = 10 * 1024 * 1024;

/// Locally tracked state of a single order, stored in the `orders` map.
///
/// NOTE(review): nothing in the visible code inserts values of this type; only
/// `status` is read (by `get_order_status`). Field meanings below follow the
/// field names and should be confirmed against the code that fills them in.
#[derive(Debug, Clone)]
struct OrderInfo {
    order_id: SmartString,
    client_order_id: SmartString,
    symbol: SmartString,
    side: OrderSide,
    order_type: OrderType,
    price: Decimal,
    quantity: Decimal,
    remaining_quantity: Decimal,
    executed_quantity: Decimal,
    /// Status returned to callers of `get_order_status`.
    status: OrderStatus,
    // NOTE(review): unit (ns / ms / s) is not visible here — confirm.
    timestamp: u64,
}
79
/// Tracks the last sequence number seen per product and the gaps observed
/// between consecutive sequence numbers.
#[derive(Debug)]
struct SequenceTracker {
    /// Last accepted sequence number, keyed by product id.
    sequences: FxHashMap<SmartString, u64>,
    /// Detected gaps as `(product_id, first_missing, last_missing)`, both ends
    /// inclusive, in the order they were detected.
    missing_sequences: VecDeque<(SmartString, u64, u64)>,
}
88
89impl SequenceTracker {
90 fn new() -> Self {
91 Self {
92 sequences: FxHashMap::default(),
93 missing_sequences: VecDeque::new(),
94 }
95 }
96
97 fn update_sequence(&mut self, product_id: &str, new_sequence: u64) -> bool {
99 let product_id = SmartString::from(product_id);
100
101 if let Some(&last_sequence) = self.sequences.get(&product_id) {
102 if new_sequence <= last_sequence {
103 return false;
105 }
106
107 if new_sequence > last_sequence + 1 {
108 self.missing_sequences.push_back((
110 product_id.clone(),
111 last_sequence + 1,
112 new_sequence - 1,
113 ));
114 warn!("Sequence gap detected for {product_id}: {last_sequence} -> {new_sequence}");
115 }
116 }
117
118 self.sequences.insert(product_id, new_sequence);
119 true
120 }
121}
122
/// Coinbase WebSocket client: owns the socket halves, connection flags,
/// subscription lists, order-book and order-tracking state, and the background
/// tasks that monitor heartbeats and process incoming messages.
pub struct CoinbaseWebsocketTrading {
    /// Credentials used to build the subscription message.
    auth: Arc<CoinbaseAuth>,

    /// Write half of the split WebSocket; `None` while no connection is installed.
    ws_sink: Arc<AsyncRwLock<Option<WebSocketSink>>>,
    /// Read half of the split WebSocket; `None` while no connection is installed.
    ws_stream: Arc<AsyncRwLock<Option<WebSocketStream>>>,

    /// Selects the sandbox endpoints in `get_ws_url`.
    sandbox: bool,
    /// Selects the `ws-direct` endpoints in `get_ws_url`.
    use_direct: bool,

    /// Set after the socket is opened; cleared when the stream closes, the
    /// heartbeat monitor times out, or `disconnect` is called.
    is_connected: Arc<AtomicBool>,
    /// Set when a `subscriptions` message is received; cleared by `disconnect`.
    is_authenticated: Arc<AtomicBool>,
    /// `clock.raw() / 1_000_000_000` captured when the last `heartbeat` message
    /// was processed.
    last_heartbeat: Arc<AtomicU64>,
    /// NOTE(review): only initialised in the visible code — presumably counts
    /// reconnect attempts; confirm.
    reconnect_attempts: Arc<AtomicU64>,

    /// Product ids recorded by `subscribe`; re-sent by `connect_internal`.
    subscribed_products: Arc<RwLock<Vec<SmartString>>>,
    /// Channel names recorded by `subscribe`.
    subscribed_channels: Arc<RwLock<Vec<SmartString>>>,

    /// Order state looked up by `get_order_status`; cleared by `disconnect`.
    orders: Arc<RwLock<FxHashMap<SmartString, OrderInfo>>>,
    /// Maps `client_oid` to `order_id`, filled from order update messages.
    client_order_map: Arc<RwLock<FxHashMap<SmartString, SmartString>>>,

    /// Per-product sequence numbers taken from heartbeat, `l2update` and match
    /// messages.
    sequence_tracker: Arc<RwLock<SequenceTracker>>,

    /// Level-2 books keyed by product id: inserted from `snapshot` messages and
    /// modified by `l2update` messages.
    order_books: Arc<RwLock<FxHashMap<SmartString, OrderBook>>>,

    /// Clock used to timestamp heartbeats.
    clock: Clock,

    /// Handle of the heartbeat monitor task; aborted by `disconnect`.
    heartbeat_handle: Arc<RwLock<Option<JoinHandle<()>>>>,
    /// Handle of the message processing task; aborted by `disconnect`.
    message_handle: Arc<RwLock<Option<JoinHandle<()>>>>,

    /// Channel on which execution reports built from order updates are sent.
    report_sender: Arc<RwLock<Option<Sender<ExecutionReport>>>>,
}
166
167impl CoinbaseWebsocketTrading {
168 pub fn new(auth: Arc<CoinbaseAuth>, sandbox: bool, use_direct: bool) -> Self {
170 let clock = Clock::new();
171
172 Self {
173 auth,
174 ws_sink: Arc::new(AsyncRwLock::new(None)),
175 ws_stream: Arc::new(AsyncRwLock::new(None)),
176 sandbox,
177 use_direct,
178 is_connected: Arc::new(AtomicBool::new(false)),
179 is_authenticated: Arc::new(AtomicBool::new(false)),
180 last_heartbeat: Arc::new(AtomicU64::new(0)),
181 reconnect_attempts: Arc::new(AtomicU64::new(0)),
182 subscribed_products: Arc::new(RwLock::new(Vec::new())),
183 subscribed_channels: Arc::new(RwLock::new(Vec::new())),
184 orders: Arc::new(RwLock::new(FxHashMap::default())),
185 client_order_map: Arc::new(RwLock::new(FxHashMap::default())),
186 sequence_tracker: Arc::new(RwLock::new(SequenceTracker::new())),
187 order_books: Arc::new(RwLock::new(FxHashMap::default())),
188 clock,
189 heartbeat_handle: Arc::new(RwLock::new(None)),
190 message_handle: Arc::new(RwLock::new(None)),
191 report_sender: Arc::new(RwLock::new(None)),
192 }
193 }
194
195 const fn get_ws_url(&self) -> &'static str {
197 match (self.sandbox, self.use_direct) {
198 (true, true) => COINBASE_WS_DIRECT_SANDBOX_URL,
199 (true, false) => COINBASE_WS_SANDBOX_URL,
200 (false, true) => COINBASE_WS_DIRECT_URL,
201 (false, false) => COINBASE_WS_URL,
202 }
203 }
204
205 async fn subscribe(&self, channels: Vec<&str>, product_ids: Vec<&str>) -> Result<()> {
207 let ws_subscription = self.auth.generate_ws_subscription(&channels)?;
209
210 let mut subscribe_msg = json!({
211 "type": ws_subscription.message_type,
212 "product_ids": product_ids,
213 "channels": ws_subscription.channels
214 });
215
216 if let Some(jwt) = ws_subscription.jwt {
218 subscribe_msg["jwt"] = json!(jwt);
219 }
220 if let Some(api_key) = ws_subscription.api_key {
221 subscribe_msg["key"] = json!(api_key);
222 }
223 if let Some(timestamp) = ws_subscription.timestamp {
224 subscribe_msg["timestamp"] = json!(timestamp);
225 }
226 if let Some(signature) = ws_subscription.signature {
227 subscribe_msg["signature"] = json!(signature);
228 }
229
230 let subscribe_msg_str = subscribe_msg.to_string();
231
232 if let Some(sink) = &mut *self.ws_sink.write().await {
233 sink.send(Message::text(subscribe_msg_str).to_frame_view())
234 .await?;
235
236 self.subscribed_products
238 .write()
239 .extend(product_ids.iter().map(|&p| SmartString::from(p)));
240 self.subscribed_channels
241 .write()
242 .extend(channels.iter().map(|&c| SmartString::from(c)));
243
244 debug!("Subscribed to channels: {channels:?} for products: {product_ids:?}");
245 } else {
246 bail!("WebSocket connection not established");
247 }
248
249 Ok(())
250 }
251
252 async fn connect_internal(&self, report_sender: Sender<ExecutionReport>) -> Result<()> {
254 *self.report_sender.write() = Some(report_sender.clone());
256
257 let config = WebSocketConfig::new(
258 rusty_common::types::Exchange::Coinbase,
259 self.get_ws_url().to_string(),
260 );
261
262 let ws_url = config.url.parse()?;
264 let ws = yawc::WebSocket::connect(ws_url).await?;
265 let (ws_sink, ws_stream) = ws.split();
266 *self.ws_sink.write().await = Some(ws_sink);
267 *self.ws_stream.write().await = Some(ws_stream);
268
269 self.is_connected.store(true, Ordering::Relaxed);
270
271 let product_ids: Vec<String> = {
273 self.subscribed_products
274 .read()
275 .iter()
276 .map(std::string::ToString::to_string)
277 .collect()
278 };
279
280 let product_id_refs: Vec<&str> = product_ids
281 .iter()
282 .map(std::string::String::as_str)
283 .collect();
284
285 self.subscribe(
286 vec!["heartbeat", "ticker", "matches", "user", "level2"],
287 product_id_refs,
288 )
289 .await?;
290
291 self.start_heartbeat_monitor();
293
294 self.start_message_processing();
296
297 info!("Coinbase WebSocket connected to {}", self.get_ws_url());
298 Ok(())
299 }
300
301 fn start_heartbeat_monitor(&self) {
303 let last_heartbeat = self.last_heartbeat.clone();
304 let clock = self.clock.clone();
305 let is_connected = self.is_connected.clone();
306
307 let handle = tokio::spawn(async move {
308 let mut interval = interval(Duration::from_secs(HEARTBEAT_INTERVAL_SECONDS * 2));
309
310 loop {
311 interval.tick().await;
312
313 if !is_connected.load(Ordering::Relaxed) {
314 break;
315 }
316
317 let last_hb = last_heartbeat.load(Ordering::Relaxed);
318 let now = clock.raw() / 1_000_000_000; if now - last_hb > HEARTBEAT_INTERVAL_SECONDS * 3 {
321 warn!("Heartbeat timeout detected, connection may be dead");
322 is_connected.store(false, Ordering::Relaxed);
323 break;
324 }
325 }
326
327 debug!("Heartbeat monitor task terminated");
328 });
329
330 *self.heartbeat_handle.write() = Some(handle);
331 }
332
333 fn start_message_processing(&self) {
335 let ws_stream = self.ws_stream.clone();
336 let last_heartbeat = self.last_heartbeat.clone();
337 let clock = self.clock.clone();
338 let is_connected = self.is_connected.clone();
339 let is_authenticated = self.is_authenticated.clone();
340 let orders = self.orders.clone();
341 let client_order_map = self.client_order_map.clone();
342 let sequence_tracker = self.sequence_tracker.clone();
343 let report_sender = self.report_sender.clone();
344 let order_books = self.order_books.clone();
345
346 let handle = tokio::spawn(async move {
347 loop {
348 if !is_connected.load(Ordering::Relaxed) {
349 break;
350 }
351
352 if let Some(stream) = &mut *ws_stream.write().await {
353 if let Some(frame) = stream.next().await {
354 let msg = Message::from_frame_view(frame);
355 match msg {
356 Message::Text(text) => {
357 if let Err(e) = Self::process_message(
358 &text,
359 &last_heartbeat,
360 &clock,
361 &is_authenticated,
362 &orders,
363 &client_order_map,
364 &sequence_tracker,
365 &report_sender,
366 &order_books,
367 )
368 .await
369 {
370 error!("Failed to process message: {e}");
371 }
372 }
373 Message::Binary(_) => {
374 warn!("Received unexpected binary message");
375 }
376 _ => {}
377 }
378 } else {
379 warn!("WebSocket stream closed");
380 is_connected.store(false, Ordering::Relaxed);
381 break;
382 }
383 }
384 }
385
386 debug!("Message processing task terminated");
387 });
388
389 *self.message_handle.write() = Some(handle);
390 }
391
392 #[allow(clippy::too_many_arguments)]
394 async fn process_message(
395 text: &str,
396 last_heartbeat: &Arc<AtomicU64>,
397 clock: &Clock,
398 is_authenticated: &Arc<AtomicBool>,
399 orders: &Arc<RwLock<FxHashMap<SmartString, OrderInfo>>>,
400 client_order_map: &Arc<RwLock<FxHashMap<SmartString, SmartString>>>,
401 sequence_tracker: &Arc<RwLock<SequenceTracker>>,
402 report_sender: &Arc<RwLock<Option<Sender<ExecutionReport>>>>,
403 order_books: &Arc<RwLock<FxHashMap<SmartString, OrderBook>>>,
404 ) -> Result<()> {
405 let mut text_bytes = text.as_bytes().to_vec();
406 let msg = simd_json::from_slice::<JsonValue>(&mut text_bytes)?;
407
408 let msg_type = msg
409 .get("type")
410 .and_then(|v| v.as_str())
411 .map(std::string::ToString::to_string);
412
413 if let Some(msg_type) = msg_type {
414 match msg_type.as_str() {
415 "subscriptions" => {
416 info!("Subscription confirmed: {text}");
417 is_authenticated.store(true, Ordering::Relaxed);
418 }
419 "heartbeat" => {
420 last_heartbeat.store(clock.raw() / 1_000_000_000, Ordering::Relaxed);
421
422 if let Some(sequence) = msg
423 .get("sequence")
424 .and_then(simd_json::prelude::ValueAsScalar::as_u64)
425 && let Some(product_id) = msg.get("product_id").and_then(|v| v.as_str())
426 {
427 sequence_tracker
428 .write()
429 .update_sequence(product_id, sequence);
430 }
431 }
432 "ticker" => {
433 Self::process_ticker(msg).await?;
435 }
436 "snapshot" => {
437 Self::process_snapshot_static(msg, order_books).await?;
439 }
440 "l2update" => {
441 if let Some(sequence) = msg
443 .get("sequence")
444 .and_then(simd_json::prelude::ValueAsScalar::as_u64)
445 && let Some(product_id) = msg.get("product_id").and_then(|v| v.as_str())
446 && sequence_tracker
447 .write()
448 .update_sequence(product_id, sequence)
449 {
450 Self::process_l2_update_static(msg, order_books).await?;
451 }
452 }
453 "match" | "last_match" => {
454 if let Some(sequence) = msg
456 .get("sequence")
457 .and_then(simd_json::prelude::ValueAsScalar::as_u64)
458 && let Some(product_id) = msg.get("product_id").and_then(|v| v.as_str())
459 {
460 sequence_tracker
461 .write()
462 .update_sequence(product_id, sequence);
463 }
464 Self::process_match(msg).await?;
465 }
466 "received" | "open" | "done" | "change" => {
467 Self::process_order_update(
469 msg,
470 &msg_type,
471 orders,
472 client_order_map,
473 report_sender,
474 )
475 .await?;
476 }
477 "error" => {
478 let message = msg
479 .get("message")
480 .and_then(|v| v.as_str())
481 .unwrap_or("Unknown error");
482 error!("WebSocket error: {message}");
483 }
484 _ => {
485 debug!("Unhandled message type: {msg_type}");
486 }
487 }
488 }
489
490 Ok(())
491 }
492
/// Handles a `ticker` message. The visible body only logs the message at debug
/// level and returns `Ok(())`.
async fn process_ticker(msg: JsonValue) -> Result<()> {
    debug!("Ticker update: {msg:?}");
    Ok(())
}
499
500 async fn process_snapshot(&self, msg: JsonValue) -> Result<()> {
502 let order_book = Self::parse_level2_snapshot(&msg)?;
503 let product_id = order_book.symbol.clone();
504
505 self.order_books
507 .write()
508 .insert(product_id.clone(), order_book);
509
510 debug!("Processed L2 snapshot for {product_id}");
511 Ok(())
512 }
513
514 async fn process_snapshot_static(
516 msg: JsonValue,
517 order_books: &Arc<RwLock<FxHashMap<SmartString, OrderBook>>>,
518 ) -> Result<()> {
519 let order_book = Self::parse_level2_snapshot(&msg)?;
520 let product_id = order_book.symbol.clone();
521
522 order_books.write().insert(product_id.clone(), order_book);
524
525 debug!("Processed L2 snapshot for {product_id}");
526 Ok(())
527 }
528
529 async fn process_l2_update(&self, msg: JsonValue) -> Result<()> {
531 let (product_id, changes) = Self::parse_level2_update(&msg)?;
532
533 if let Some(order_book) = self.order_books.write().get_mut(&product_id) {
535 Self::apply_level2_update(order_book, &changes);
536 debug!("Applied {} L2 updates to {}", changes.len(), product_id);
537 } else {
538 warn!("Received L2 update for unknown product: {product_id}");
539 }
540
541 Ok(())
542 }
543
544 async fn process_l2_update_static(
546 msg: JsonValue,
547 order_books: &Arc<RwLock<FxHashMap<SmartString, OrderBook>>>,
548 ) -> Result<()> {
549 let (product_id, changes) = Self::parse_level2_update(&msg)?;
550
551 if let Some(order_book) = order_books.write().get_mut(&product_id) {
553 Self::apply_level2_update(order_book, &changes);
554 debug!("Applied {} L2 updates to {}", changes.len(), product_id);
555 } else {
556 warn!("Received L2 update for unknown product: {product_id}");
557 }
558
559 Ok(())
560 }
561
/// Handles a `match` / `last_match` message. The visible body only logs the
/// message at debug level and returns `Ok(())`.
async fn process_match(msg: JsonValue) -> Result<()> {
    debug!("Match: {msg:?}");
    Ok(())
}
568
569 async fn process_order_update(
571 msg: JsonValue,
572 msg_type: &str,
573 orders: &Arc<RwLock<FxHashMap<SmartString, OrderInfo>>>,
574 client_order_map: &Arc<RwLock<FxHashMap<SmartString, SmartString>>>,
575 report_sender: &Arc<RwLock<Option<Sender<ExecutionReport>>>>,
576 ) -> Result<()> {
577 let order_id = msg.get("order_id").and_then(|v| v.as_str()).unwrap_or("");
578 let client_oid = msg.get("client_oid").and_then(|v| v.as_str()).unwrap_or("");
579 let product_id = msg.get("product_id").and_then(|v| v.as_str()).unwrap_or("");
580
581 if !client_oid.is_empty() && !order_id.is_empty() {
583 client_order_map
584 .write()
585 .insert(SmartString::from(client_oid), SmartString::from(order_id));
586 }
587
588 let (exec_type, order_status) = match msg_type {
589 "received" => ("New", OrderStatus::New),
590 "open" => ("New", OrderStatus::New),
591 "done" => {
592 let reason = msg.get("reason").and_then(|v| v.as_str()).unwrap_or("");
593 match reason {
594 "filled" => ("Trade", OrderStatus::Filled),
595 "canceled" => ("Canceled", OrderStatus::Cancelled),
596 _ => ("Rejected", OrderStatus::Rejected),
597 }
598 }
599 "match" => ("Trade", OrderStatus::PartiallyFilled),
600 "change" => ("Replaced", OrderStatus::New),
601 _ => {
602 log::warn!("Unknown Coinbase WebSocket message type: {msg_type}");
603 ("Unknown", OrderStatus::Unknown)
604 }
605 };
606
607 let executed_quantity = if msg_type == "match" {
609 msg.get("size")
610 .and_then(|v| v.as_str())
611 .and_then(|s| Decimal::from_str_exact(s).ok())
612 .unwrap_or(Decimal::ZERO)
613 } else {
614 Decimal::ZERO
615 };
616
617 let remaining_quantity = msg
618 .get("remaining_size")
619 .and_then(|v| v.as_str())
620 .and_then(|s| Decimal::from_str_exact(s).ok())
621 .unwrap_or(Decimal::ZERO);
622
623 let total_quantity = msg
624 .get("size")
625 .and_then(|v| v.as_str())
626 .and_then(|s| Decimal::from_str_exact(s).ok())
627 .unwrap_or(Decimal::ZERO);
628
629 let execution_price = if msg_type == "match" {
630 msg.get("price")
631 .and_then(|v| v.as_str())
632 .and_then(|s| Decimal::from_str_exact(s).ok())
633 } else {
634 None
635 };
636
637 let reject_reason = if msg_type == "done" {
638 let reason = msg.get("reason").and_then(|v| v.as_str()).unwrap_or("");
639 if reason == "canceled" || reason == "rejected" {
640 Some(reason.into())
641 } else {
642 None
643 }
644 } else {
645 None
646 };
647
648 let report = ExecutionReport {
650 id: id_generation::generate_multi_part_id(
651 &["coinbase", client_oid, &uuid::Uuid::new_v4().to_string()],
652 '_',
653 ),
654 order_id: order_id.into(),
655 exchange_timestamp: time::get_epoch_timestamp_ns(),
656 system_timestamp: time::get_epoch_timestamp_ns(),
657 instrument_id: InstrumentId::new(product_id, Venue::Coinbase),
658 status: order_status,
659 filled_quantity: executed_quantity,
660 remaining_quantity,
661 execution_price,
662 reject_reason,
663 exchange_execution_id: Some(order_id.into()),
664 is_final: matches!(
665 order_status,
666 OrderStatus::Filled | OrderStatus::Cancelled | OrderStatus::Rejected
667 ),
668 };
669
670 if let Some(sender) = &*report_sender.read()
672 && let Err(e) = sender.send(report)
673 {
674 error!("Failed to send execution report: {e}");
675 }
676
677 Ok(())
678 }
679
/// Always fails: order placement is not supported by this client and `order`
/// is not inspected.
async fn place_order_rest(&self, order: &Order) -> Result<SmartString> {
    bail!("Order placement via WebSocket not supported - use REST API")
}
687
/// Always fails: order cancellation is not supported by this client and
/// `order_id` is not inspected.
async fn cancel_order_rest(&self, order_id: &str) -> Result<()> {
    bail!("Order cancellation via WebSocket not supported - use REST API")
}
694}
695
696impl CoinbaseWebsocketTrading {
698 pub fn parse_level2_snapshot(msg: &JsonValue) -> Result<OrderBook> {
700 let product_id = msg
701 .get("product_id")
702 .and_then(|v| v.as_str())
703 .ok_or_else(|| anyhow::anyhow!("Missing product_id in snapshot"))?;
704
705 let bids = msg
706 .get("bids")
707 .and_then(|v| v.as_array())
708 .ok_or_else(|| anyhow::anyhow!("Missing bids in snapshot"))?;
709
710 let asks = msg
711 .get("asks")
712 .and_then(|v| v.as_array())
713 .ok_or_else(|| anyhow::anyhow!("Missing asks in snapshot"))?;
714
715 let mut bid_levels = SmallVec::with_capacity(bids.len());
717 for bid in bids {
718 if let Some(arr) = bid.as_array()
719 && arr.len() >= 2
720 {
721 let price = arr[0]
722 .as_str()
723 .and_then(|s| s.parse::<Decimal>().ok())
724 .ok_or_else(|| anyhow::anyhow!("Invalid bid price"))?;
725 let quantity = arr[1]
726 .as_str()
727 .and_then(|s| s.parse::<Decimal>().ok())
728 .ok_or_else(|| anyhow::anyhow!("Invalid bid quantity"))?;
729 bid_levels.push(PriceLevel::new(price, quantity));
730 }
731 }
732
733 let mut ask_levels = SmallVec::with_capacity(asks.len());
735 for ask in asks {
736 if let Some(arr) = ask.as_array()
737 && arr.len() >= 2
738 {
739 let price = arr[0]
740 .as_str()
741 .and_then(|s| s.parse::<Decimal>().ok())
742 .ok_or_else(|| anyhow::anyhow!("Invalid ask price"))?;
743 let quantity = arr[1]
744 .as_str()
745 .and_then(|s| s.parse::<Decimal>().ok())
746 .ok_or_else(|| anyhow::anyhow!("Invalid ask quantity"))?;
747 ask_levels.push(PriceLevel::new(price, quantity));
748 }
749 }
750
751 bid_levels.sort_by(|a: &PriceLevel, b: &PriceLevel| b.price.cmp(&a.price));
753 ask_levels.sort_by(|a: &PriceLevel, b: &PriceLevel| a.price.cmp(&b.price));
754
755 Ok(OrderBook::new(
756 product_id,
757 time::get_epoch_timestamp_ns(),
758 time::get_epoch_timestamp_ns(),
759 bid_levels,
760 ask_levels,
761 ))
762 }
763
764 pub fn parse_level2_update(msg: &JsonValue) -> Level2UpdateResult {
766 let product_id = msg
767 .get("product_id")
768 .and_then(|v| v.as_str())
769 .ok_or_else(|| anyhow::anyhow!("Missing product_id in update"))?;
770
771 let changes = msg
772 .get("changes")
773 .and_then(|v| v.as_array())
774 .ok_or_else(|| anyhow::anyhow!("Missing changes in update"))?;
775
776 let mut parsed_changes = Vec::with_capacity(changes.len());
777
778 for change in changes {
779 if let Some(arr) = change.as_array()
780 && arr.len() >= 3
781 {
782 let side = arr[0]
783 .as_str()
784 .ok_or_else(|| anyhow::anyhow!("Invalid side in change"))?;
785 let price = arr[1]
786 .as_str()
787 .and_then(|s| s.parse::<Decimal>().ok())
788 .ok_or_else(|| anyhow::anyhow!("Invalid price in change"))?;
789 let quantity = arr[2]
790 .as_str()
791 .and_then(|s| s.parse::<Decimal>().ok())
792 .ok_or_else(|| anyhow::anyhow!("Invalid quantity in change"))?;
793
794 parsed_changes.push((side.into(), price, quantity));
795 }
796 }
797
798 Ok((product_id.into(), parsed_changes))
799 }
800
801 pub fn apply_level2_update(
803 order_book: &mut OrderBook,
804 changes: &[(SmartString, Decimal, Decimal)],
805 ) {
806 for (side, price, quantity) in changes {
807 match side.as_str() {
808 "buy" => {
809 if quantity.is_zero() {
811 order_book.bids.retain(|level| level.price != *price);
813 } else {
814 if let Some(level) = order_book.bids.iter_mut().find(|l| l.price == *price)
816 {
817 level.quantity = *quantity;
818 } else {
819 order_book.bids.push(PriceLevel::new(*price, *quantity));
821 order_book.bids.sort_by(|a, b| b.price.cmp(&a.price));
822 }
823 }
824 }
825 "sell" => {
826 if quantity.is_zero() {
828 order_book.asks.retain(|level| level.price != *price);
830 } else {
831 if let Some(level) = order_book.asks.iter_mut().find(|l| l.price == *price)
833 {
834 level.quantity = *quantity;
835 } else {
836 order_book.asks.push(PriceLevel::new(*price, *quantity));
838 order_book.asks.sort_by(|a, b| a.price.cmp(&b.price));
839 }
840 }
841 }
842 _ => {
843 warn!("Unknown side in L2 update: {side}");
844 }
845 }
846 }
847 }
848}
849
850#[async_trait]
851impl Exchange for CoinbaseWebsocketTrading {
852 fn venue(&self) -> Venue {
853 Venue::Coinbase
854 }
855
856 async fn place_order(
857 &self,
858 order: Order,
859 report_sender: Sender<ExecutionReport>,
860 ) -> Result<()> {
861 bail!("Order placement not supported via WebSocket - use REST API")
864 }
865
866 async fn cancel_order(
867 &self,
868 order_id: SmartString,
869 report_sender: Sender<ExecutionReport>,
870 ) -> Result<()> {
871 bail!("Order cancellation not supported via WebSocket - use REST API")
874 }
875
876 async fn modify_order(
877 &self,
878 order_id: SmartString,
879 new_price: Option<Decimal>,
880 new_quantity: Option<Decimal>,
881 report_sender: Sender<ExecutionReport>,
882 ) -> Result<()> {
883 bail!("Order modification not supported via WebSocket - use REST API")
885 }
886
887 async fn cancel_all_orders(
888 &self,
889 instrument_id: Option<InstrumentId>,
890 report_sender: Sender<ExecutionReport>,
891 ) -> Result<()> {
892 bail!("Mass order cancellation not supported via WebSocket - use REST API")
894 }
895
896 async fn get_order_status(&self, order_id: &str) -> Result<OrderStatus> {
897 if let Some(order_info) = self.orders.read().get(order_id) {
899 Ok(order_info.status)
900 } else {
901 bail!("Order not found in local tracking")
902 }
903 }
904
905 async fn connect(&self, report_sender: Sender<ExecutionReport>) -> Result<()> {
906 self.connect_internal(report_sender).await
908 }
909
910 async fn disconnect(&self) -> Result<()> {
911 self.is_connected.store(false, Ordering::Relaxed);
912 self.is_authenticated.store(false, Ordering::Relaxed);
913
914 if let Some(sink) = &mut *self.ws_sink.write().await {
916 let _ = sink.close().await;
917 }
918 *self.ws_sink.write().await = None;
919 *self.ws_stream.write().await = None;
920
921 if let Some(handle) = self.heartbeat_handle.write().take() {
923 handle.abort();
924 }
925
926 if let Some(handle) = self.message_handle.write().take() {
927 handle.abort();
928 }
929
930 self.orders.write().clear();
932 self.client_order_map.write().clear();
933 self.sequence_tracker.write().sequences.clear();
934
935 info!("Coinbase WebSocket disconnected");
936 Ok(())
937 }
938
939 async fn is_connected(&self) -> bool {
940 self.is_connected.load(Ordering::Relaxed) && self.is_authenticated.load(Ordering::Relaxed)
941 }
942
943 async fn get_instruments(&self) -> Result<SmallVec<[InstrumentId; 32]>> {
944 bail!("Instrument query not supported via WebSocket")
947 }
948
949 async fn send_fix_message(&self, message: Vec<u8>) -> Result<()> {
950 bail!("FIX protocol not supported on Coinbase WebSocket")
952 }
953
954 async fn receive_fix_message(&self) -> Result<Vec<u8>> {
955 bail!("FIX protocol not supported on Coinbase WebSocket")
957 }
958}