rusty_ems/exchanges/coinbase/
websocket_trading.rs

1//! Coinbase WebSocket Trading Implementation
2//!
3//! Provides WebSocket trading functionality for Coinbase exchange with support for:
4//! - Market data feeds (ticker, level2, matches)
5//! - Order management via user channel
6//! - Authentication for private channels
7//! - Heartbeat monitoring
8//! - Sequence number tracking
9//! - Automatic reconnection
10
11use anyhow::{Result, bail};
12use async_trait::async_trait;
13use flume::Sender;
14use futures::{SinkExt, StreamExt};
15use log::{debug, error, info, warn};
16use parking_lot::RwLock;
17use quanta::Clock;
18use rust_decimal::Decimal;
19use rusty_common::SmartString;
20use rusty_common::auth::exchanges::coinbase::CoinbaseAuth;
21use rusty_common::collections::FxHashMap;
22use rusty_common::time;
23use rusty_common::utils::id_generation;
24use rusty_common::websocket::connector::{WebSocketSink, WebSocketStream};
25use rusty_common::websocket::{Message, WebSocketConfig};
26use rusty_model::{
27    data::orderbook::{OrderBook, PriceLevel},
28    enums::{OrderSide, OrderStatus, OrderType},
29    instruments::InstrumentId,
30    trading_order::Order,
31    venues::Venue,
32};
33use simd_json::prelude::{ValueAsArray, ValueAsScalar, ValueObjectAccess};
34use simd_json::{json, value::owned::Value as JsonValue};
35use smallvec::SmallVec;
36use std::collections::VecDeque;
37use std::sync::Arc;
38use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
39use std::time::Duration;
40use tokio::sync::RwLock as AsyncRwLock;
41use tokio::task::JoinHandle;
42use tokio::time::interval;
43use yawc;
44
45// Removed unused imports
46use crate::execution_engine::{Exchange, ExecutionReport};
47
/// Result of parsing an `l2update` message: the product ID paired with the parsed
/// changes, each a `(side, price, quantity)` tuple where `side` is the raw side
/// string taken from the message (`apply_level2_update` recognises "buy" and "sell").
type Level2UpdateResult = Result<(SmartString, Vec<(SmartString, Decimal, Decimal)>)>;
50
/// WebSocket endpoint used when neither `sandbox` nor `use_direct` is set.
const COINBASE_WS_URL: &str = "wss://ws-feed.exchange.coinbase.com";
/// WebSocket endpoint used when `use_direct` is set and `sandbox` is not.
const COINBASE_WS_DIRECT_URL: &str = "wss://ws-direct.exchange.coinbase.com";
/// WebSocket endpoint used when `sandbox` is set and `use_direct` is not.
const COINBASE_WS_SANDBOX_URL: &str = "wss://ws-feed-public.sandbox.exchange.coinbase.com";
/// WebSocket endpoint used when both `sandbox` and `use_direct` are set.
const COINBASE_WS_DIRECT_SANDBOX_URL: &str = "wss://ws-direct.sandbox.exchange.coinbase.com";

