rusty_ems/exchanges/
bithumb_websocket_trading.rs

1//! Bithumb WebSocket Trading Implementation
2//!
3//! This module provides a high-performance WebSocket trading client for Bithumb's private endpoints
4//! with JWT authentication, real-time order updates, and asset tracking.
5//!
6//! # Features
7//!
8//! - **JWT authentication**: Support for Bithumb's JWT-based WebSocket authentication
9//! - **Private channels**: Real-time myOrder and myAsset data streams
10//! - **Order management**: Integration with REST API for order placement
11//! - **Asset tracking**: Real-time balance and position updates
12//! - **Connection health monitoring**: Real-time metrics and health status
13//! - **Automatic reconnection**: Configurable backoff strategy with exponential delays
14//! - **Zero-copy message processing**: Optimized for low-latency trading
15//!
16//! # Connection Management
17//!
18//! The client maintains connection health through:
19//! - WebSocket ping/pong frames sent every 2 minutes
20//! - Automatic reconnection on connection loss
21//! - JWT token refresh before expiration
22//! - Configurable timeout handling
23//!
24//! # Private Channels
25//!
26//! Supports Bithumb's private WebSocket channels:
27//! - `myOrder`: Real-time order updates (creation/execution/cancellation)
28//! - `myAsset`: Real-time asset/balance updates
29//! - Event-driven updates only (no snapshot data)
30//!
31//! # Usage Example
32//!
33//! ```rust,no_run
34//! use rusty_common::auth::exchanges::bithumb::BithumbAuth;
35//! use rusty_ems::exchanges::bithumb_websocket_trading::BithumbWebSocketTrader;
36//! use std::sync::Arc;
37//!
38//! #[tokio::main]
39//! async fn main() -> Result<()> {
40//!     let auth = Arc::new(BithumbAuth::new("access_key".into(), "secret_key".into()));
41//!
42//!     let trader = BithumbWebSocketTrader::new(auth);
43//!     let (report_tx, report_rx) = flume::bounded(100);
44//!
45//!     // Connect and authenticate
46//!     trader.connect(report_tx).await?;
47//!
48//!     Ok(())
49//! }
50//! ```
51
52use anyhow::{Result, bail};
53use async_trait::async_trait;
54use flume::Sender;
55use futures::{SinkExt, StreamExt};
56use log::{debug, error, info, warn};
57use parking_lot::RwLock;
58use quanta::Clock;
59use rust_decimal::Decimal;
60use rusty_common::auth::exchanges::bithumb::{BithumbAuth, header_keys};
61use rusty_common::types::Exchange;
62use rusty_common::utils::id_generation;
63use rusty_common::websocket::{
64    Message, WebSocketConfig,
65    client::ConnectionState as WsConnectionState,
66    connector::{WebSocketConnector, WebSocketSink, WebSocketStream},
67    stats::new_shared_stats,
68};
69use rusty_model::{
70    enums::OrderStatus, instruments::InstrumentId, trading_order::Order, venues::Venue,
71};
72
73use super::bithumb_errors::map_websocket_order_state;
74#[cfg(test)]
75use super::bithumb_errors::map_websocket_order_status;
76use simd_json;
77use simd_json::prelude::{ValueAsArray, ValueAsScalar, ValueObjectAccess};
78use smartstring::alias::String as SmartString;
79use std::sync::Arc;
80use std::sync::atomic::{AtomicU8, AtomicU64, Ordering};
81use std::time::Duration;
82use tokio::sync::RwLock as AsyncRwLock;
83use tokio::task::JoinHandle;
84
85use crate::execution_engine::ExecutionReport;
86
87/// Bithumb private WebSocket API URL
88const BITHUMB_PRIVATE_WS_URL: &str = "wss://ws-api.bithumb.com/websocket/v1/private";
89
90/// Ping interval for keeping WebSocket connection alive (2 minutes)
91const PING_INTERVAL_SECONDS: u64 = 120;
92
93/// Pong timeout (10 seconds - if no pong received within this time, consider connection dead)
94const PONG_TIMEOUT_SECONDS: u64 = 10;
95
96/// Maximum message size allowed (10MB)
97const MAX_MESSAGE_SIZE: usize = 10 * 1024 * 1024;
98
99/// Bithumb-specific connection state for proper state machine tracking
100#[derive(Debug, Clone, Copy, PartialEq, Eq)]
101#[repr(u8)]
102pub enum BithumbConnectionState {
103    /// WebSocket is not connected
104    Disconnected = 0,
105    /// WebSocket is attempting to connect
106    Connecting = 1,
107    /// WebSocket is connected but not authenticated
108    Connected = 2,
109    /// WebSocket is attempting to authenticate
110    Authenticating = 3,
111    /// WebSocket is connected and authenticated
112    Authenticated = 4,
113    /// WebSocket is disconnecting
114    Disconnecting = 5,
115}
116
117impl From<u8> for BithumbConnectionState {
118    fn from(value: u8) -> Self {
119        match value {
120            0 => Self::Disconnected,
121            1 => Self::Connecting,
122            2 => Self::Connected,
123            3 => Self::Authenticating,
124            4 => Self::Authenticated,
125            5 => Self::Disconnecting,
126            _ => Self::Disconnected,
127        }
128    }
129}
130
131/// Connection health information for Bithumb WebSocket
132#[derive(Debug, Clone)]
133pub struct BithumbConnectionHealth {
134    /// Current connection state
135    pub state: BithumbConnectionState,
136    /// Whether the connection is currently active
137    pub is_connected: bool,
138    /// Whether the connection is authenticated
139    pub is_authenticated: bool,
140    /// Time elapsed since the last ping was sent
141    pub time_since_last_ping: Option<Duration>,
142    /// Time elapsed since the last pong was received
143    pub time_since_last_pong: Option<Duration>,
144    /// Time elapsed since authentication was completed
145    pub time_since_auth: Option<Duration>,
146    /// Total number of messages sent
147    pub messages_sent: u64,
148    /// Total number of messages received
149    pub messages_received: u64,
150    /// Number of reconnection attempts made
151    pub reconnection_attempts: u64,
152}
153
154/// Connection metrics for monitoring
155#[derive(Debug, Default)]
156struct ConnectionMetrics {
157    pings_sent: AtomicU64,
158    pongs_received: AtomicU64,
159    messages_sent: AtomicU64,
160    messages_received: AtomicU64,
161    reconnection_attempts: AtomicU64,
162}
163
164/// Task handles for lifecycle management
165#[derive(Debug, Default)]
166struct TaskHandles {
167    ping_handle: Option<JoinHandle<()>>,
168    message_handler: Option<JoinHandle<()>>,
169}
170
171/// Bithumb WebSocket Trading Client
172///
173/// Provides real-time market data and private account updates via WebSocket.
174/// Trading operations are handled through the REST API integration.
175pub struct BithumbWebSocketTrader {
176    /// Authentication handler for Bithumb API
177    auth: Arc<BithumbAuth>,
178
179    /// WebSocket connection for private data
180    ws_sink: Arc<AsyncRwLock<Option<WebSocketSink>>>,
181    ws_stream: Arc<AsyncRwLock<Option<WebSocketStream>>>,
182
183    /// Connection state tracking
184    state: Arc<AtomicU8>,
185
186    /// High-precision clock for timestamp generation
187    clock: Clock,
188
189    /// Last ping sent timestamp (nanoseconds, monotonic)
190    last_ping_time: Arc<AtomicU64>,
191
192    /// Last pong received timestamp (nanoseconds, monotonic)
193    last_pong_time: Arc<AtomicU64>,
194
195    /// Authentication completed timestamp (nanoseconds, monotonic)
196    auth_completed_time: Arc<AtomicU64>,
197
198    /// Task handles for lifecycle management
199    task_handles: Arc<AsyncRwLock<TaskHandles>>,
200
201    /// Connection metrics
202    metrics: Arc<ConnectionMetrics>,
203
204    /// Connection statistics
205    messages_received: Arc<AtomicU64>,
206    messages_sent: Arc<AtomicU64>,
207    reconnect_count: Arc<AtomicU64>,
208}
209
210impl Clone for BithumbWebSocketTrader {
211    fn clone(&self) -> Self {
212        Self {
213            auth: self.auth.clone(),
214            ws_sink: self.ws_sink.clone(),
215            ws_stream: self.ws_stream.clone(),
216            state: self.state.clone(),
217            clock: self.clock.clone(),
218            last_ping_time: self.last_ping_time.clone(),
219            last_pong_time: self.last_pong_time.clone(),
220            auth_completed_time: self.auth_completed_time.clone(),
221            task_handles: self.task_handles.clone(),
222            metrics: self.metrics.clone(),
223            messages_received: self.messages_received.clone(),
224            messages_sent: self.messages_sent.clone(),
225            reconnect_count: self.reconnect_count.clone(),
226        }
227    }
228}
229
230impl BithumbWebSocketTrader {
231    /// Create a `SmartString` ID with the given prefix and a UUID suffix
232    ///
233    /// This helper reduces boilerplate when creating unique identifiers for
234    /// orders, tickets, and other entities that need UUID-based IDs.
235    ///
236    /// # Examples
237    ///
238    /// ```rust,no_run
239    /// let order_id = Self::create_smart_id("bithumb_order");
240    /// let ticket_id = Self::create_smart_id("bithumb");
241    /// ```
242    /// Create a new Bithumb WebSocket trader instance
243    #[must_use]
244    pub fn new(auth: Arc<BithumbAuth>) -> Self {
245        Self {
246            auth,
247            ws_sink: Arc::new(AsyncRwLock::new(None)),
248            ws_stream: Arc::new(AsyncRwLock::new(None)),
249            state: Arc::new(AtomicU8::new(BithumbConnectionState::Disconnected as u8)),
250            clock: Clock::new(),
251            last_ping_time: Arc::new(AtomicU64::new(0)),
252            last_pong_time: Arc::new(AtomicU64::new(0)),
253            auth_completed_time: Arc::new(AtomicU64::new(0)),
254            task_handles: Arc::new(AsyncRwLock::new(TaskHandles::default())),
255            metrics: Arc::new(ConnectionMetrics::default()),
256            messages_received: Arc::new(AtomicU64::new(0)),
257            messages_sent: Arc::new(AtomicU64::new(0)),
258            reconnect_count: Arc::new(AtomicU64::new(0)),
259        }
260    }
261
262    /// Get current connection state
263    #[must_use]
264    pub fn connection_state(&self) -> BithumbConnectionState {
265        BithumbConnectionState::from(self.state.load(Ordering::Acquire))
266    }
267
268    /// Set connection state
269    fn set_state(&self, new_state: BithumbConnectionState) {
270        let old_state = self.connection_state();
271        self.state.store(new_state as u8, Ordering::Release);
272
273        if old_state != new_state {
274            debug!("Bithumb connection state transition: {old_state:?} -> {new_state:?}");
275        }
276    }
277
278    /// Check if currently connected and authenticated
279    #[must_use]
280    pub fn is_connected(&self) -> bool {
281        matches!(
282            self.connection_state(),
283            BithumbConnectionState::Connected | BithumbConnectionState::Authenticated
284        )
285    }
286
287    /// Check if WebSocket is authenticated
288    #[must_use]
289    pub fn is_authenticated(&self) -> bool {
290        matches!(
291            self.connection_state(),
292            BithumbConnectionState::Authenticated
293        )
294    }
295
296    /// Get connection statistics
297    #[must_use]
298    pub fn get_stats(&self) -> (u64, u64, u64) {
299        (
300            self.messages_received.load(Ordering::Relaxed),
301            self.messages_sent.load(Ordering::Relaxed),
302            self.reconnect_count.load(Ordering::Relaxed),
303        )
304    }
305
306    /// Get detailed connection health information
307    #[must_use]
308    pub fn get_connection_health(&self) -> BithumbConnectionHealth {
309        let now = self.clock.raw();
310        let last_ping = self.last_ping_time.load(Ordering::Acquire);
311        let last_pong = self.last_pong_time.load(Ordering::Acquire);
312        let auth_time = self.auth_completed_time.load(Ordering::Acquire);
313
314        let time_since_last_ping = if last_ping > 0 {
315            Some(Duration::from_nanos(now.saturating_sub(last_ping)))
316        } else {
317            None
318        };
319
320        let time_since_last_pong = if last_pong > 0 {
321            Some(Duration::from_nanos(now.saturating_sub(last_pong)))
322        } else {
323            None
324        };
325
326        let time_since_auth = if auth_time > 0 {
327            Some(Duration::from_nanos(now.saturating_sub(auth_time)))
328        } else {
329            None
330        };
331
332        BithumbConnectionHealth {
333            state: self.connection_state(),
334            is_connected: self.is_connected(),
335            is_authenticated: self.is_authenticated(),
336            time_since_last_ping,
337            time_since_last_pong,
338            time_since_auth,
339            messages_sent: self.metrics.messages_sent.load(Ordering::Relaxed),
340            messages_received: self.metrics.messages_received.load(Ordering::Relaxed),
341            reconnection_attempts: self.metrics.reconnection_attempts.load(Ordering::Relaxed),
342        }
343    }
344
345    /// Connect to Bithumb private WebSocket endpoint
346    pub async fn connect(&self, report_tx: Sender<ExecutionReport>) -> Result<()> {
347        info!("Connecting to Bithumb private WebSocket: {BITHUMB_PRIVATE_WS_URL}");
348
349        // Set connecting state
350        self.set_state(BithumbConnectionState::Connecting);
351
352        // Generate JWT token for WebSocket headers
353        // Use generate_headers to get JWT token
354        let headers = self
355            .auth
356            .generate_headers("GET", "/websocket/v1/private", None)?;
357        let auth_header = headers
358            .get(&header_keys::authorization())
359            .ok_or_else(|| anyhow::anyhow!("No authorization header generated"))?
360            .clone();
361
362        // Create WebSocket configuration with JWT in headers
363        let config =
364            WebSocketConfig::builder(Exchange::Bithumb, BITHUMB_PRIVATE_WS_URL.to_string())
365                .connect_timeout(Duration::from_secs(10))
366                .timeout(Duration::from_secs(30))
367                .ping_interval(Duration::from_secs(PING_INTERVAL_SECONDS))
368                .pong_timeout(Duration::from_secs(PONG_TIMEOUT_SECONDS))
369                .max_frame_size(65536) // 64KB
370                .max_message_size(MAX_MESSAGE_SIZE)
371                .header(
372                    header_keys::AUTHORIZATION.to_string(),
373                    auth_header.to_string(),
374                )
375                .build();
376
377        let stats = new_shared_stats();
378        let connection_state = Arc::new(RwLock::new(WsConnectionState::Disconnected));
379        let mut connector =
380            WebSocketConnector::new(config, stats.clone(), connection_state.clone());
381
382        let (sink, stream) = connector.connect_with_retry(BITHUMB_PRIVATE_WS_URL).await?;
383
384        // Store connection references
385        *self.ws_sink.write().await = Some(sink);
386        *self.ws_stream.write().await = Some(stream);
387
388        // Update state
389        self.set_state(BithumbConnectionState::Connected);
390
391        // Subscribe to private channels (no authentication message needed)
392        self.subscribe_to_private_channels().await?;
393
394        // Mark as authenticated since JWT was in connection headers
395        self.set_state(BithumbConnectionState::Authenticated);
396        self.auth_completed_time
397            .store(self.clock.raw(), Ordering::Release);
398
399        // Start background tasks
400        self.start_ping_task().await;
401        self.start_message_task(report_tx).await;
402
403        info!("Successfully connected and authenticated to Bithumb private WebSocket");
404        Ok(())
405    }
406
407    /// Authenticate and subscribe to private channels
408    // Removed authenticate_and_subscribe method as authentication is handled via headers
409    /// Subscribe to Bithumb private channels (myOrder and myAsset)
410    async fn subscribe_to_private_channels(&self) -> Result<()> {
411        debug!("Subscribing to Bithumb private channels");
412
413        // Generate unique ticket ID for this session
414        let ticket_id = id_generation::generate_report_id_with_uuid("bithumb");
415
416        // Subscribe to myOrder channel for order updates (array format)
417        let order_subscription = simd_json::json!([
418            {
419                "ticket": ticket_id.clone()
420            },
421            {
422                "type": "myOrder",
423                "codes": []  // Empty array for all markets
424            },
425            {
426                "format": "DEFAULT"  // Use full field names
427            }
428        ]);
429
430        // Subscribe to myAsset channel for balance updates (array format)
431        let asset_subscription = simd_json::json!([
432            {
433                "ticket": ticket_id
434            },
435            {
436                "type": "myAsset"
437            },
438            {
439                "format": "DEFAULT"
440            }
441        ]);
442
443        // Send order subscription
444        let order_msg: SmartString = simd_json::to_string(&order_subscription)?.into();
445        self.send_message(Message::Text(order_msg)).await?;
446
447        // Send asset subscription
448        let asset_msg: SmartString = simd_json::to_string(&asset_subscription)?.into();
449        self.send_message(Message::Text(asset_msg)).await?;
450
451        debug!("Bithumb private channels subscription completed");
452        Ok(())
453    }
454
455    /// Send a message through the WebSocket connection
456    async fn send_message(&self, message: Message) -> Result<()> {
457        if let Some(sink) = self.ws_sink.write().await.as_mut() {
458            let frame = message.to_frame_view();
459            sink.send(frame).await?;
460            self.metrics.messages_sent.fetch_add(1, Ordering::Relaxed);
461            Ok(())
462        } else {
463            bail!("WebSocket sink not available")
464        }
465    }
466
467    /// Start ping task to keep connection alive
468    async fn start_ping_task(&self) {
469        let sink = self.ws_sink.clone();
470        let last_ping = self.last_ping_time.clone();
471        let clock = self.clock.clone();
472        let metrics = self.metrics.clone();
473
474        let task = tokio::spawn(async move {
475            let mut interval = tokio::time::interval(Duration::from_secs(PING_INTERVAL_SECONDS));
476
477            loop {
478                interval.tick().await;
479
480                // Send ping frame
481                if let Some(sink) = sink.write().await.as_mut() {
482                    let ping_message = Message::Ping(Vec::new());
483                    let frame = ping_message.to_frame_view();
484                    if let Err(e) = sink.send(frame).await {
485                        error!("Failed to send ping: {e}");
486                        break;
487                    }
488
489                    last_ping.store(clock.raw(), Ordering::Release);
490                    metrics.pings_sent.fetch_add(1, Ordering::Relaxed);
491                } else {
492                    break;
493                }
494            }
495        });
496
497        self.task_handles.write().await.ping_handle = Some(task);
498    }
499
500    /// Start message processing task
501    async fn start_message_task(&self, report_tx: Sender<ExecutionReport>) {
502        let stream = self.ws_stream.clone();
503        let last_pong_time = self.last_pong_time.clone();
504        let clock = self.clock.clone();
505        let metrics = self.metrics.clone();
506
507        let task = tokio::spawn(async move {
508            loop {
509                // Take a lock scope to get the stream
510                let mut stream_guard = stream.write().await;
511                if let Some(ws_stream) = stream_guard.as_mut() {
512                    if let Some(frame) = ws_stream.next().await {
513                        drop(stream_guard); // Release the lock before processing
514
515                        let message = Message::from_frame_view(frame);
516                        metrics.messages_received.fetch_add(1, Ordering::Relaxed);
517
518                        match message {
519                            Message::Text(text) => {
520                                debug!("Received text message: {text}");
521                                if let Err(e) =
522                                    Self::process_message(&text, &report_tx, &clock).await
523                                {
524                                    error!("Failed to process message: {e}");
525                                }
526                            }
527                            Message::Pong(_) => {
528                                last_pong_time.store(clock.raw(), Ordering::Release);
529                                metrics.pongs_received.fetch_add(1, Ordering::Relaxed);
530                            }
531                            Message::Close(_) => {
532                                warn!("Received close message from Bithumb WebSocket");
533                                break;
534                            }
535                            _ => {
536                                debug!("Received other message type: {message:?}");
537                            }
538                        }
539                    } else {
540                        break; // Stream ended
541                    }
542                } else {
543                    break; // No stream available
544                }
545            }
546        });
547
548        self.task_handles.write().await.message_handler = Some(task);
549    }
550
551    /// Process incoming WebSocket messages
552    async fn process_message(
553        text: &str,
554        report_tx: &Sender<ExecutionReport>,
555        clock: &Clock,
556    ) -> Result<()> {
557        // Parse JSON message using simd_json for high performance
558        // Use mutable bytes for in-place optimizations while maintaining safety
559        let mut json_bytes = text.as_bytes().to_vec();
560        let json: simd_json::OwnedValue = simd_json::from_slice(&mut json_bytes)?;
561
562        // Determine message type
563        if let Some(msg_type) = json.get("type").and_then(|v| v.as_str()) {
564            match msg_type {
565                "myOrder" => {
566                    Self::handle_order_update(&json, report_tx, clock).await?;
567                }
568                "myAsset" => {
569                    Self::handle_asset_update(&json).await?;
570                }
571                "auth" => {
572                    debug!("Authentication response: {json:?}");
573                }
574                "subscribe" => {
575                    debug!("Subscription response: {json:?}");
576                }
577                _ => {
578                    debug!("Unknown message type: {msg_type}");
579                }
580            }
581        } else {
582            debug!("Message without type field: {text}");
583        }
584
585        Ok(())
586    }
587
588    /// Handle myOrder updates
589    async fn handle_order_update(
590        json: &simd_json::OwnedValue,
591        report_tx: &Sender<ExecutionReport>,
592        clock: &Clock,
593    ) -> Result<()> {
594        debug!("Processing Bithumb order update: {json:?}");
595
596        // Use correct field names from Bithumb documentation
597        let order_id = json
598            .get("uuid")
599            .and_then(|v| v.as_str())
600            .unwrap_or("unknown")
601            .into();
602
603        let symbol = json
604            .get("code")
605            .and_then(|v| v.as_str())
606            .unwrap_or("UNKNOWN");
607
608        let state_str = json
609            .get("state")
610            .and_then(|v| v.as_str())
611            .unwrap_or("unknown");
612
613        let status = Self::map_bithumb_order_state(state_str);
614
615        let executed_volume = json
616            .get("executed_volume")
617            .and_then(simd_json::prelude::ValueAsScalar::as_f64)
618            .and_then(|f| Decimal::try_from(f).ok())
619            .unwrap_or(Decimal::ZERO);
620
621        let remaining_volume = json
622            .get("remaining_volume")
623            .and_then(simd_json::prelude::ValueAsScalar::as_f64)
624            .and_then(|f| Decimal::try_from(f).ok())
625            .unwrap_or(Decimal::ZERO);
626
627        let execution_price = json
628            .get("price")
629            .and_then(simd_json::prelude::ValueAsScalar::as_f64)
630            .and_then(|f| Decimal::try_from(f).ok());
631
632        let timestamp = json
633            .get("timestamp")
634            .and_then(simd_json::prelude::ValueAsScalar::as_u64)
635            .unwrap_or(0)
636            * 1_000_000; // Convert milliseconds to nanoseconds
637
638        let report = ExecutionReport {
639            id: id_generation::generate_exchange_order_id("bithumb"),
640            order_id,
641            exchange_timestamp: timestamp,
642            system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
643            instrument_id: InstrumentId::new(symbol, Venue::Bithumb),
644            status,
645            filled_quantity: executed_volume,
646            remaining_quantity: remaining_volume,
647            execution_price,
648            reject_reason: None,
649            exchange_execution_id: json
650                .get("trade_uuid")
651                .and_then(|v| v.as_str())
652                .map(std::convert::Into::into),
653            is_final: matches!(
654                status,
655                OrderStatus::Filled | OrderStatus::Cancelled | OrderStatus::Rejected
656            ),
657        };
658
659        if let Err(e) = report_tx.try_send(report) {
660            error!("Failed to send Bithumb order update report: {e}");
661        }
662
663        Ok(())
664    }
665
666    /// Handle myAsset updates
667    async fn handle_asset_update(json: &simd_json::OwnedValue) -> Result<()> {
668        debug!("Processing Bithumb asset update: {json:?}");
669
670        // Use correct field names from Bithumb documentation
671        // assets is directly at the root level, not under "data"
672        if let Some(assets) = json.get("assets").and_then(|v| v.as_array()) {
673            for (index, asset) in assets.iter().enumerate() {
674                match (
675                    asset.get("currency").and_then(|v| v.as_str()),
676                    asset.get("balance").and_then(|v| v.as_str()), // String in response
677                    asset.get("locked").and_then(|v| v.as_str()),  // String in response
678                ) {
679                    (Some(currency), Some(balance), Some(locked)) => {
680                        // Parse string values to f64 for logging
681                        let balance_f64 = match balance.parse::<f64>() {
682                            Ok(val) => val,
683                            Err(e) => {
684                                warn!("Failed to parse balance for {currency}: '{balance}' - {e}");
685                                0.0
686                            }
687                        };
688                        let locked_f64 = match locked.parse::<f64>() {
689                            Ok(val) => val,
690                            Err(e) => {
691                                warn!(
692                                    "Failed to parse locked amount for {currency}: '{locked}' - {e}"
693                                );
694                                0.0
695                            }
696                        };
697
698                        info!(
699                            "Bithumb asset update - {currency}: balance={balance_f64}, locked={locked_f64}"
700                        );
701                    }
702                    (currency, balance, locked) => {
703                        warn!(
704                            "Incomplete asset data at index {}: currency={:?}, balance={:?}, locked={:?}",
705                            index,
706                            currency.is_some(),
707                            balance.is_some(),
708                            locked.is_some()
709                        );
710                    }
711                }
712            }
713        } else {
714            warn!("No 'assets' array found in myAsset update or it's not an array");
715        }
716
717        Ok(())
718    }
719
720    /// Map Bithumb order state to internal `OrderStatus`
721    fn map_bithumb_order_state(state: &str) -> OrderStatus {
722        map_websocket_order_state(state).unwrap_or(OrderStatus::Unknown)
723    }
724
725    /// Disconnect from WebSocket
726    pub async fn disconnect(&self) -> Result<()> {
727        info!("Disconnecting from Bithumb WebSocket");
728
729        self.set_state(BithumbConnectionState::Disconnecting);
730
731        // Cancel background tasks
732        {
733            let mut handles = self.task_handles.write().await;
734            if let Some(handle) = handles.ping_handle.take() {
735                handle.abort();
736            }
737            if let Some(handle) = handles.message_handler.take() {
738                handle.abort();
739            }
740        }
741
742        // Close WebSocket connections
743        *self.ws_sink.write().await = None;
744        *self.ws_stream.write().await = None;
745
746        self.set_state(BithumbConnectionState::Disconnected);
747
748        info!("Disconnected from Bithumb WebSocket");
749        Ok(())
750    }
751}
752
753#[async_trait]
754impl crate::execution_engine::Exchange for BithumbWebSocketTrader {
755    fn venue(&self) -> Venue {
756        Venue::Bithumb
757    }
758
759    async fn place_order(
760        &self,
761        _order: Order,
762        _report_sender: Sender<ExecutionReport>,
763    ) -> Result<()> {
764        // Bithumb WebSocket doesn't support order placement - orders must be placed via REST API
765        // The existing BithumbExchange handles REST order placement
766        // This WebSocket client focuses on real-time updates only
767        bail!("Order placement via WebSocket not supported by Bithumb - use REST API")
768    }
769
770    async fn cancel_order(
771        &self,
772        _order_id: SmartString,
773        _report_sender: Sender<ExecutionReport>,
774    ) -> Result<()> {
775        // Bithumb WebSocket doesn't support order cancellation - must use REST API
776        bail!("Order cancellation via WebSocket not supported by Bithumb - use REST API")
777    }
778
779    async fn modify_order(
780        &self,
781        _order_id: SmartString,
782        _new_price: Option<Decimal>,
783        _new_quantity: Option<Decimal>,
784        _report_sender: Sender<ExecutionReport>,
785    ) -> Result<()> {
786        bail!("Order modification not supported by Bithumb")
787    }
788
789    async fn cancel_all_orders(
790        &self,
791        _instrument_id: Option<InstrumentId>,
792        _report_sender: Sender<ExecutionReport>,
793    ) -> Result<()> {
794        bail!("Cancel all orders via WebSocket not supported by Bithumb - use REST API")
795    }
796
797    async fn get_order_status(&self, _order_id: &str) -> Result<OrderStatus> {
798        bail!("Order status query via WebSocket not supported by Bithumb - use REST API")
799    }
800
801    async fn connect(&self, _report_sender: Sender<ExecutionReport>) -> Result<()> {
802        bail!("Generic connect not implemented - use connect(report_tx)")
803    }
804
805    async fn disconnect(&self) -> Result<()> {
806        self.disconnect().await
807    }
808
809    async fn is_connected(&self) -> bool {
810        self.is_connected()
811    }
812
813    async fn get_instruments(&self) -> Result<smallvec::SmallVec<[InstrumentId; 32]>> {
814        // Bithumb WebSocket doesn't provide instrument discovery - use REST API
815        bail!("Instrument discovery via WebSocket not supported by Bithumb - use REST API")
816    }
817
818    async fn send_fix_message(&self, _message: Vec<u8>) -> Result<()> {
819        bail!("FIX protocol not supported by Bithumb - use native WebSocket API")
820    }
821
822    async fn receive_fix_message(&self) -> Result<Vec<u8>> {
823        bail!("FIX protocol not supported by Bithumb - use native WebSocket API")
824    }
825}
826
827#[cfg(test)]
828mod tests {
829    use super::*;
830    use crate::execution_engine::Exchange;
831    use rust_decimal::Decimal;
832    use rusty_common::auth::exchanges::bithumb::BithumbAuth;
833    use rusty_model::enums::OrderSide;
834
835    #[test]
836    fn test_bithumb_websocket_trader_creation() {
837        let auth = Arc::new(BithumbAuth::new("test_key".into(), "test_secret".into()));
838        let trader = BithumbWebSocketTrader::new(auth);
839
840        assert_eq!(
841            trader.connection_state(),
842            BithumbConnectionState::Disconnected
843        );
844        assert!(!trader.is_connected());
845        assert_eq!(trader.venue(), Venue::Bithumb);
846    }
847
848    #[test]
849    fn test_connection_state_transitions() {
850        let auth = Arc::new(BithumbAuth::new("test_key".into(), "test_secret".into()));
851        let trader = BithumbWebSocketTrader::new(auth);
852
853        // Test state transitions
854        trader.set_state(BithumbConnectionState::Connecting);
855        assert_eq!(
856            trader.connection_state(),
857            BithumbConnectionState::Connecting
858        );
859
860        trader.set_state(BithumbConnectionState::Connected);
861        assert_eq!(trader.connection_state(), BithumbConnectionState::Connected);
862        assert!(trader.is_connected());
863
864        trader.set_state(BithumbConnectionState::Authenticated);
865        assert_eq!(
866            trader.connection_state(),
867            BithumbConnectionState::Authenticated
868        );
869        assert!(trader.is_connected());
870        assert!(trader.is_authenticated());
871    }
872
873    #[test]
874    fn test_connection_health() {
875        let auth = Arc::new(BithumbAuth::new("test_key".into(), "test_secret".into()));
876        let trader = BithumbWebSocketTrader::new(auth);
877
878        let health = trader.get_connection_health();
879        assert_eq!(health.state, BithumbConnectionState::Disconnected);
880        assert!(!health.is_connected);
881        assert!(!health.is_authenticated);
882        assert!(health.time_since_last_ping.is_none());
883        assert_eq!(health.messages_sent, 0);
884        assert_eq!(health.reconnection_attempts, 0);
885    }
886
887    #[test]
888    fn test_order_status_mapping() {
889        // Test WebSocket status mapping
890        assert_eq!(
891            map_websocket_order_status("placed").unwrap(),
892            OrderStatus::New
893        );
894        assert_eq!(
895            map_websocket_order_status("pending").unwrap(),
896            OrderStatus::Open
897        );
898        assert_eq!(
899            map_websocket_order_status("partial").unwrap(),
900            OrderStatus::PartiallyFilled
901        );
902        assert_eq!(
903            map_websocket_order_status("completed").unwrap(),
904            OrderStatus::Filled
905        );
906        assert_eq!(
907            map_websocket_order_status("cancelled").unwrap(),
908            OrderStatus::Cancelled
909        );
910        assert_eq!(
911            map_websocket_order_status("rejected").unwrap(),
912            OrderStatus::Rejected
913        );
914        assert!(map_websocket_order_status("unknown").is_err());
915
916        // Test WebSocket state mapping
917        assert_eq!(
918            map_websocket_order_state("wait").unwrap(),
919            OrderStatus::Open
920        );
921        assert_eq!(
922            map_websocket_order_state("trade").unwrap(),
923            OrderStatus::PartiallyFilled
924        );
925        assert_eq!(
926            map_websocket_order_state("done").unwrap(),
927            OrderStatus::Filled
928        );
929        assert_eq!(
930            map_websocket_order_state("cancel").unwrap(),
931            OrderStatus::Cancelled
932        );
933        assert!(map_websocket_order_state("unknown").is_err());
934    }
935
936    #[tokio::test]
937    async fn test_exchange_trait_methods_return_errors() {
938        let auth = Arc::new(BithumbAuth::new("test_key".into(), "test_secret".into()));
939        let trader = BithumbWebSocketTrader::new(auth);
940        let (report_tx, _report_rx) = flume::bounded(100);
941
942        // Test that trading operations return errors (WebSocket is read-only)
943        let dummy_order = Order::new(
944            Venue::Bithumb,
945            "BTC_KRW",
946            OrderSide::Buy,
947            rusty_model::enums::OrderType::Limit,
948            Decimal::ONE,
949            Some(Decimal::from(50000)),
950            rusty_model::types::ClientId::new("test_client"),
951        );
952
953        assert!(
954            trader
955                .place_order(dummy_order, report_tx.clone())
956                .await
957                .is_err()
958        );
959        assert!(
960            trader
961                .cancel_order("test".into(), report_tx.clone())
962                .await
963                .is_err()
964        );
965        assert!(
966            trader
967                .modify_order("test".into(), None, None, report_tx)
968                .await
969                .is_err()
970        );
971        assert!(trader.get_order_status("test").await.is_err());
972        assert!(trader.get_instruments().await.is_err());
973    }
974
975    #[test]
976    fn test_subscription_format() {
977        // Test that subscription messages use correct array format
978        let ticket = "test_ticket";
979
980        // Test myOrder subscription format
981        let order_sub = simd_json::json!([
982            {"ticket": ticket},
983            {"type": "myOrder", "codes": []},
984            {"format": "DEFAULT"}
985        ]);
986
987        let order_str = simd_json::to_string(&order_sub).unwrap();
988        assert!(order_str.contains(r"ticket"));
989        assert!(order_str.contains(r"myOrder"));
990        assert!(order_str.starts_with('[') && order_str.ends_with(']'));
991
992        // Test myAsset subscription format
993        let asset_sub = simd_json::json!([
994            {"ticket": ticket},
995            {"type": "myAsset"},
996            {"format": "DEFAULT"}
997        ]);
998
999        let asset_str = simd_json::to_string(&asset_sub).unwrap();
1000        assert!(asset_str.contains(r"ticket"));
1001        assert!(asset_str.contains(r"myAsset"));
1002        assert!(asset_str.starts_with('[') && asset_str.ends_with(']'));
1003    }
1004
1005    #[test]
1006    fn test_bithumb_order_state_mapping() {
1007        // Test Bithumb-specific order state mapping
1008        assert_eq!(
1009            BithumbWebSocketTrader::map_bithumb_order_state("wait"),
1010            OrderStatus::Open
1011        );
1012        assert_eq!(
1013            BithumbWebSocketTrader::map_bithumb_order_state("trade"),
1014            OrderStatus::PartiallyFilled
1015        );
1016        assert_eq!(
1017            BithumbWebSocketTrader::map_bithumb_order_state("done"),
1018            OrderStatus::Filled
1019        );
1020        assert_eq!(
1021            BithumbWebSocketTrader::map_bithumb_order_state("cancel"),
1022            OrderStatus::Cancelled
1023        );
1024        assert_eq!(
1025            BithumbWebSocketTrader::map_bithumb_order_state("unknown"),
1026            OrderStatus::Unknown
1027        );
1028    }
1029
1030    #[tokio::test]
1031    async fn test_websocket_headers_include_jwt() {
1032        let auth = Arc::new(BithumbAuth::new("test_key".into(), "test_secret".into()));
1033
1034        // Generate headers to verify JWT is created
1035        let headers = auth
1036            .generate_headers("GET", "/websocket/v1/private", None)
1037            .unwrap();
1038        let auth_key: SmartString = "Authorization".into();
1039        let auth_header = headers.get(&auth_key).unwrap();
1040
1041        // Verify header starts with Bearer
1042        assert!(auth_header.starts_with("Bearer "));
1043
1044        // Extract JWT and verify it has three parts (header.payload.signature)
1045        let jwt = auth_header.strip_prefix("Bearer ").unwrap();
1046        let parts: Vec<&str> = jwt.split('.').collect();
1047        assert_eq!(parts.len(), 3, "JWT should have 3 parts separated by dots");
1048    }
1049}