1use crate::utils::time::timestamp_nanos;
54use anyhow::{Result, bail};
55use async_trait::async_trait;
56use flume::Sender;
57use futures::{SinkExt, StreamExt};
58use log::{debug, error, info, warn};
59use parking_lot::RwLock;
60use quanta::Clock;
61use rust_decimal::Decimal;
62use rusty_common::auth::ExchangeAuthentication;
63use rusty_common::auth::exchanges::upbit::UpbitAuth;
64use rusty_common::types::Exchange;
65use rusty_common::utils::id_generation;
66use rusty_common::websocket::{
67 Message, WebSocketConfig,
68 client::ConnectionState as WsConnectionState,
69 connector::{WebSocketConnector, WebSocketSink, WebSocketStream},
70 stats::new_shared_stats,
71};
72use rusty_model::{
73 enums::OrderStatus, instruments::InstrumentId, trading_order::Order, venues::Venue,
74};
75use simd_json;
76use simd_json::prelude::{ValueAsArray, ValueAsScalar, ValueObjectAccess};
77use smartstring::alias::String as SmartString;
78use std::sync::Arc;
79use std::sync::atomic::{AtomicU8, AtomicU64, Ordering};
80use std::time::Duration;
81use tokio::sync::RwLock as AsyncRwLock;
82use tokio::task::JoinHandle;
83use uuid::Uuid;
84
85use crate::execution_engine::ExecutionReport;
86
/// Endpoint of Upbit's private (authenticated) WebSocket API.
const UPBIT_PRIVATE_WS_URL: &str = "wss://api.upbit.com/websocket/v1/private";

/// Seconds between pings; used both for the WebSocket config and the ping task.
const PING_INTERVAL_SECONDS: u64 = 2 * 60;

/// Pong timeout, in seconds, passed to the WebSocket configuration.
const PONG_TIMEOUT_SECONDS: u64 = 10;