/// Base heartbeat period in seconds. The heartbeat monitor polls every 2x this value
/// and flags the connection as dead when more than 3x this value has elapsed since
/// the last recorded heartbeat.
const HEARTBEAT_INTERVAL_SECONDS: u64 = 1;
/// Presumably the pause between reconnect attempts, in seconds — not referenced in
/// the part of the file visible here; confirm against the reconnect logic.
const RECONNECT_DELAY_SECONDS: u64 = 5;
/// Presumably the cap on consecutive reconnect attempts — not referenced in the part
/// of the file visible here; confirm against the reconnect logic.
const MAX_RECONNECT_ATTEMPTS: u32 = 10;
/// Presumably the time allowed for a subscription acknowledgement, in seconds — not
/// referenced in the part of the file visible here.
const SUBSCRIBE_TIMEOUT_SECONDS: u64 = 5;
/// Presumably the largest accepted inbound message, in bytes — not referenced in the
/// part of the file visible here.
const MAX_MESSAGE_SIZE: usize = 10 * 1024 * 1024; // 10MB
63
/// Order tracking structure stored in the `orders` map of `CoinbaseWebsocketTrading`.
///
/// NOTE(review): nothing in the visible part of this file constructs or reads an
/// `OrderInfo`; the per-field notes below are inferred from the field names and types
/// and should be confirmed against the code that populates the map.
#[derive(Debug, Clone)]
struct OrderInfo {
    // Presumably the exchange-assigned order ID.
    order_id: SmartString,
    // Presumably the caller-supplied client order ID (`client_oid` on the wire).
    client_order_id: SmartString,
    // Presumably the Coinbase product ID, e.g. "BTC-USD".
    symbol: SmartString,
    // Buy or sell.
    side: OrderSide,
    // Order type (limit, market, ...).
    order_type: OrderType,
    // Order price; meaning for non-limit orders is not established here.
    price: Decimal,
    // Presumably the original order size.
    quantity: Decimal,
    // Presumably the size still open on the book.
    remaining_quantity: Decimal,
    // Presumably the cumulative filled size.
    executed_quantity: Decimal,
    // Latest known order status.
    status: OrderStatus,
    // Time of the last update; unit and epoch are not established here — TODO confirm.
    timestamp: u64,
}
79
80/// Sequence tracking for reliable message ordering
81#[derive(Debug)]
82struct SequenceTracker {
83    /// Product ID -> last sequence number
84    sequences: FxHashMap<SmartString, u64>,
85    /// Missing sequences to be fetched via REST
86    missing_sequences: VecDeque<(SmartString, u64, u64)>,
87}
88
89impl SequenceTracker {
90    fn new() -> Self {
91        Self {
92            sequences: FxHashMap::default(),
93            missing_sequences: VecDeque::new(),
94        }
95    }
96
97    /// Update sequence number and detect gaps
98    fn update_sequence(&mut self, product_id: &str, new_sequence: u64) -> bool {
99        let product_id = SmartString::from(product_id);
100
101        if let Some(&last_sequence) = self.sequences.get(&product_id) {
102            if new_sequence <= last_sequence {
103                // Out of order or duplicate
104                return false;
105            }
106
107            if new_sequence > last_sequence + 1 {
108                // Gap detected
109                self.missing_sequences.push_back((
110                    product_id.clone(),
111                    last_sequence + 1,
112                    new_sequence - 1,
113                ));
114                warn!("Sequence gap detected for {product_id}: {last_sequence} -> {new_sequence}");
115            }
116        }
117
118        self.sequences.insert(product_id, new_sequence);
119        true
120    }
121}
122
/// Coinbase WebSocket trading implementation.
///
/// Holds the split WebSocket connection plus the shared state that the spawned
/// heartbeat-monitor and message-processing tasks operate on. Every piece of shared
/// state is wrapped in an `Arc` so it can be cloned into those tasks.
pub struct CoinbaseWebsocketTrading {
    /// Authentication handler; `subscribe` uses it to build the subscription payload
    auth: Arc<CoinbaseAuth>,

    /// Write half of the WebSocket connection (`None` until connected)
    ws_sink: Arc<AsyncRwLock<Option<WebSocketSink>>>,
    /// Read half of the WebSocket connection (`None` until connected)
    ws_stream: Arc<AsyncRwLock<Option<WebSocketStream>>>,

    /// Select the sandbox endpoints instead of production
    sandbox: bool,
    /// Select the `ws-direct` endpoints instead of `ws-feed`
    use_direct: bool,

    /// Set once the socket is open; cleared on heartbeat timeout or stream close
    is_connected: Arc<AtomicBool>,
    /// Set when a `subscriptions` message is received
    is_authenticated: Arc<AtomicBool>,
    /// `clock.raw() / 1_000_000_000` captured when the last heartbeat message arrived
    last_heartbeat: Arc<AtomicU64>,
    /// Reconnect attempt counter — not read or written in the part of the file visible here
    reconnect_attempts: Arc<AtomicU64>,

    /// Product IDs passed to `subscribe` so far
    subscribed_products: Arc<RwLock<Vec<SmartString>>>,
    /// Channel names passed to `subscribe` so far
    subscribed_channels: Arc<RwLock<Vec<SmartString>>>,

    /// Order state keyed by order ID — not populated in the part of the file visible here
    orders: Arc<RwLock<FxHashMap<SmartString, OrderInfo>>>,
    /// Client order ID (`client_oid`) -> exchange order ID, filled from user-channel messages
    client_order_map: Arc<RwLock<FxHashMap<SmartString, SmartString>>>,

    /// Per-product sequence numbers and detected gaps
    sequence_tracker: Arc<RwLock<SequenceTracker>>,

    /// Order books for subscribed products, keyed by product ID
    order_books: Arc<RwLock<FxHashMap<SmartString, OrderBook>>>,

    /// Clock used to timestamp heartbeats
    clock: Clock,

    /// Handle of the spawned heartbeat-monitor task
    heartbeat_handle: Arc<RwLock<Option<JoinHandle<()>>>>,
    /// Handle of the spawned message-processing task
    message_handle: Arc<RwLock<Option<JoinHandle<()>>>>,

