rusty_ems/exchanges/
upbit_websocket_trading.rs

1//! Upbit WebSocket Trading Implementation
2//!
3//! This module provides a high-performance WebSocket trading client for Upbit's private endpoints
4//! with JWT authentication, real-time order updates, and asset tracking.
5//!
6//! # Features
7//!
8//! - **JWT authentication**: Support for Upbit's JWT-based WebSocket authentication
9//! - **Private channels**: Real-time myOrder and myAsset data streams
10//! - **Order management**: Integration with REST API for order placement
11//! - **Asset tracking**: Real-time balance and position updates
12//! - **Connection health monitoring**: Real-time metrics and health status
13//! - **Automatic reconnection**: Configurable backoff strategy with exponential delays
14//! - **Zero-copy message processing**: Optimized for low-latency trading
15//!
16//! # Connection Management
17//!
18//! The client maintains connection health through:
19//! - WebSocket ping/pong frames sent every 2 minutes (Upbit recommendation)
20//! - Automatic reconnection on connection loss
21//! - JWT token refresh before expiration
22//! - Configurable timeout handling
23//!
24//! # Private Channels
25//!
26//! Supports Upbit's private WebSocket channels:
27//! - `myOrder`: Real-time order updates (creation/execution/cancellation)
28//! - `myAsset`: Real-time asset/balance updates
29//! - Event-driven updates only (no snapshot data)
30//!
31//! # Usage Example
32//!
33//! ```rust,no_run
34//! use rusty_common::auth::exchanges::upbit::{UpbitAuth, UpbitAuthConfig};
35//! use rusty_ems::exchanges::upbit_websocket_trading::UpbitWebSocketTrader;
36//! use std::sync::Arc;
37//!
38//! #[tokio::main]
//! async fn main() -> anyhow::Result<()> {
40//!     let config = UpbitAuthConfig::new("access_key".into(), "secret_key".into());
41//!     let auth = Arc::new(UpbitAuth::new(config));
42//!
43//!     let trader = UpbitWebSocketTrader::new(auth);
44//!     let (report_tx, report_rx) = flume::bounded(100);
45//!
46//!     // Connect and authenticate
47//!     trader.connect(report_tx).await?;
48//!
49//!     Ok(())
50//! }
51//! ```
52
53use crate::utils::time::timestamp_nanos;
54use anyhow::{Result, bail};
55use async_trait::async_trait;
56use flume::Sender;
57use futures::{SinkExt, StreamExt};
58use log::{debug, error, info, warn};
59use parking_lot::RwLock;
60use quanta::Clock;
61use rust_decimal::Decimal;
62use rusty_common::auth::ExchangeAuthentication;
63use rusty_common::auth::exchanges::upbit::UpbitAuth;
64use rusty_common::types::Exchange;
65use rusty_common::utils::id_generation;
66use rusty_common::websocket::{
67    Message, WebSocketConfig,
68    client::ConnectionState as WsConnectionState,
69    connector::{WebSocketConnector, WebSocketSink, WebSocketStream},
70    stats::new_shared_stats,
71};
72use rusty_model::{
73    enums::OrderStatus, instruments::InstrumentId, trading_order::Order, venues::Venue,
74};
75use simd_json;
76use simd_json::prelude::{ValueAsArray, ValueAsScalar, ValueObjectAccess};
77use smartstring::alias::String as SmartString;
78use std::sync::Arc;
79use std::sync::atomic::{AtomicU8, AtomicU64, Ordering};
80use std::time::Duration;
81use tokio::sync::RwLock as AsyncRwLock;
82use tokio::task::JoinHandle;
83use uuid::Uuid;
84
85use crate::execution_engine::ExecutionReport;
86
/// Endpoint of Upbit's private (authenticated) WebSocket API.
const UPBIT_PRIVATE_WS_URL: &str = "wss://api.upbit.com/websocket/v1/private";

/// Seconds between keep-alive ping frames: two minutes, the interval recommended by Upbit.
const PING_INTERVAL_SECONDS: u64 = 2 * 60;

/// Seconds to wait for a pong; handed to the WebSocket configuration as its pong timeout.
const PONG_TIMEOUT_SECONDS: u64 = 10;