/// Largest WebSocket message accepted, in bytes (10 MiB).
const MAX_MESSAGE_SIZE: usize = 10 << 20;
98
/// Lifecycle stage of the Upbit private WebSocket connection.
///
/// The discriminants are explicit because the state is stored as a raw `u8`
/// in an atomic and decoded again through `From<u8>`.
#[repr(u8)]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum UpbitConnectionState {
    /// No connection is open.
    Disconnected = 0,
    /// A connection attempt is in progress.
    Connecting = 1,
    /// The socket is open but nothing has been subscribed yet.
    Connected = 2,
    /// Authentication is in progress.
    Authenticating = 3,
    /// Authentication has completed.
    Authenticated = 4,
    /// Subscription requests are being sent.
    Subscribing = 5,
    /// Subscription requests have been sent.
    Subscribed = 6,
    /// A disconnect is in progress.
    Disconnecting = 7,
}
120
121impl From<u8> for UpbitConnectionState {
122 fn from(value: u8) -> Self {
123 match value {
124 0 => Self::Disconnected,
125 1 => Self::Connecting,
126 2 => Self::Connected,
127 3 => Self::Authenticating,
128 4 => Self::Authenticated,
129 5 => Self::Subscribing,
130 6 => Self::Subscribed,
131 7 => Self::Disconnecting,
132 _ => Self::Disconnected,
133 }
134 }
135}
136
137impl From<UpbitConnectionState> for WsConnectionState {
138 fn from(state: UpbitConnectionState) -> Self {
139 match state {
140 UpbitConnectionState::Disconnected => Self::Disconnected,
141 UpbitConnectionState::Connecting => Self::Connecting,
142 UpbitConnectionState::Connected => Self::Connected,
143 UpbitConnectionState::Authenticating => Self::Connecting,
144 UpbitConnectionState::Authenticated => Self::Connected,
145 UpbitConnectionState::Subscribing => Self::Connected,
146 UpbitConnectionState::Subscribed => Self::Connected,
147 UpbitConnectionState::Disconnecting => Self::Disconnected,
148 }
149 }
150}
151
152#[derive(Debug, Clone)]
154pub struct ConnectionHealth {
155 pub state: UpbitConnectionState,
157 pub is_connected: bool,
159 pub is_authenticated: bool,
161 pub is_subscribed: bool,
163 pub time_since_last_ping: Option<Duration>,
165 pub time_since_last_pong: Option<Duration>,
167 pub time_since_auth: Option<Duration>,
169 pub jwt_expires_in: Option<Duration>,
171 pub messages_sent: u64,
173 pub messages_received: u64,
175 pub reconnection_attempts: u64,
177 pub successful_reconnections: u64,
179 pub failed_reconnections: u64,
181}
182
/// Atomic counters describing connection activity; all start at zero.
#[derive(Default, Debug)]
struct ConnectionMetrics {
    /// Pings sent by the ping task.
    pings_sent: AtomicU64,
    /// Pongs seen by the message task.
    pongs_received: AtomicU64,
    /// Messages successfully written to the socket.
    messages_sent: AtomicU64,
    /// Frames read from the socket.
    messages_received: AtomicU64,
    /// Reconnection attempts.
    reconnection_attempts: AtomicU64,
    /// Reconnections that succeeded.
    successful_reconnections: AtomicU64,
    /// Reconnections that failed.
    failed_reconnections: AtomicU64,
}
201
202#[derive(Debug, Default)]
204struct TaskHandles {
205 ping_handle: Option<JoinHandle<()>>,
207 message_handler: Option<JoinHandle<()>>,
209}
210
211pub struct UpbitWebSocketTrader {
213 auth: Arc<UpbitAuth>,
215
216 ws_sink: Arc<AsyncRwLock<Option<WebSocketSink>>>,
218 ws_stream: Arc<AsyncRwLock<Option<WebSocketStream>>>,
219
220 state: Arc<AtomicU8>,
222
223 clock: Clock,
225
226 last_ping_time: Arc<AtomicU64>,
228
229 last_pong_time: Arc<AtomicU64>,
231
232 auth_completed_time: Arc<AtomicU64>,
234
235 task_handles: Arc<AsyncRwLock<TaskHandles>>,
237
238 metrics: Arc<ConnectionMetrics>,
240
241 reconnection_backoff_ms: Arc<AtomicU64>,
243}
244
245impl Clone for UpbitWebSocketTrader {
246 fn clone(&self) -> Self {
247 Self {
248 auth: self.auth.clone(),
249 ws_sink: self.ws_sink.clone(),
250 ws_stream: self.ws_stream.clone(),
251 state: self.state.clone(),
252 clock: self.clock.clone(),
253 last_ping_time: self.last_ping_time.clone(),
254 last_pong_time: self.last_pong_time.clone(),
255 auth_completed_time: self.auth_completed_time.clone(),
256 task_handles: self.task_handles.clone(),
257 metrics: self.metrics.clone(),
258 reconnection_backoff_ms: self.reconnection_backoff_ms.clone(),
259 }
260 }
261}
262
263impl UpbitWebSocketTrader {
264 #[must_use]
266 pub fn new(auth: Arc<UpbitAuth>) -> Self {
267 Self {
268 auth,
269 ws_sink: Arc::new(AsyncRwLock::new(None)),
270 ws_stream: Arc::new(AsyncRwLock::new(None)),
271 state: Arc::new(AtomicU8::new(UpbitConnectionState::Disconnected as u8)),
272 clock: Clock::new(),
273 last_ping_time: Arc::new(AtomicU64::new(0)),
274 last_pong_time: Arc::new(AtomicU64::new(0)),
275 auth_completed_time: Arc::new(AtomicU64::new(0)),
276 task_handles: Arc::new(AsyncRwLock::new(TaskHandles::default())),
277 metrics: Arc::new(ConnectionMetrics::default()),
278 reconnection_backoff_ms: Arc::new(AtomicU64::new(1000)), }
280 }
281
282 fn get_state(&self) -> UpbitConnectionState {
284 UpbitConnectionState::from(self.state.load(Ordering::Acquire))
285 }
286
287 fn set_state(&self, new_state: UpbitConnectionState) {
289 let old_state = self.get_state();
290 self.state.store(new_state as u8, Ordering::Release);
291
292 if old_state != new_state {
293 debug!("Connection state transition: {old_state:?} -> {new_state:?}");
294 }
295 }
296
297 #[must_use]
299 pub fn is_connected(&self) -> bool {
300 matches!(
301 self.get_state(),
302 UpbitConnectionState::Connected
303 | UpbitConnectionState::Authenticating
304 | UpbitConnectionState::Authenticated
305 | UpbitConnectionState::Subscribing
306 | UpbitConnectionState::Subscribed
307 )
308 }
309
310 #[must_use]
312 pub fn is_authenticated(&self) -> bool {
313 matches!(
314 self.get_state(),
315 UpbitConnectionState::Authenticated
316 | UpbitConnectionState::Subscribing
317 | UpbitConnectionState::Subscribed
318 )
319 }
320
321 #[must_use]
323 pub fn is_subscribed(&self) -> bool {
324 self.get_state() == UpbitConnectionState::Subscribed
325 }
326
327 #[must_use]
329 pub fn get_connection_health(&self) -> ConnectionHealth {
330 let now = self.clock.raw();
331 let last_ping = self.last_ping_time.load(Ordering::Acquire);
332 let last_pong = self.last_pong_time.load(Ordering::Acquire);
333 let auth_time = self.auth_completed_time.load(Ordering::Acquire);
334
335 let time_since_last_ping = if last_ping > 0 {
336 Some(Duration::from_nanos(now.saturating_sub(last_ping)))
337 } else {
338 None
339 };
340
341 let time_since_last_pong = if last_pong > 0 {
342 Some(Duration::from_nanos(now.saturating_sub(last_pong)))
343 } else {
344 None
345 };
346
347 let time_since_auth = if auth_time > 0 {
348 Some(Duration::from_nanos(now.saturating_sub(auth_time)))
349 } else {
350 None
351 };
352
353 ConnectionHealth {
354 state: self.get_state(),
355 is_connected: self.is_connected(),
356 is_authenticated: self.is_authenticated(),
357 is_subscribed: self.is_subscribed(),
358 time_since_last_ping,
359 time_since_last_pong,
360 time_since_auth,
361 jwt_expires_in: None, messages_sent: self.metrics.messages_sent.load(Ordering::Relaxed),
363 messages_received: self.metrics.messages_received.load(Ordering::Relaxed),
364 reconnection_attempts: self.metrics.reconnection_attempts.load(Ordering::Relaxed),
365 successful_reconnections: self
366 .metrics
367 .successful_reconnections
368 .load(Ordering::Relaxed),
369 failed_reconnections: self.metrics.failed_reconnections.load(Ordering::Relaxed),
370 }
371 }
372
373 pub async fn connect(&self, report_tx: Sender<ExecutionReport>) -> Result<()> {
375 info!("Connecting to Upbit private WebSocket: {UPBIT_PRIVATE_WS_URL}");
376
377 self.set_state(UpbitConnectionState::Connecting);
379
380 let jwt = self.auth.generate_websocket_authentication()?;
382
383 let config = WebSocketConfig::builder(Exchange::Upbit, UPBIT_PRIVATE_WS_URL.to_string())
385 .connect_timeout(Duration::from_secs(10))
386 .timeout(Duration::from_secs(30))
387 .ping_interval(Duration::from_secs(PING_INTERVAL_SECONDS))
388 .pong_timeout(Duration::from_secs(PONG_TIMEOUT_SECONDS))
389 .max_frame_size(65536) .max_message_size(MAX_MESSAGE_SIZE)
391 .header("Authorization".to_string(), format!("Bearer {jwt}"))
392 .build();
393
394 let stats = new_shared_stats();
395 let connection_state = Arc::new(RwLock::new(WsConnectionState::Disconnected));
396 let mut connector =
397 WebSocketConnector::new(config, stats.clone(), connection_state.clone());
398
399 let (sink, stream) = connector.connect_with_retry(UPBIT_PRIVATE_WS_URL).await?;
400
401 *self.ws_sink.write().await = Some(sink);
403 *self.ws_stream.write().await = Some(stream);
404
405 self.set_state(UpbitConnectionState::Connected);
407
408 self.subscribe_to_private_channels().await?;
410
411 self.start_ping_task().await;
413 self.start_message_task(report_tx).await;
414
415 info!("Successfully connected and subscribed to Upbit private WebSocket");
416 Ok(())
417 }
418
419 async fn subscribe_to_private_channels(&self) -> Result<()> {
421 debug!("Subscribing to Upbit private channels");
422
423 self.set_state(UpbitConnectionState::Subscribing);
424
425 let order_subscription = simd_json::json!([
427 {
428 "ticket": format!("order_{}", Uuid::new_v4())
429 },
430 {
431 "type": "myOrder"
432 },
433 {
434 "format": "DEFAULT" }
436 ]);
437
438 let asset_subscription = simd_json::json!([
440 {
441 "ticket": format!("asset_{}", Uuid::new_v4())
442 },
443 {
444 "type": "myAsset"
445 },
446 {
447 "format": "DEFAULT"
448 }
449 ]);
450
451 let order_message: SmartString = simd_json::to_string(&order_subscription)?.into();
453 self.send_message(Message::Text(order_message)).await?;
454
455 let asset_message: SmartString = simd_json::to_string(&asset_subscription)?.into();
457 self.send_message(Message::Text(asset_message)).await?;
458
459 self.set_state(UpbitConnectionState::Subscribed);
461 self.auth_completed_time
462 .store(self.clock.raw(), Ordering::Release);
463
464 debug!("Upbit private channels subscription completed");
465 Ok(())
466 }
467
468 async fn send_message(&self, message: Message) -> Result<()> {
472 if let Some(sink) = self.ws_sink.write().await.as_mut() {
473 let frame = message.to_frame_view();
474 sink.send(frame).await?;
475 self.metrics.messages_sent.fetch_add(1, Ordering::Relaxed);
476 Ok(())
477 } else {
478 bail!("WebSocket sink not available")
479 }
480 }
481
482 async fn start_ping_task(&self) {
484 let sink = self.ws_sink.clone();
485 let last_ping = self.last_ping_time.clone();
486 let clock = self.clock.clone();
487 let metrics = self.metrics.clone();
488
489 let task = tokio::spawn(async move {
490 let mut interval = tokio::time::interval(Duration::from_secs(PING_INTERVAL_SECONDS));
491
492 loop {
493 interval.tick().await;
494
495 if let Some(sink) = sink.write().await.as_mut() {
497 let ping_message = Message::Ping(Vec::new());
498 let frame = ping_message.to_frame_view();
499 if let Err(e) = sink.send(frame).await {
500 error!("Failed to send ping: {e}");
501 break;
502 }
503
504 last_ping.store(clock.raw(), Ordering::Release);
505 metrics.pings_sent.fetch_add(1, Ordering::Relaxed);
506 } else {
507 break;
508 }
509 }
510 });
511
512 self.task_handles.write().await.ping_handle = Some(task);
513 }
514
515 async fn start_message_task(&self, report_tx: Sender<ExecutionReport>) {
517 let stream = self.ws_stream.clone();
518 let last_pong_time = self.last_pong_time.clone();
519 let clock = self.clock.clone();
520 let metrics = self.metrics.clone();
521
522 let task = tokio::spawn(async move {
523 loop {
524 let mut stream_guard = stream.write().await;
526 if let Some(ws_stream) = stream_guard.as_mut() {
527 if let Some(frame) = ws_stream.next().await {
528 drop(stream_guard); let message = Message::from_frame_view(frame);
531 metrics.messages_received.fetch_add(1, Ordering::Relaxed);
532
533 match message {
534 Message::Text(text) => {
535 debug!("Received text message: {text}");
536 if let Err(e) =
537 Self::process_message(&text, &report_tx, &clock).await
538 {
539 error!("Failed to process message: {e}");
540 }
541 }
542 Message::Pong(_) => {
543 last_pong_time.store(clock.raw(), Ordering::Release);
544 metrics.pongs_received.fetch_add(1, Ordering::Relaxed);
545 }
546 Message::Close(_) => {
547 warn!("Received close message from Upbit WebSocket");
548 break;
549 }
550 _ => {
551 debug!("Received other message type: {message:?}");
552 }
553 }
554 } else {
555 break; }
557 } else {
558 break; }
560 }
561 });
562
563 self.task_handles.write().await.message_handler = Some(task);
564 }
565
566 async fn process_message(
568 text: &str,
569 report_tx: &Sender<ExecutionReport>,
570 clock: &Clock,
571 ) -> Result<()> {
572 let json_str = text.to_string();
574 let mut json_bytes = json_str.into_bytes();
575 let json: simd_json::OwnedValue = simd_json::from_slice(&mut json_bytes)?;
576
577 if let Some(msg_type) = json.get("type").and_then(|v| v.as_str()) {
579 match msg_type {
580 "myOrder" => {
581 Self::handle_order_update(&json, report_tx, clock).await?;
582 }
583 "myAsset" => {
584 Self::handle_asset_update(&json).await?;
585 }
586 _ => {
587 debug!("Unknown message type: {msg_type}");
588 }
589 }
590 } else {
591 debug!("Message without type field: {text}");
592 }
593
594 Ok(())
595 }
596
597 async fn handle_order_update(
599 json: &simd_json::OwnedValue,
600 report_tx: &Sender<ExecutionReport>,
601 clock: &Clock,
602 ) -> Result<()> {
603 debug!("Processing order update: {json:?}");
604
605 let order_id = json
606 .get("uuid")
607 .and_then(|v| v.as_str())
608 .unwrap_or("unknown")
609 .into();
610
611 let market = json
612 .get("code")
613 .and_then(|v| v.as_str())
614 .unwrap_or("UNKNOWN");
615
616 let state = json
617 .get("state")
618 .and_then(|v| v.as_str())
619 .unwrap_or("unknown");
620
621 let status = Self::map_order_status(state);
622
623 let executed_volume = json
624 .get("executed_volume")
625 .and_then(simd_json::prelude::ValueAsScalar::as_f64)
626 .and_then(|f| Decimal::try_from(f).ok())
627 .unwrap_or(Decimal::ZERO);
628
629 let remaining_volume = json
630 .get("remaining_volume")
631 .and_then(simd_json::prelude::ValueAsScalar::as_f64)
632 .and_then(|f| Decimal::try_from(f).ok())
633 .unwrap_or(Decimal::ZERO);
634
635 let execution_price = json
636 .get("price")
637 .and_then(simd_json::prelude::ValueAsScalar::as_f64)
638 .and_then(|f| Decimal::try_from(f).ok());
639
640 let order_timestamp = json
641 .get("order_timestamp")
642 .and_then(simd_json::prelude::ValueAsScalar::as_u64)
643 .map_or(0, |ts| ts * 1_000_000);
644
645 let trade_timestamp = json
646 .get("trade_timestamp")
647 .and_then(simd_json::prelude::ValueAsScalar::as_u64)
648 .map_or(0, |ts| ts * 1_000_000);
649
650 let report = ExecutionReport {
651 id: id_generation::generate_exchange_order_id("upbit"),
652 order_id,
653 exchange_timestamp: if trade_timestamp > 0 {
654 trade_timestamp
655 } else if order_timestamp > 0 {
656 order_timestamp
657 } else {
658 timestamp_nanos()
660 },
661 system_timestamp: timestamp_nanos(),
662 instrument_id: InstrumentId::new(market, Venue::Upbit),
663 status,
664 filled_quantity: executed_volume,
665 remaining_quantity: remaining_volume,
666 execution_price,
667 reject_reason: None,
668 exchange_execution_id: json
669 .get("trade_uuid")
670 .and_then(|v| v.as_str())
671 .map(std::convert::Into::into),
672 is_final: matches!(
673 status,
674 OrderStatus::Filled | OrderStatus::Cancelled | OrderStatus::Rejected
675 ),
676 };
677
678 if let Err(e) = report_tx.try_send(report) {
679 error!("Failed to send order update report: {e}");
680 }
681
682 Ok(())
683 }
684
685 async fn handle_asset_update(json: &simd_json::OwnedValue) -> Result<()> {
687 debug!("Processing asset update: {json:?}");
688
689 if let Some(assets) = json.get("assets").and_then(|v| v.as_array()) {
692 for asset in assets {
693 if let (Some(currency), Some(balance), Some(locked)) = (
694 asset.get("currency").and_then(|v| v.as_str()),
695 asset
696 .get("balance")
697 .and_then(simd_json::prelude::ValueAsScalar::as_f64),
698 asset
699 .get("locked")
700 .and_then(simd_json::prelude::ValueAsScalar::as_f64),
701 ) {
702 info!("Asset update - {currency}: balance={balance}, locked={locked}");
703 }
704 }
705 }
706
707 Ok(())
708 }
709
710 fn map_order_status(status: &str) -> OrderStatus {
712 match status {
713 "wait" => OrderStatus::New,
714 "watch" => OrderStatus::New, "trade" => OrderStatus::PartiallyFilled, "done" => OrderStatus::Filled,
717 "cancel" => OrderStatus::Cancelled,
718 _ => OrderStatus::Rejected,
719 }
720 }
721
722 pub async fn disconnect(&self) -> Result<()> {
724 info!("Disconnecting from Upbit WebSocket");
725
726 self.set_state(UpbitConnectionState::Disconnecting);
727
728 {
730 let mut handles = self.task_handles.write().await;
731 if let Some(handle) = handles.ping_handle.take() {
732 handle.abort();
733 }
734 if let Some(handle) = handles.message_handler.take() {
735 handle.abort();
736 }
737 }
738
739 *self.ws_sink.write().await = None;
741 *self.ws_stream.write().await = None;
742
743 self.set_state(UpbitConnectionState::Disconnected);
744
745 info!("Disconnected from Upbit WebSocket");
746 Ok(())
747 }
748}
749
750#[async_trait]
751impl crate::execution_engine::Exchange for UpbitWebSocketTrader {
752 fn venue(&self) -> Venue {
753 Venue::Upbit
754 }
755
756 async fn place_order(
757 &self,
758 _order: Order,
759 _report_sender: Sender<ExecutionReport>,
760 ) -> Result<()> {
761 bail!("Order placement via WebSocket not supported by Upbit - use REST API")
765 }
766
767 async fn cancel_order(
768 &self,
769 _order_id: SmartString,
770 _report_sender: Sender<ExecutionReport>,
771 ) -> Result<()> {
772 bail!("Order cancellation via WebSocket not supported by Upbit - use REST API")
774 }
775
776 async fn modify_order(
777 &self,
778 _order_id: SmartString,
779 _new_price: Option<Decimal>,
780 _new_quantity: Option<Decimal>,
781 _report_sender: Sender<ExecutionReport>,
782 ) -> Result<()> {
783 bail!("Order modification not supported by Upbit")
784 }
785
786 async fn cancel_all_orders(
787 &self,
788 _instrument_id: Option<InstrumentId>,
789 _report_sender: Sender<ExecutionReport>,
790 ) -> Result<()> {
791 bail!("Cancel all orders via WebSocket not supported by Upbit - use REST API")
792 }
793
794 async fn get_order_status(&self, _order_id: &str) -> Result<OrderStatus> {
795 bail!("Order status query via WebSocket not supported by Upbit - use REST API")
796 }
797
798 async fn connect(&self, report_sender: Sender<ExecutionReport>) -> Result<()> {
799 Self::connect(self, report_sender).await
802 }
803
804 async fn disconnect(&self) -> Result<()> {
805 self.disconnect().await
806 }
807
808 async fn is_connected(&self) -> bool {
809 self.is_connected()
810 }
811
812 async fn get_instruments(&self) -> Result<smallvec::SmallVec<[InstrumentId; 32]>> {
813 bail!("Instrument retrieval via WebSocket not supported by Upbit - use REST API")
814 }
815
816 async fn send_fix_message(&self, _message: Vec<u8>) -> Result<()> {
817 anyhow::bail!("FIX protocol not supported on Upbit WebSocket")
818 }
819
820 async fn receive_fix_message(&self) -> Result<Vec<u8>> {
821 anyhow::bail!("FIX protocol not supported on Upbit WebSocket")
822 }
823}
824
#[cfg(test)]
mod tests {
    use super::*;
    use rusty_common::auth::exchanges::upbit::UpbitAuthConfig;
    use rusty_model::enums::OrderSide;
    use simd_json::ValueBuilder;