    /// Channel on which execution reports are emitted (`None` until `connect_internal` runs)
    report_sender: Arc<RwLock<Option<Sender<ExecutionReport>>>>,
}
166
167impl CoinbaseWebsocketTrading {
168    /// Create a new Coinbase WebSocket trading instance
169    pub fn new(auth: Arc<CoinbaseAuth>, sandbox: bool, use_direct: bool) -> Self {
170        let clock = Clock::new();
171
172        Self {
173            auth,
174            ws_sink: Arc::new(AsyncRwLock::new(None)),
175            ws_stream: Arc::new(AsyncRwLock::new(None)),
176            sandbox,
177            use_direct,
178            is_connected: Arc::new(AtomicBool::new(false)),
179            is_authenticated: Arc::new(AtomicBool::new(false)),
180            last_heartbeat: Arc::new(AtomicU64::new(0)),
181            reconnect_attempts: Arc::new(AtomicU64::new(0)),
182            subscribed_products: Arc::new(RwLock::new(Vec::new())),
183            subscribed_channels: Arc::new(RwLock::new(Vec::new())),
184            orders: Arc::new(RwLock::new(FxHashMap::default())),
185            client_order_map: Arc::new(RwLock::new(FxHashMap::default())),
186            sequence_tracker: Arc::new(RwLock::new(SequenceTracker::new())),
187            order_books: Arc::new(RwLock::new(FxHashMap::default())),
188            clock,
189            heartbeat_handle: Arc::new(RwLock::new(None)),
190            message_handle: Arc::new(RwLock::new(None)),
191            report_sender: Arc::new(RwLock::new(None)),
192        }
193    }
194
195    /// Get WebSocket URL based on configuration
196    const fn get_ws_url(&self) -> &'static str {
197        match (self.sandbox, self.use_direct) {
198            (true, true) => COINBASE_WS_DIRECT_SANDBOX_URL,
199            (true, false) => COINBASE_WS_SANDBOX_URL,
200            (false, true) => COINBASE_WS_DIRECT_URL,
201            (false, false) => COINBASE_WS_URL,
202        }
203    }
204
205    /// Subscribe to channels
206    async fn subscribe(&self, channels: Vec<&str>, product_ids: Vec<&str>) -> Result<()> {
207        // Use the proper WebSocket subscription generation
208        let ws_subscription = self.auth.generate_ws_subscription(&channels)?;
209
210        let mut subscribe_msg = json!({
211            "type": ws_subscription.message_type,
212            "product_ids": product_ids,
213            "channels": ws_subscription.channels
214        });
215
216        // Add authentication fields based on the key type
217        if let Some(jwt) = ws_subscription.jwt {
218            subscribe_msg["jwt"] = json!(jwt);
219        }
220        if let Some(api_key) = ws_subscription.api_key {
221            subscribe_msg["key"] = json!(api_key);
222        }
223        if let Some(timestamp) = ws_subscription.timestamp {
224            subscribe_msg["timestamp"] = json!(timestamp);
225        }
226        if let Some(signature) = ws_subscription.signature {
227            subscribe_msg["signature"] = json!(signature);
228        }
229
230        let subscribe_msg_str = subscribe_msg.to_string();
231
232        if let Some(sink) = &mut *self.ws_sink.write().await {
233            sink.send(Message::text(subscribe_msg_str).to_frame_view())
234                .await?;
235
236            // Update subscriptions
237            self.subscribed_products
238                .write()
239                .extend(product_ids.iter().map(|&p| SmartString::from(p)));
240            self.subscribed_channels
241                .write()
242                .extend(channels.iter().map(|&c| SmartString::from(c)));
243
244            debug!("Subscribed to channels: {channels:?} for products: {product_ids:?}");
245        } else {
246            bail!("WebSocket connection not established");
247        }
248
249        Ok(())
250    }
251
252    /// Connect and start message processing
253    async fn connect_internal(&self, report_sender: Sender<ExecutionReport>) -> Result<()> {
254        // Store the report sender
255        *self.report_sender.write() = Some(report_sender.clone());
256
257        let config = WebSocketConfig::new(
258            rusty_common::types::Exchange::Coinbase,
259            self.get_ws_url().to_string(),
260        );
261
262        // Connect to WebSocket
263        let ws_url = config.url.parse()?;
264        let ws = yawc::WebSocket::connect(ws_url).await?;
265        let (ws_sink, ws_stream) = ws.split();
266        *self.ws_sink.write().await = Some(ws_sink);
267        *self.ws_stream.write().await = Some(ws_stream);
268
269        self.is_connected.store(true, Ordering::Relaxed);
270
271        // Subscribe to required channels
272        let product_ids: Vec<String> = {
273            self.subscribed_products
274                .read()
275                .iter()
276                .map(std::string::ToString::to_string)
277                .collect()
278        };
279
280        let product_id_refs: Vec<&str> = product_ids
281            .iter()
282            .map(std::string::String::as_str)
283            .collect();
284
285        self.subscribe(
286            vec!["heartbeat", "ticker", "matches", "user", "level2"],
287            product_id_refs,
288        )
289        .await?;
290
291        // Start heartbeat monitoring
292        self.start_heartbeat_monitor();
293
294        // Start message processing
295        self.start_message_processing();
296
297        info!("Coinbase WebSocket connected to {}", self.get_ws_url());
298        Ok(())
299    }
300
301    /// Start heartbeat monitoring task
302    fn start_heartbeat_monitor(&self) {
303        let last_heartbeat = self.last_heartbeat.clone();
304        let clock = self.clock.clone();
305        let is_connected = self.is_connected.clone();
306
307        let handle = tokio::spawn(async move {
308            let mut interval = interval(Duration::from_secs(HEARTBEAT_INTERVAL_SECONDS * 2));
309
310            loop {
311                interval.tick().await;
312
313                if !is_connected.load(Ordering::Relaxed) {
314                    break;
315                }
316
317                let last_hb = last_heartbeat.load(Ordering::Relaxed);
318                let now = clock.raw() / 1_000_000_000; // Convert to seconds
319
320                if now - last_hb > HEARTBEAT_INTERVAL_SECONDS * 3 {
321                    warn!("Heartbeat timeout detected, connection may be dead");
322                    is_connected.store(false, Ordering::Relaxed);
323                    break;
324                }
325            }
326
327            debug!("Heartbeat monitor task terminated");
328        });
329
330        *self.heartbeat_handle.write() = Some(handle);
331    }
332
333    /// Start processing incoming messages
334    fn start_message_processing(&self) {
335        let ws_stream = self.ws_stream.clone();
336        let last_heartbeat = self.last_heartbeat.clone();
337        let clock = self.clock.clone();
338        let is_connected = self.is_connected.clone();
339        let is_authenticated = self.is_authenticated.clone();
340        let orders = self.orders.clone();
341        let client_order_map = self.client_order_map.clone();
342        let sequence_tracker = self.sequence_tracker.clone();
343        let report_sender = self.report_sender.clone();
344        let order_books = self.order_books.clone();
345
346        let handle = tokio::spawn(async move {
347            loop {
348                if !is_connected.load(Ordering::Relaxed) {
349                    break;
350                }
351
352                if let Some(stream) = &mut *ws_stream.write().await {
353                    if let Some(frame) = stream.next().await {
354                        let msg = Message::from_frame_view(frame);
355                        match msg {
356                            Message::Text(text) => {
357                                if let Err(e) = Self::process_message(
358                                    &text,
359                                    &last_heartbeat,
360                                    &clock,
361                                    &is_authenticated,
362                                    &orders,
363                                    &client_order_map,
364                                    &sequence_tracker,
365                                    &report_sender,
366                                    &order_books,
367                                )
368                                .await
369                                {
370                                    error!("Failed to process message: {e}");
371                                }
372                            }
373                            Message::Binary(_) => {
374                                warn!("Received unexpected binary message");
375                            }
376                            _ => {}
377                        }
378                    } else {
379                        warn!("WebSocket stream closed");
380                        is_connected.store(false, Ordering::Relaxed);
381                        break;
382                    }
383                }
384            }
385
386            debug!("Message processing task terminated");
387        });
388
389        *self.message_handle.write() = Some(handle);
390    }
391
392    /// Process incoming WebSocket message
393    #[allow(clippy::too_many_arguments)]
394    async fn process_message(
395        text: &str,
396        last_heartbeat: &Arc<AtomicU64>,
397        clock: &Clock,
398        is_authenticated: &Arc<AtomicBool>,
399        orders: &Arc<RwLock<FxHashMap<SmartString, OrderInfo>>>,
400        client_order_map: &Arc<RwLock<FxHashMap<SmartString, SmartString>>>,
401        sequence_tracker: &Arc<RwLock<SequenceTracker>>,
402        report_sender: &Arc<RwLock<Option<Sender<ExecutionReport>>>>,
403        order_books: &Arc<RwLock<FxHashMap<SmartString, OrderBook>>>,
404    ) -> Result<()> {
405        let mut text_bytes = text.as_bytes().to_vec();
406        let msg = simd_json::from_slice::<JsonValue>(&mut text_bytes)?;
407
408        let msg_type = msg
409            .get("type")
410            .and_then(|v| v.as_str())
411            .map(std::string::ToString::to_string);
412
413        if let Some(msg_type) = msg_type {
414            match msg_type.as_str() {
415                "subscriptions" => {
416                    info!("Subscription confirmed: {text}");
417                    is_authenticated.store(true, Ordering::Relaxed);
418                }
419                "heartbeat" => {
420                    last_heartbeat.store(clock.raw() / 1_000_000_000, Ordering::Relaxed);
421
422                    if let Some(sequence) = msg
423                        .get("sequence")
424                        .and_then(simd_json::prelude::ValueAsScalar::as_u64)
425                        && let Some(product_id) = msg.get("product_id").and_then(|v| v.as_str())
426                    {
427                        sequence_tracker
428                            .write()
429                            .update_sequence(product_id, sequence);
430                    }
431                }
432                "ticker" => {
433                    // Process ticker updates
434                    Self::process_ticker(msg).await?;
435                }
436                "snapshot" => {
437                    // Process level2 snapshot
438                    Self::process_snapshot_static(msg, order_books).await?;
439                }
440                "l2update" => {
441                    // Process level2 update
442                    if let Some(sequence) = msg
443                        .get("sequence")
444                        .and_then(simd_json::prelude::ValueAsScalar::as_u64)
445                        && let Some(product_id) = msg.get("product_id").and_then(|v| v.as_str())
446                        && sequence_tracker
447                            .write()
448                            .update_sequence(product_id, sequence)
449                    {
450                        Self::process_l2_update_static(msg, order_books).await?;
451                    }
452                }
453                "match" | "last_match" => {
454                    // Process trade matches
455                    if let Some(sequence) = msg
456                        .get("sequence")
457                        .and_then(simd_json::prelude::ValueAsScalar::as_u64)
458                        && let Some(product_id) = msg.get("product_id").and_then(|v| v.as_str())
459                    {
460                        sequence_tracker
461                            .write()
462                            .update_sequence(product_id, sequence);
463                    }
464                    Self::process_match(msg).await?;
465                }
466                "received" | "open" | "done" | "change" => {
467                    // Process order updates from user channel
468                    Self::process_order_update(
469                        msg,
470                        &msg_type,
471                        orders,
472                        client_order_map,
473                        report_sender,
474                    )
475                    .await?;
476                }
477                "error" => {
478                    let message = msg
479                        .get("message")
480                        .and_then(|v| v.as_str())
481                        .unwrap_or("Unknown error");
482                    error!("WebSocket error: {message}");
483                }
484                _ => {
485                    debug!("Unhandled message type: {msg_type}");
486                }
487            }
488        }
489
490        Ok(())
491    }
492
    /// Handle a `ticker` message.
    ///
    /// Currently a stub: the message is only written to the debug log and the call
    /// always succeeds.
    async fn process_ticker(msg: JsonValue) -> Result<()> {
        // TODO: Implement ticker processing if needed
        debug!("Ticker update: {msg:?}");
        Ok(())
    }