/// Largest WebSocket message accepted, in bytes (10 MiB).
const MAX_MESSAGE_SIZE: usize = 10 << 20;
98
/// State machine for the Upbit private WebSocket session.
///
/// The type is `#[repr(u8)]` with explicit discriminants, so a state can be cast with
/// `as u8` for storage in an atomic and decoded again through `From<u8>`.
#[repr(u8)]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum UpbitConnectionState {
    /// No WebSocket connection exists.
    Disconnected = 0,
    /// A connection attempt is in flight.
    Connecting = 1,
    /// The socket is open but the session is not authenticated yet.
    Connected = 2,
    /// Authentication is in progress.
    Authenticating = 3,
    /// Authentication finished successfully.
    Authenticated = 4,
    /// Subscription requests for the private channels (myOrder, myAsset) are being sent.
    Subscribing = 5,
    /// All required private channels are subscribed.
    Subscribed = 6,
    /// The connection is being torn down.
    Disconnecting = 7,
}
120
121impl From<u8> for UpbitConnectionState {
122    fn from(value: u8) -> Self {
123        match value {
124            0 => Self::Disconnected,
125            1 => Self::Connecting,
126            2 => Self::Connected,
127            3 => Self::Authenticating,
128            4 => Self::Authenticated,
129            5 => Self::Subscribing,
130            6 => Self::Subscribed,
131            7 => Self::Disconnecting,
132            _ => Self::Disconnected,
133        }
134    }
135}
136
137impl From<UpbitConnectionState> for WsConnectionState {
138    fn from(state: UpbitConnectionState) -> Self {
139        match state {
140            UpbitConnectionState::Disconnected => Self::Disconnected,
141            UpbitConnectionState::Connecting => Self::Connecting,
142            UpbitConnectionState::Connected => Self::Connected,
143            UpbitConnectionState::Authenticating => Self::Connecting,
144            UpbitConnectionState::Authenticated => Self::Connected,
145            UpbitConnectionState::Subscribing => Self::Connected,
146            UpbitConnectionState::Subscribed => Self::Connected,
147            UpbitConnectionState::Disconnecting => Self::Disconnected,
148        }
149    }
150}
151
152/// Connection health information
153#[derive(Debug, Clone)]
154pub struct ConnectionHealth {
155    /// Current connection state
156    pub state: UpbitConnectionState,
157    /// Whether the WebSocket connection is established
158    pub is_connected: bool,
159    /// Whether authentication is completed
160    pub is_authenticated: bool,
161    /// Whether subscribed to private channels
162    pub is_subscribed: bool,
163    /// Time elapsed since last ping was sent
164    pub time_since_last_ping: Option<Duration>,
165    /// Time elapsed since last pong was received
166    pub time_since_last_pong: Option<Duration>,
167    /// Time elapsed since authentication completed
168    pub time_since_auth: Option<Duration>,
169    /// Time remaining until JWT expires (not applicable for Upbit)
170    pub jwt_expires_in: Option<Duration>,
171    /// Total number of messages sent
172    pub messages_sent: u64,
173    /// Total number of messages received
174    pub messages_received: u64,
175    /// Number of reconnection attempts made
176    pub reconnection_attempts: u64,
177    /// Number of successful reconnections
178    pub successful_reconnections: u64,
179    /// Number of failed reconnections
180    pub failed_reconnections: u64,
181}
182
/// Lock-free counters describing connection activity.
///
/// Every counter starts at zero (via `Default`) and is only ever updated atomically.
#[derive(Default, Debug)]
struct ConnectionMetrics {
    /// Ping frames sent
    pings_sent: AtomicU64,
    /// Pong frames received
    pongs_received: AtomicU64,
    /// Messages sent
    messages_sent: AtomicU64,
    /// Messages received
    messages_received: AtomicU64,
    /// Reconnection attempts made
    reconnection_attempts: AtomicU64,
    /// Reconnections that succeeded
    successful_reconnections: AtomicU64,
    /// Reconnections that failed
    failed_reconnections: AtomicU64,
}
201
202/// Task handles for lifecycle management
203#[derive(Debug, Default)]
204struct TaskHandles {
205    /// Handle for the ping task that sends periodic ping frames
206    ping_handle: Option<JoinHandle<()>>,
207    /// Handle for the message processing task that handles incoming messages
208    message_handler: Option<JoinHandle<()>>,
209}
210
211/// Upbit WebSocket Trading Client
212pub struct UpbitWebSocketTrader {
213    /// Authentication handler
214    auth: Arc<UpbitAuth>,
215
216    /// WebSocket connection for private data
217    ws_sink: Arc<AsyncRwLock<Option<WebSocketSink>>>,
218    ws_stream: Arc<AsyncRwLock<Option<WebSocketStream>>>,
219
220    /// Connection state machine
221    state: Arc<AtomicU8>,
222
223    /// High-precision clock (using monotonic time)
224    clock: Clock,
225
226    /// Last ping sent timestamp (nanoseconds, monotonic)
227    last_ping_time: Arc<AtomicU64>,
228
229    /// Last pong received timestamp (nanoseconds, monotonic)
230    last_pong_time: Arc<AtomicU64>,
231
232    /// Authentication completed timestamp (nanoseconds, monotonic)
233    auth_completed_time: Arc<AtomicU64>,
234
235    /// Task handles for lifecycle management
236    task_handles: Arc<AsyncRwLock<TaskHandles>>,
237
238    /// Connection metrics
239    metrics: Arc<ConnectionMetrics>,
240
241    /// Reconnection backoff state
242    reconnection_backoff_ms: Arc<AtomicU64>,
243}
244
245impl Clone for UpbitWebSocketTrader {
246    fn clone(&self) -> Self {
247        Self {
248            auth: self.auth.clone(),
249            ws_sink: self.ws_sink.clone(),
250            ws_stream: self.ws_stream.clone(),
251            state: self.state.clone(),
252            clock: self.clock.clone(),
253            last_ping_time: self.last_ping_time.clone(),
254            last_pong_time: self.last_pong_time.clone(),
255            auth_completed_time: self.auth_completed_time.clone(),
256            task_handles: self.task_handles.clone(),
257            metrics: self.metrics.clone(),
258            reconnection_backoff_ms: self.reconnection_backoff_ms.clone(),
259        }
260    }
261}
262
263impl UpbitWebSocketTrader {
264    /// Create a new Upbit WebSocket trading client
265    #[must_use]
266    pub fn new(auth: Arc<UpbitAuth>) -> Self {
267        Self {
268            auth,
269            ws_sink: Arc::new(AsyncRwLock::new(None)),
270            ws_stream: Arc::new(AsyncRwLock::new(None)),
271            state: Arc::new(AtomicU8::new(UpbitConnectionState::Disconnected as u8)),
272            clock: Clock::new(),
273            last_ping_time: Arc::new(AtomicU64::new(0)),
274            last_pong_time: Arc::new(AtomicU64::new(0)),
275            auth_completed_time: Arc::new(AtomicU64::new(0)),
276            task_handles: Arc::new(AsyncRwLock::new(TaskHandles::default())),
277            metrics: Arc::new(ConnectionMetrics::default()),
278            reconnection_backoff_ms: Arc::new(AtomicU64::new(1000)), // Start with 1 second
279        }
280    }
281
282    /// Get current connection state
283    fn get_state(&self) -> UpbitConnectionState {
284        UpbitConnectionState::from(self.state.load(Ordering::Acquire))
285    }
286
287    /// Set connection state
288    fn set_state(&self, new_state: UpbitConnectionState) {
289        let old_state = self.get_state();
290        self.state.store(new_state as u8, Ordering::Release);
291
292        if old_state != new_state {
293            debug!("Connection state transition: {old_state:?} -> {new_state:?}");
294        }
295    }
296
297    /// Check if WebSocket is connected
298    #[must_use]
299    pub fn is_connected(&self) -> bool {
300        matches!(
301            self.get_state(),
302            UpbitConnectionState::Connected
303                | UpbitConnectionState::Authenticating
304                | UpbitConnectionState::Authenticated
305                | UpbitConnectionState::Subscribing
306                | UpbitConnectionState::Subscribed
307        )
308    }
309
310    /// Check if WebSocket is authenticated
311    #[must_use]
312    pub fn is_authenticated(&self) -> bool {
313        matches!(
314            self.get_state(),
315            UpbitConnectionState::Authenticated
316                | UpbitConnectionState::Subscribing
317                | UpbitConnectionState::Subscribed
318        )
319    }
320
321    /// Check if subscribed to private channels
322    #[must_use]
323    pub fn is_subscribed(&self) -> bool {
324        self.get_state() == UpbitConnectionState::Subscribed
325    }
326
327    /// Get detailed connection health information
328    #[must_use]
329    pub fn get_connection_health(&self) -> ConnectionHealth {
330        let now = self.clock.raw();
331        let last_ping = self.last_ping_time.load(Ordering::Acquire);
332        let last_pong = self.last_pong_time.load(Ordering::Acquire);
333        let auth_time = self.auth_completed_time.load(Ordering::Acquire);
334
335        let time_since_last_ping = if last_ping > 0 {
336            Some(Duration::from_nanos(now.saturating_sub(last_ping)))
337        } else {
338            None
339        };
340
341        let time_since_last_pong = if last_pong > 0 {
342            Some(Duration::from_nanos(now.saturating_sub(last_pong)))
343        } else {
344            None
345        };
346
347        let time_since_auth = if auth_time > 0 {
348            Some(Duration::from_nanos(now.saturating_sub(auth_time)))
349        } else {
350            None
351        };
352
353        ConnectionHealth {
354            state: self.get_state(),
355            is_connected: self.is_connected(),
356            is_authenticated: self.is_authenticated(),
357            is_subscribed: self.is_subscribed(),
358            time_since_last_ping,
359            time_since_last_pong,
360            time_since_auth,
361            jwt_expires_in: None, // JWT doesn't have explicit expiration in Upbit
362            messages_sent: self.metrics.messages_sent.load(Ordering::Relaxed),
363            messages_received: self.metrics.messages_received.load(Ordering::Relaxed),
364            reconnection_attempts: self.metrics.reconnection_attempts.load(Ordering::Relaxed),
365            successful_reconnections: self
366                .metrics
367                .successful_reconnections
368                .load(Ordering::Relaxed),
369            failed_reconnections: self.metrics.failed_reconnections.load(Ordering::Relaxed),
370        }
371    }
372
373    /// Connect to Upbit private WebSocket API
374    pub async fn connect(&self, report_tx: Sender<ExecutionReport>) -> Result<()> {
375        info!("Connecting to Upbit private WebSocket: {UPBIT_PRIVATE_WS_URL}");
376
377        // Set connecting state
378        self.set_state(UpbitConnectionState::Connecting);
379
380        // Generate JWT token for WebSocket authentication
381        let jwt = self.auth.generate_websocket_authentication()?;
382
383        // Create WebSocket configuration with JWT in Authorization header
384        let config = WebSocketConfig::builder(Exchange::Upbit, UPBIT_PRIVATE_WS_URL.to_string())
385            .connect_timeout(Duration::from_secs(10))
386            .timeout(Duration::from_secs(30))
387            .ping_interval(Duration::from_secs(PING_INTERVAL_SECONDS))
388            .pong_timeout(Duration::from_secs(PONG_TIMEOUT_SECONDS))
389            .max_frame_size(65536) // 64KB
390            .max_message_size(MAX_MESSAGE_SIZE)
391            .header("Authorization".to_string(), format!("Bearer {jwt}"))
392            .build();
393
394        let stats = new_shared_stats();
395        let connection_state = Arc::new(RwLock::new(WsConnectionState::Disconnected));
396        let mut connector =
397            WebSocketConnector::new(config, stats.clone(), connection_state.clone());
398
399        let (sink, stream) = connector.connect_with_retry(UPBIT_PRIVATE_WS_URL).await?;
400
401        // Store connection references
402        *self.ws_sink.write().await = Some(sink);
403        *self.ws_stream.write().await = Some(stream);
404
405        // Update state
406        self.set_state(UpbitConnectionState::Connected);
407
408        // Subscribe to private channels (authentication is done via headers)
409        self.subscribe_to_private_channels().await?;
410
411        // Start background tasks
412        self.start_ping_task().await;
413        self.start_message_task(report_tx).await;
414
415        info!("Successfully connected and subscribed to Upbit private WebSocket");
416        Ok(())
417    }
418
419    /// Subscribe to Upbit private channels (myOrder and myAsset)
420    async fn subscribe_to_private_channels(&self) -> Result<()> {
421        debug!("Subscribing to Upbit private channels");
422
423        self.set_state(UpbitConnectionState::Subscribing);
424
425        // Create subscription for myOrder channel
426        let order_subscription = simd_json::json!([
427            {
428                "ticket": format!("order_{}", Uuid::new_v4())
429            },
430            {
431                "type": "myOrder"
432            },
433            {
434                "format": "DEFAULT" // Can also be "SIMPLE" for abbreviated field names
435            }
436        ]);
437
438        // Create subscription for myAsset channel
439        let asset_subscription = simd_json::json!([
440            {
441                "ticket": format!("asset_{}", Uuid::new_v4())
442            },
443            {
444                "type": "myAsset"
445            },
446            {
447                "format": "DEFAULT"
448            }
449        ]);
450
451        // Send order subscription (authentication already done via headers)
452        let order_message: SmartString = simd_json::to_string(&order_subscription)?.into();
453        self.send_message(Message::Text(order_message)).await?;
454
455        // Send asset subscription (authentication already done via headers)
456        let asset_message: SmartString = simd_json::to_string(&asset_subscription)?.into();
457        self.send_message(Message::Text(asset_message)).await?;
458
459        // Mark as subscribed
460        self.set_state(UpbitConnectionState::Subscribed);
461        self.auth_completed_time
462            .store(self.clock.raw(), Ordering::Release);
463
464        debug!("Upbit private channels subscription completed");
465        Ok(())
466    }
467
468    // Removed send_authenticated_message method - authentication is now handled via headers
469
470    /// Send a message through the WebSocket connection
471    async fn send_message(&self, message: Message) -> Result<()> {
472        if let Some(sink) = self.ws_sink.write().await.as_mut() {
473            let frame = message.to_frame_view();
474            sink.send(frame).await?;
475            self.metrics.messages_sent.fetch_add(1, Ordering::Relaxed);
476            Ok(())
477        } else {
478            bail!("WebSocket sink not available")
479        }
480    }
481
482    /// Start ping task to keep connection alive
483    async fn start_ping_task(&self) {
484        let sink = self.ws_sink.clone();
485        let last_ping = self.last_ping_time.clone();
486        let clock = self.clock.clone();
487        let metrics = self.metrics.clone();
488
489        let task = tokio::spawn(async move {
490            let mut interval = tokio::time::interval(Duration::from_secs(PING_INTERVAL_SECONDS));
491
492            loop {
493                interval.tick().await;
494
495                // Send ping frame
496                if let Some(sink) = sink.write().await.as_mut() {
497                    let ping_message = Message::Ping(Vec::new());
498                    let frame = ping_message.to_frame_view();
499                    if let Err(e) = sink.send(frame).await {
500                        error!("Failed to send ping: {e}");
501                        break;
502                    }
503
504                    last_ping.store(clock.raw(), Ordering::Release);
505                    metrics.pings_sent.fetch_add(1, Ordering::Relaxed);
506                } else {
507                    break;
508                }
509            }
510        });
511
512        self.task_handles.write().await.ping_handle = Some(task);
513    }
514
515    /// Start message processing task
516    async fn start_message_task(&self, report_tx: Sender<ExecutionReport>) {
517        let stream = self.ws_stream.clone();
518        let last_pong_time = self.last_pong_time.clone();
519        let clock = self.clock.clone();
520        let metrics = self.metrics.clone();
521
522        let task = tokio::spawn(async move {
523            loop {
524                // Take a lock scope to get the stream
525                let mut stream_guard = stream.write().await;
526                if let Some(ws_stream) = stream_guard.as_mut() {
527                    if let Some(frame) = ws_stream.next().await {
528                        drop(stream_guard); // Release the lock before processing
529
530                        let message = Message::from_frame_view(frame);
531                        metrics.messages_received.fetch_add(1, Ordering::Relaxed);
532
533                        match message {
534                            Message::Text(text) => {
535                                debug!("Received text message: {text}");
536                                if let Err(e) =
537                                    Self::process_message(&text, &report_tx, &clock).await
538                                {
539                                    error!("Failed to process message: {e}");
540                                }
541                            }
542                            Message::Pong(_) => {
543                                last_pong_time.store(clock.raw(), Ordering::Release);
544                                metrics.pongs_received.fetch_add(1, Ordering::Relaxed);
545                            }
546                            Message::Close(_) => {
547                                warn!("Received close message from Upbit WebSocket");
548                                break;
549                            }
550                            _ => {
551                                debug!("Received other message type: {message:?}");
552                            }
553                        }
554                    } else {
555                        break; // Stream ended
556                    }
557                } else {
558                    break; // No stream available
559                }
560            }
561        });
562
563        self.task_handles.write().await.message_handler = Some(task);
564    }
565
566    /// Process incoming WebSocket messages
567    async fn process_message(
568        text: &str,
569        report_tx: &Sender<ExecutionReport>,
570        clock: &Clock,
571    ) -> Result<()> {
572        // Parse JSON message
573        let json_str = text.to_string();
574        let mut json_bytes = json_str.into_bytes();
575        let json: simd_json::OwnedValue = simd_json::from_slice(&mut json_bytes)?;
576
577        // Determine message type
578        if let Some(msg_type) = json.get("type").and_then(|v| v.as_str()) {
579            match msg_type {
580                "myOrder" => {
581                    Self::handle_order_update(&json, report_tx, clock).await?;
582                }
583                "myAsset" => {
584                    Self::handle_asset_update(&json).await?;
585                }
586                _ => {
587                    debug!("Unknown message type: {msg_type}");
588                }
589            }
590        } else {
591            debug!("Message without type field: {text}");
592        }
593
594        Ok(())
595    }
596
597    /// Handle myOrder updates
598    async fn handle_order_update(
599        json: &simd_json::OwnedValue,
600        report_tx: &Sender<ExecutionReport>,
601        clock: &Clock,
602    ) -> Result<()> {
603        debug!("Processing order update: {json:?}");
604
605        let order_id = json
606            .get("uuid")
607            .and_then(|v| v.as_str())
608            .unwrap_or("unknown")
609            .into();
610
611        let market = json
612            .get("code")
613            .and_then(|v| v.as_str())
614            .unwrap_or("UNKNOWN");
615
616        let state = json
617            .get("state")
618            .and_then(|v| v.as_str())
619            .unwrap_or("unknown");
620
621        let status = Self::map_order_status(state);
622
623        let executed_volume = json
624            .get("executed_volume")
625            .and_then(simd_json::prelude::ValueAsScalar::as_f64)
626            .and_then(|f| Decimal::try_from(f).ok())
627            .unwrap_or(Decimal::ZERO);
628
629        let remaining_volume = json
630            .get("remaining_volume")
631            .and_then(simd_json::prelude::ValueAsScalar::as_f64)
632            .and_then(|f| Decimal::try_from(f).ok())
633            .unwrap_or(Decimal::ZERO);
634
635        let execution_price = json
636            .get("price")
637            .and_then(simd_json::prelude::ValueAsScalar::as_f64)
638            .and_then(|f| Decimal::try_from(f).ok());
639
640        let order_timestamp = json
641            .get("order_timestamp")
642            .and_then(simd_json::prelude::ValueAsScalar::as_u64)
643            .map_or(0, |ts| ts * 1_000_000);
644
645        let trade_timestamp = json
646            .get("trade_timestamp")
647            .and_then(simd_json::prelude::ValueAsScalar::as_u64)
648            .map_or(0, |ts| ts * 1_000_000);
649
650        let report = ExecutionReport {
651            id: id_generation::generate_exchange_order_id("upbit"),
652            order_id,
653            exchange_timestamp: if trade_timestamp > 0 {
654                trade_timestamp
655            } else if order_timestamp > 0 {
656                order_timestamp
657            } else {
658                // Fallback to system time if both exchange timestamps are unavailable
659                timestamp_nanos()
660            },
661            system_timestamp: timestamp_nanos(),
662            instrument_id: InstrumentId::new(market, Venue::Upbit),
663            status,
664            filled_quantity: executed_volume,
665            remaining_quantity: remaining_volume,
666            execution_price,
667            reject_reason: None,
668            exchange_execution_id: json
669                .get("trade_uuid")
670                .and_then(|v| v.as_str())
671                .map(std::convert::Into::into),
672            is_final: matches!(
673                status,
674                OrderStatus::Filled | OrderStatus::Cancelled | OrderStatus::Rejected
675            ),
676        };
677
678        if let Err(e) = report_tx.try_send(report) {
679            error!("Failed to send order update report: {e}");
680        }
681
682        Ok(())
683    }
684
685    /// Handle myAsset updates
686    async fn handle_asset_update(json: &simd_json::OwnedValue) -> Result<()> {
687        debug!("Processing asset update: {json:?}");
688
689        // For now, we'll just log asset updates
690        // In a full implementation, this could update a portfolio manager
691        if let Some(assets) = json.get("assets").and_then(|v| v.as_array()) {
692            for asset in assets {
693                if let (Some(currency), Some(balance), Some(locked)) = (
694                    asset.get("currency").and_then(|v| v.as_str()),
695                    asset
696                        .get("balance")
697                        .and_then(simd_json::prelude::ValueAsScalar::as_f64),
698                    asset
699                        .get("locked")
700                        .and_then(simd_json::prelude::ValueAsScalar::as_f64),
701                ) {
702                    info!("Asset update - {currency}: balance={balance}, locked={locked}");
703                }
704            }
705        }
706
707        Ok(())
708    }
709
710    /// Map Upbit order status to internal `OrderStatus`
711    fn map_order_status(status: &str) -> OrderStatus {
712        match status {
713            "wait" => OrderStatus::New,
714            "watch" => OrderStatus::New, // Reserved order waiting
715            "trade" => OrderStatus::PartiallyFilled, // Partial execution occurred
716            "done" => OrderStatus::Filled,
717            "cancel" => OrderStatus::Cancelled,
718            _ => OrderStatus::Rejected,
719        }
720    }
721
722    /// Disconnect from WebSocket
723    pub async fn disconnect(&self) -> Result<()> {
724        info!("Disconnecting from Upbit WebSocket");
725
726        self.set_state(UpbitConnectionState::Disconnecting);
727
728        // Cancel background tasks
729        {
730            let mut handles = self.task_handles.write().await;
731            if let Some(handle) = handles.ping_handle.take() {
732                handle.abort();
733            }
734            if let Some(handle) = handles.message_handler.take() {
735                handle.abort();
736            }
737        }
738
739        // Close WebSocket connections
740        *self.ws_sink.write().await = None;
741        *self.ws_stream.write().await = None;
742
743        self.set_state(UpbitConnectionState::Disconnected);
744
745        info!("Disconnected from Upbit WebSocket");
746        Ok(())
747    }
748}
749
750#[async_trait]
751impl crate::execution_engine::Exchange for UpbitWebSocketTrader {
752    fn venue(&self) -> Venue {
753        Venue::Upbit
754    }
755
756    async fn place_order(
757        &self,
758        _order: Order,
759        _report_sender: Sender<ExecutionReport>,
760    ) -> Result<()> {
761        // Upbit WebSocket doesn't support order placement - orders must be placed via REST API
762        // The existing UpbitExchange handles REST order placement
763        // This WebSocket client focuses on real-time updates only
764        bail!("Order placement via WebSocket not supported by Upbit - use REST API")
765    }
766
767    async fn cancel_order(
768        &self,
769        _order_id: SmartString,
770        _report_sender: Sender<ExecutionReport>,
771    ) -> Result<()> {
772        // Upbit WebSocket doesn't support order cancellation - must use REST API
773        bail!("Order cancellation via WebSocket not supported by Upbit - use REST API")
774    }
775
776    async fn modify_order(
777        &self,
778        _order_id: SmartString,
779        _new_price: Option<Decimal>,
780        _new_quantity: Option<Decimal>,
781        _report_sender: Sender<ExecutionReport>,
782    ) -> Result<()> {
783        bail!("Order modification not supported by Upbit")
784    }
785
786    async fn cancel_all_orders(
787        &self,
788        _instrument_id: Option<InstrumentId>,
789        _report_sender: Sender<ExecutionReport>,
790    ) -> Result<()> {
791        bail!("Cancel all orders via WebSocket not supported by Upbit - use REST API")
792    }
793
794    async fn get_order_status(&self, _order_id: &str) -> Result<OrderStatus> {
795        bail!("Order status query via WebSocket not supported by Upbit - use REST API")
796    }
797
798    async fn connect(&self, report_sender: Sender<ExecutionReport>) -> Result<()> {
799        // Now properly implemented with report_sender parameter
800        // Call the inherent method directly to avoid recursion
801        Self::connect(self, report_sender).await
802    }
803
804    async fn disconnect(&self) -> Result<()> {
805        self.disconnect().await
806    }
807
808    async fn is_connected(&self) -> bool {
809        self.is_connected()
810    }
811
812    async fn get_instruments(&self) -> Result<smallvec::SmallVec<[InstrumentId; 32]>> {
813        bail!("Instrument retrieval via WebSocket not supported by Upbit - use REST API")
814    }
815
816    async fn send_fix_message(&self, _message: Vec<u8>) -> Result<()> {
817        anyhow::bail!("FIX protocol not supported on Upbit WebSocket")
818    }
819
820    async fn receive_fix_message(&self) -> Result<Vec<u8>> {
821        anyhow::bail!("FIX protocol not supported on Upbit WebSocket")
822    }
823}
824
#[cfg(test)]
mod tests {
    use super::*;
    use rusty_common::auth::exchanges::upbit::UpbitAuthConfig;
    use rusty_model::enums::OrderSide;
    use simd_json::ValueBuilder;