    /// Builds authentication from placeholder keys.
    fn create_test_auth() -> Arc<UpbitAuth> {
        let config = UpbitAuthConfig::new("test_access_key".into(), "test_secret_key".into());
        Arc::new(UpbitAuth::new(config))
    }

    /// Builds a trader that has never been connected.
    fn fresh_trader() -> UpbitWebSocketTrader {
        UpbitWebSocketTrader::new(create_test_auth())
    }

    /// A new trader starts disconnected.
    #[test]
    fn test_upbit_websocket_trader_creation() {
        let trader = fresh_trader();

        assert_eq!(trader.get_state(), UpbitConnectionState::Disconnected);
        assert!(!trader.is_connected());
        assert!(!trader.is_authenticated());
        assert!(!trader.is_subscribed());
    }

    /// Each state is stored as set and drives the boolean predicates.
    #[test]
    fn test_connection_state_transitions() {
        let trader = fresh_trader();

        trader.set_state(UpbitConnectionState::Connecting);
        assert_eq!(trader.get_state(), UpbitConnectionState::Connecting);

        trader.set_state(UpbitConnectionState::Connected);
        assert_eq!(trader.get_state(), UpbitConnectionState::Connected);
        assert!(trader.is_connected());

        trader.set_state(UpbitConnectionState::Subscribing);
        assert_eq!(trader.get_state(), UpbitConnectionState::Subscribing);
        assert!(trader.is_connected());
        assert!(trader.is_authenticated());

        trader.set_state(UpbitConnectionState::Subscribed);
        assert_eq!(trader.get_state(), UpbitConnectionState::Subscribed);
        assert!(trader.is_subscribed());
    }