499
500    /// Process level2 snapshot
501    async fn process_snapshot(&self, msg: JsonValue) -> Result<()> {
502        let order_book = Self::parse_level2_snapshot(&msg)?;
503        let product_id = order_book.symbol.clone();
504
505        // Store the order book
506        self.order_books
507            .write()
508            .insert(product_id.clone(), order_book);
509
510        debug!("Processed L2 snapshot for {product_id}");
511        Ok(())
512    }
513
514    /// Static version for `process_snapshot`
515    async fn process_snapshot_static(
516        msg: JsonValue,
517        order_books: &Arc<RwLock<FxHashMap<SmartString, OrderBook>>>,
518    ) -> Result<()> {
519        let order_book = Self::parse_level2_snapshot(&msg)?;
520        let product_id = order_book.symbol.clone();
521
522        // Store the order book
523        order_books.write().insert(product_id.clone(), order_book);
524
525        debug!("Processed L2 snapshot for {product_id}");
526        Ok(())
527    }
528
529    /// Process level2 update
530    async fn process_l2_update(&self, msg: JsonValue) -> Result<()> {
531        let (product_id, changes) = Self::parse_level2_update(&msg)?;
532
533        // Apply updates to the order book
534        if let Some(order_book) = self.order_books.write().get_mut(&product_id) {
535            Self::apply_level2_update(order_book, &changes);
536            debug!("Applied {} L2 updates to {}", changes.len(), product_id);
537        } else {
538            warn!("Received L2 update for unknown product: {product_id}");
539        }
540
541        Ok(())
542    }
543
544    /// Static version for `process_l2_update`
545    async fn process_l2_update_static(
546        msg: JsonValue,
547        order_books: &Arc<RwLock<FxHashMap<SmartString, OrderBook>>>,
548    ) -> Result<()> {
549        let (product_id, changes) = Self::parse_level2_update(&msg)?;
550
551        // Apply updates to the order book
552        if let Some(order_book) = order_books.write().get_mut(&product_id) {
553            Self::apply_level2_update(order_book, &changes);
554            debug!("Applied {} L2 updates to {}", changes.len(), product_id);
555        } else {
556            warn!("Received L2 update for unknown product: {product_id}");
557        }
558
559        Ok(())
560    }
561
    /// Handle a `match` / `last_match` message.
    ///
    /// Currently a stub: the message is only written to the debug log and the call
    /// always succeeds.
    async fn process_match(msg: JsonValue) -> Result<()> {
        // TODO: Implement match processing if needed
        debug!("Match: {msg:?}");
        Ok(())
    }