    /// Builds a shared `UpbitAuth` from placeholder credentials for use in tests.
    fn create_test_auth() -> Arc<UpbitAuth> {
        Arc::new(UpbitAuth::new(UpbitAuthConfig::new(
            "test_access_key".into(),
            "test_secret_key".into(),
        )))
    }

    /// A freshly constructed trader starts disconnected with every status flag cleared.
    #[test]
    fn test_upbit_websocket_trader_creation() {
        let trader = UpbitWebSocketTrader::new(create_test_auth());

        assert_eq!(trader.get_state(), UpbitConnectionState::Disconnected);
        assert!(!trader.is_connected());
        assert!(!trader.is_authenticated());
        assert!(!trader.is_subscribed());
    }

    /// Walks the state machine forward and checks the derived status flags at each step.
    #[test]
    fn test_connection_state_transitions() {
        let trader = UpbitWebSocketTrader::new(create_test_auth());

        // Connecting: only the raw state is checked.
        trader.set_state(UpbitConnectionState::Connecting);
        assert_eq!(trader.get_state(), UpbitConnectionState::Connecting);

        // Connected: the connection flag must be raised.
        trader.set_state(UpbitConnectionState::Connected);
        assert_eq!(trader.get_state(), UpbitConnectionState::Connected);
        assert!(trader.is_connected());

        // Subscribing: still connected, and now reported as authenticated.
        trader.set_state(UpbitConnectionState::Subscribing);
        assert_eq!(trader.get_state(), UpbitConnectionState::Subscribing);
        assert!(trader.is_connected());
        assert!(trader.is_authenticated());

        // Subscribed: the subscription flag must be raised.
        trader.set_state(UpbitConnectionState::Subscribed);
        assert_eq!(trader.get_state(), UpbitConnectionState::Subscribed);
        assert!(trader.is_subscribed());
    }