    /// Known Upbit states map to their status; anything else is rejected.
    #[test]
    fn test_order_status_mapping() {
        let expectations = [
            ("wait", OrderStatus::New),
            ("watch", OrderStatus::New),
            ("trade", OrderStatus::PartiallyFilled),
            ("done", OrderStatus::Filled),
            ("cancel", OrderStatus::Cancelled),
            ("unknown", OrderStatus::Rejected),
        ];

        for (upbit_state, expected) in expectations {
            assert_eq!(UpbitWebSocketTrader::map_order_status(upbit_state), expected);
        }
    }

    /// The health snapshot of a fresh trader is empty.
    #[test]
    fn test_connection_health() {
        let health = fresh_trader().get_connection_health();

        assert_eq!(health.state, UpbitConnectionState::Disconnected);
        assert!(!health.is_connected);
        assert!(!health.is_authenticated);
        assert!(!health.is_subscribed);
        assert!(health.time_since_last_ping.is_none());
        assert_eq!(health.messages_sent, 0);
        assert_eq!(health.reconnection_attempts, 0);
    }

    /// Trading operations fail on this adapter.
    #[tokio::test]
    async fn test_websocket_only_operations() {
        use crate::execution_engine::Exchange;
        use rusty_model::types::OrderId;

        let trader = fresh_trader();
        let (report_tx, _report_rx) = flume::bounded(100);

        let order = Order {
            id: OrderId::new(),
            symbol: "KRW-BTC".into(),
            side: OrderSide::Buy,
            order_type: rusty_model::enums::OrderType::Limit,
            quantity: Decimal::from(1),
            price: Some(Decimal::from(50000)),
            stop_price: None,
            exchange_order_id: None,
            venue: Venue::Upbit,
            client_id: "test_client".into(),
            creation_time_ns: 1_000_000_000,
            update_time_ns: 1_000_000_000,
            average_fill_price: None,
            filled_quantity: Decimal::ZERO,
            status: rusty_model::enums::OrderStatus::New,
            time_in_force: rusty_model::enums::TimeInForce::GTC,
            metadata: simd_json::value::owned::Value::null(),
        };

        let placed = trader.place_order(order, report_tx.clone()).await;
        assert!(placed.is_err());

        let cancelled = trader.cancel_order("test_id".into(), report_tx).await;
        assert!(cancelled.is_err());

        let queried = trader.get_order_status("test_id").await;
        assert!(queried.is_err());
    }
}