568
569    /// Process order update from user channel
570    async fn process_order_update(
571        msg: JsonValue,
572        msg_type: &str,
573        orders: &Arc<RwLock<FxHashMap<SmartString, OrderInfo>>>,
574        client_order_map: &Arc<RwLock<FxHashMap<SmartString, SmartString>>>,
575        report_sender: &Arc<RwLock<Option<Sender<ExecutionReport>>>>,
576    ) -> Result<()> {
577        let order_id = msg.get("order_id").and_then(|v| v.as_str()).unwrap_or("");
578        let client_oid = msg.get("client_oid").and_then(|v| v.as_str()).unwrap_or("");
579        let product_id = msg.get("product_id").and_then(|v| v.as_str()).unwrap_or("");
580
581        // Map client order ID to order ID
582        if !client_oid.is_empty() && !order_id.is_empty() {
583            client_order_map
584                .write()
585                .insert(SmartString::from(client_oid), SmartString::from(order_id));
586        }
587
588        let (exec_type, order_status) = match msg_type {
589            "received" => ("New", OrderStatus::New),
590            "open" => ("New", OrderStatus::New),
591            "done" => {
592                let reason = msg.get("reason").and_then(|v| v.as_str()).unwrap_or("");
593                match reason {
594                    "filled" => ("Trade", OrderStatus::Filled),
595                    "canceled" => ("Canceled", OrderStatus::Cancelled),
596                    _ => ("Rejected", OrderStatus::Rejected),
597                }
598            }
599            "match" => ("Trade", OrderStatus::PartiallyFilled),
600            "change" => ("Replaced", OrderStatus::New),
601            _ => {
602                log::warn!("Unknown Coinbase WebSocket message type: {msg_type}");
603                ("Unknown", OrderStatus::Unknown)
604            }
605        };
606
607        // Parse quantities and prices for ExecutionReport
608        let executed_quantity = if msg_type == "match" {
609            msg.get("size")
610                .and_then(|v| v.as_str())
611                .and_then(|s| Decimal::from_str_exact(s).ok())
612                .unwrap_or(Decimal::ZERO)
613        } else {
614            Decimal::ZERO
615        };
616
617        let remaining_quantity = msg
618            .get("remaining_size")
619            .and_then(|v| v.as_str())
620            .and_then(|s| Decimal::from_str_exact(s).ok())
621            .unwrap_or(Decimal::ZERO);
622
623        let total_quantity = msg
624            .get("size")
625            .and_then(|v| v.as_str())
626            .and_then(|s| Decimal::from_str_exact(s).ok())
627            .unwrap_or(Decimal::ZERO);
628
629        let execution_price = if msg_type == "match" {
630            msg.get("price")
631                .and_then(|v| v.as_str())
632                .and_then(|s| Decimal::from_str_exact(s).ok())
633        } else {
634            None
635        };
636
637        let reject_reason = if msg_type == "done" {
638            let reason = msg.get("reason").and_then(|v| v.as_str()).unwrap_or("");
639            if reason == "canceled" || reason == "rejected" {
640                Some(reason.into())
641            } else {
642                None
643            }
644        } else {
645            None
646        };
647
648        // Create execution report with valid fields only
649        let report = ExecutionReport {
650            id: id_generation::generate_multi_part_id(
651                &["coinbase", client_oid, &uuid::Uuid::new_v4().to_string()],
652                '_',
653            ),
654            order_id: order_id.into(),
655            exchange_timestamp: time::get_epoch_timestamp_ns(),
656            system_timestamp: time::get_epoch_timestamp_ns(),
657            instrument_id: InstrumentId::new(product_id, Venue::Coinbase),
658            status: order_status,
659            filled_quantity: executed_quantity,
660            remaining_quantity,
661            execution_price,
662            reject_reason,
663            exchange_execution_id: Some(order_id.into()),
664            is_final: matches!(
665                order_status,
666                OrderStatus::Filled | OrderStatus::Cancelled | OrderStatus::Rejected
667            ),
668        };
669
670        // Send execution report
671        if let Some(sender) = &*report_sender.read()
672            && let Err(e) = sender.send(report)
673        {
674            error!("Failed to send execution report: {e}");
675        }
676
677        Ok(())
678    }
679
    /// Placeholder for REST-based order placement.
    ///
    /// Not implemented: `order` is ignored and the call always fails.
    ///
    /// # Errors
    /// Always returns an error stating that order placement must go through the
    /// REST API.
    async fn place_order_rest(&self, order: &Order) -> Result<SmartString> {
        // Coinbase WebSocket doesn't support order placement
        // Orders must be placed via REST API
        // This is a placeholder - implement REST API call
        bail!("Order placement via WebSocket not supported - use REST API")
    }