    /// Each Upbit order-state string maps to the expected `OrderStatus`;
    /// unrecognised strings map to `Rejected`.
    #[test]
    fn test_order_status_mapping() {
        let expectations = [
            ("wait", OrderStatus::New),
            ("watch", OrderStatus::New),
            ("trade", OrderStatus::PartiallyFilled),
            ("done", OrderStatus::Filled),
            ("cancel", OrderStatus::Cancelled),
            ("unknown", OrderStatus::Rejected),
        ];

        for (raw_state, expected) in expectations {
            assert_eq!(
                UpbitWebSocketTrader::map_order_status(raw_state),
                expected,
                "unexpected mapping for Upbit order state {:?}",
                raw_state
            );
        }
    }

    /// The health snapshot of a fresh trader reports a disconnected, idle client.
    #[test]
    fn test_connection_health() {
        let trader = UpbitWebSocketTrader::new(create_test_auth());
        let health = trader.get_connection_health();

        assert_eq!(health.state, UpbitConnectionState::Disconnected);
        assert!(!health.is_connected);
        assert!(!health.is_authenticated);
        assert!(!health.is_subscribed);
        assert!(health.time_since_last_ping.is_none());
        assert_eq!(health.messages_sent, 0);
        assert_eq!(health.reconnection_attempts, 0);
    }

    /// Every operation that the `Exchange` implementation declares unsupported
    /// must return an error rather than silently succeeding.
    #[tokio::test]
    async fn test_websocket_only_operations() {
        use crate::execution_engine::Exchange;
        use rusty_model::types::OrderId;

        let trader = UpbitWebSocketTrader::new(create_test_auth());
        let (report_tx, _report_rx) = flume::bounded(100);

        let order = Order {
            id: OrderId::new(),
            symbol: "KRW-BTC".into(),
            side: OrderSide::Buy,
            order_type: rusty_model::enums::OrderType::Limit,
            quantity: Decimal::from(1),
            price: Some(Decimal::from(50000)),
            stop_price: None,
            exchange_order_id: None,
            venue: Venue::Upbit,
            client_id: "test_client".into(),
            creation_time_ns: 1_000_000_000,
            update_time_ns: 1_000_000_000,
            average_fill_price: None,
            filled_quantity: Decimal::ZERO,
            status: rusty_model::enums::OrderStatus::New,
            time_in_force: rusty_model::enums::TimeInForce::GTC,
            metadata: simd_json::value::owned::Value::null(),
        };

        // Order lifecycle operations.
        assert!(trader.place_order(order, report_tx.clone()).await.is_err());
        assert!(
            trader
                .cancel_order("test_id".into(), report_tx.clone())
                .await
                .is_err()
        );
        assert!(
            trader
                .modify_order("test_id".into(), None, None, report_tx.clone())
                .await
                .is_err()
        );
        assert!(trader.cancel_all_orders(None, report_tx).await.is_err());

        // Query operations.
        assert!(trader.get_order_status("test_id").await.is_err());
        assert!(trader.get_instruments().await.is_err());

        // FIX transport operations.
        assert!(trader.send_fix_message(Vec::new()).await.is_err());
        assert!(trader.receive_fix_message().await.is_err());
    }
}