687
    /// Placeholder for REST-based order cancellation.
    ///
    /// Not implemented: `order_id` is ignored and the call always fails.
    ///
    /// # Errors
    /// Always returns an error stating that cancellation must go through the REST API.
    async fn cancel_order_rest(&self, order_id: &str) -> Result<()> {
        // Coinbase WebSocket doesn't support order cancellation
        // Orders must be cancelled via REST API
        bail!("Order cancellation via WebSocket not supported - use REST API")
    }
694}
695
696// Public parsing functions for order book processing
697impl CoinbaseWebsocketTrading {
698    /// Parse level2 snapshot message into `OrderBook`
699    pub fn parse_level2_snapshot(msg: &JsonValue) -> Result<OrderBook> {
700        let product_id = msg
701            .get("product_id")
702            .and_then(|v| v.as_str())
703            .ok_or_else(|| anyhow::anyhow!("Missing product_id in snapshot"))?;
704
705        let bids = msg
706            .get("bids")
707            .and_then(|v| v.as_array())
708            .ok_or_else(|| anyhow::anyhow!("Missing bids in snapshot"))?;
709
710        let asks = msg
711            .get("asks")
712            .and_then(|v| v.as_array())
713            .ok_or_else(|| anyhow::anyhow!("Missing asks in snapshot"))?;
714
715        // Parse bids
716        let mut bid_levels = SmallVec::with_capacity(bids.len());
717        for bid in bids {
718            if let Some(arr) = bid.as_array()
719                && arr.len() >= 2
720            {
721                let price = arr[0]
722                    .as_str()
723                    .and_then(|s| s.parse::<Decimal>().ok())
724                    .ok_or_else(|| anyhow::anyhow!("Invalid bid price"))?;
725                let quantity = arr[1]
726                    .as_str()
727                    .and_then(|s| s.parse::<Decimal>().ok())
728                    .ok_or_else(|| anyhow::anyhow!("Invalid bid quantity"))?;
729                bid_levels.push(PriceLevel::new(price, quantity));
730            }
731        }
732
733        // Parse asks
734        let mut ask_levels = SmallVec::with_capacity(asks.len());
735        for ask in asks {
736            if let Some(arr) = ask.as_array()
737                && arr.len() >= 2
738            {
739                let price = arr[0]
740                    .as_str()
741                    .and_then(|s| s.parse::<Decimal>().ok())
742                    .ok_or_else(|| anyhow::anyhow!("Invalid ask price"))?;
743                let quantity = arr[1]
744                    .as_str()
745                    .and_then(|s| s.parse::<Decimal>().ok())
746                    .ok_or_else(|| anyhow::anyhow!("Invalid ask quantity"))?;
747                ask_levels.push(PriceLevel::new(price, quantity));
748            }
749        }
750
751        // Sort bids descending and asks ascending
752        bid_levels.sort_by(|a: &PriceLevel, b: &PriceLevel| b.price.cmp(&a.price));
753        ask_levels.sort_by(|a: &PriceLevel, b: &PriceLevel| a.price.cmp(&b.price));
754
755        Ok(OrderBook::new(
756            product_id,
757            time::get_epoch_timestamp_ns(),
758            time::get_epoch_timestamp_ns(),
759            bid_levels,
760            ask_levels,
761        ))
762    }
763
764    /// Parse level2 update message
765    pub fn parse_level2_update(msg: &JsonValue) -> Level2UpdateResult {
766        let product_id = msg
767            .get("product_id")
768            .and_then(|v| v.as_str())
769            .ok_or_else(|| anyhow::anyhow!("Missing product_id in update"))?;
770
771        let changes = msg
772            .get("changes")
773            .and_then(|v| v.as_array())
774            .ok_or_else(|| anyhow::anyhow!("Missing changes in update"))?;
775
776        let mut parsed_changes = Vec::with_capacity(changes.len());
777
778        for change in changes {
779            if let Some(arr) = change.as_array()
780                && arr.len() >= 3
781            {
782                let side = arr[0]
783                    .as_str()
784                    .ok_or_else(|| anyhow::anyhow!("Invalid side in change"))?;
785                let price = arr[1]
786                    .as_str()
787                    .and_then(|s| s.parse::<Decimal>().ok())
788                    .ok_or_else(|| anyhow::anyhow!("Invalid price in change"))?;
789                let quantity = arr[2]
790                    .as_str()
791                    .and_then(|s| s.parse::<Decimal>().ok())
792                    .ok_or_else(|| anyhow::anyhow!("Invalid quantity in change"))?;
793
794                parsed_changes.push((side.into(), price, quantity));
795            }
796        }
797
798        Ok((product_id.into(), parsed_changes))
799    }
800
801    /// Apply level2 updates to an order book
802    pub fn apply_level2_update(
803        order_book: &mut OrderBook,
804        changes: &[(SmartString, Decimal, Decimal)],
805    ) {
806        for (side, price, quantity) in changes {
807            match side.as_str() {
808                "buy" => {
809                    // Update bid side
810                    if quantity.is_zero() {
811                        // Remove level
812                        order_book.bids.retain(|level| level.price != *price);
813                    } else {
814                        // Add or update level
815                        if let Some(level) = order_book.bids.iter_mut().find(|l| l.price == *price)
816                        {
817                            level.quantity = *quantity;
818                        } else {
819                            // Insert new level and re-sort
820                            order_book.bids.push(PriceLevel::new(*price, *quantity));
821                            order_book.bids.sort_by(|a, b| b.price.cmp(&a.price));
822                        }
823                    }
824                }
825                "sell" => {
826                    // Update ask side
827                    if quantity.is_zero() {
828                        // Remove level
829                        order_book.asks.retain(|level| level.price != *price);
830                    } else {
831                        // Add or update level
832                        if let Some(level) = order_book.asks.iter_mut().find(|l| l.price == *price)
833                        {
834                            level.quantity = *quantity;
835                        } else {
836                            // Insert new level and re-sort
837                            order_book.asks.push(PriceLevel::new(*price, *quantity));
838                            order_book.asks.sort_by(|a, b| a.price.cmp(&b.price));
839                        }
840                    }
841                }
842                _ => {
843                    warn!("Unknown side in L2 update: {side}");
844                }
845            }
846        }
847    }
848}
849
850#[async_trait]
851impl Exchange for CoinbaseWebsocketTrading {
852    fn venue(&self) -> Venue {
853        Venue::Coinbase
854    }
855
856    async fn place_order(
857        &self,
858        order: Order,
859        report_sender: Sender<ExecutionReport>,
860    ) -> Result<()> {
861        // Coinbase WebSocket doesn't support order placement
862        // Must use REST API
863        bail!("Order placement not supported via WebSocket - use REST API")
864    }
865
866    async fn cancel_order(
867        &self,
868        order_id: SmartString,
869        report_sender: Sender<ExecutionReport>,
870    ) -> Result<()> {
871        // Coinbase WebSocket doesn't support order cancellation
872        // Must use REST API
873        bail!("Order cancellation not supported via WebSocket - use REST API")
874    }
875
876    async fn modify_order(
877        &self,
878        order_id: SmartString,
879        new_price: Option<Decimal>,
880        new_quantity: Option<Decimal>,
881        report_sender: Sender<ExecutionReport>,
882    ) -> Result<()> {
883        // Coinbase WebSocket doesn't support order modification
884        bail!("Order modification not supported via WebSocket - use REST API")
885    }
886
887    async fn cancel_all_orders(
888        &self,
889        instrument_id: Option<InstrumentId>,
890        report_sender: Sender<ExecutionReport>,
891    ) -> Result<()> {
892        // Coinbase WebSocket doesn't support mass cancellation
893        bail!("Mass order cancellation not supported via WebSocket - use REST API")
894    }
895
896    async fn get_order_status(&self, order_id: &str) -> Result<OrderStatus> {
897        // Check local order tracking
898        if let Some(order_info) = self.orders.read().get(order_id) {
899            Ok(order_info.status)
900        } else {
901            bail!("Order not found in local tracking")
902        }
903    }
904
905    async fn connect(&self, report_sender: Sender<ExecutionReport>) -> Result<()> {
906        // Use the proper report_sender channel instead of creating a dummy one
907        self.connect_internal(report_sender).await
908    }
909
910    async fn disconnect(&self) -> Result<()> {
911        self.is_connected.store(false, Ordering::Relaxed);
912        self.is_authenticated.store(false, Ordering::Relaxed);
913
914        // Close WebSocket connection
915        if let Some(sink) = &mut *self.ws_sink.write().await {
916            let _ = sink.close().await;
917        }
918        *self.ws_sink.write().await = None;
919        *self.ws_stream.write().await = None;
920
921        // Cancel background tasks
922        if let Some(handle) = self.heartbeat_handle.write().take() {
923            handle.abort();
924        }
925
926        if let Some(handle) = self.message_handle.write().take() {
927            handle.abort();
928        }
929
930        // Clear tracking data
931        self.orders.write().clear();
932        self.client_order_map.write().clear();
933        self.sequence_tracker.write().sequences.clear();
934
935        info!("Coinbase WebSocket disconnected");
936        Ok(())
937    }
938
939    async fn is_connected(&self) -> bool {
940        self.is_connected.load(Ordering::Relaxed) && self.is_authenticated.load(Ordering::Relaxed)
941    }
942
943    async fn get_instruments(&self) -> Result<SmallVec<[InstrumentId; 32]>> {
944        // WebSocket doesn't provide instrument query functionality
945        // You would need to use REST API or maintain a cached list
946        bail!("Instrument query not supported via WebSocket")
947    }
948
949    async fn send_fix_message(&self, message: Vec<u8>) -> Result<()> {
950        // Coinbase WebSocket doesn't support FIX protocol
951        bail!("FIX protocol not supported on Coinbase WebSocket")
952    }
953
954    async fn receive_fix_message(&self) -> Result<Vec<u8>> {
955        // Coinbase WebSocket doesn't support FIX protocol
956        bail!("FIX protocol not supported on Coinbase WebSocket")
957    }
958}