1use anyhow::{Result, bail};
53use async_trait::async_trait;
54use flume::Sender;
55use futures::{SinkExt, StreamExt};
56use log::{debug, error, info, warn};
57use parking_lot::RwLock;
58use quanta::Clock;
59use rust_decimal::Decimal;
60use rusty_common::auth::exchanges::bithumb::{BithumbAuth, header_keys};
61use rusty_common::types::Exchange;
62use rusty_common::utils::id_generation;
63use rusty_common::websocket::{
64 Message, WebSocketConfig,
65 client::ConnectionState as WsConnectionState,
66 connector::{WebSocketConnector, WebSocketSink, WebSocketStream},
67 stats::new_shared_stats,
68};
69use rusty_model::{
70 enums::OrderStatus, instruments::InstrumentId, trading_order::Order, venues::Venue,
71};
72
73use super::bithumb_errors::map_websocket_order_state;
74#[cfg(test)]
75use super::bithumb_errors::map_websocket_order_status;
76use simd_json;
77use simd_json::prelude::{ValueAsArray, ValueAsScalar, ValueObjectAccess};
78use smartstring::alias::String as SmartString;
79use std::sync::Arc;
80use std::sync::atomic::{AtomicU8, AtomicU64, Ordering};
81use std::time::Duration;
82use tokio::sync::RwLock as AsyncRwLock;
83use tokio::task::JoinHandle;
84
85use crate::execution_engine::ExecutionReport;
86
/// Endpoint of the Bithumb private (authenticated) WebSocket API.
const BITHUMB_PRIVATE_WS_URL: &str = "wss://ws-api.bithumb.com/websocket/v1/private";

/// Seconds between keep-alive pings; feeds both the connector config and the ping task.
const PING_INTERVAL_SECONDS: u64 = 2 * 60;

/// Seconds the connector is configured to wait for a pong.
const PONG_TIMEOUT_SECONDS: u64 = 10;

/// Largest WebSocket message the connector is configured to accept, in bytes (10 MiB).
const MAX_MESSAGE_SIZE: usize = 10 << 20;
98
/// Lifecycle stage of the private WebSocket session.
///
/// The explicit discriminants are the values stored in the trader's `AtomicU8` state cell and
/// decoded again by `From<u8>`, so the two must stay in sync.
#[repr(u8)]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum BithumbConnectionState {
    /// No session is open.
    Disconnected = 0,
    /// `connect` has started and the socket is being opened.
    Connecting = 1,
    /// The socket is open; private channels have not been subscribed yet.
    Connected = 2,
    /// Intermediate authentication stage. NOTE(review): nothing in this file sets it.
    Authenticating = 3,
    /// The private channel subscriptions were sent successfully.
    Authenticated = 4,
    /// `disconnect` is tearing the session down.
    Disconnecting = 5,
}
116
117impl From<u8> for BithumbConnectionState {
118 fn from(value: u8) -> Self {
119 match value {
120 0 => Self::Disconnected,
121 1 => Self::Connecting,
122 2 => Self::Connected,
123 3 => Self::Authenticating,
124 4 => Self::Authenticated,
125 5 => Self::Disconnecting,
126 _ => Self::Disconnected,
127 }
128 }
129}
130
131#[derive(Debug, Clone)]
133pub struct BithumbConnectionHealth {
134 pub state: BithumbConnectionState,
136 pub is_connected: bool,
138 pub is_authenticated: bool,
140 pub time_since_last_ping: Option<Duration>,
142 pub time_since_last_pong: Option<Duration>,
144 pub time_since_auth: Option<Duration>,
146 pub messages_sent: u64,
148 pub messages_received: u64,
150 pub reconnection_attempts: u64,
152}
153
/// Atomic traffic counters shared between the trader and its background tasks.
#[derive(Default, Debug)]
struct ConnectionMetrics {
    /// Keep-alive pings written by the ping task.
    pings_sent: AtomicU64,
    /// Pong frames seen by the message task.
    pongs_received: AtomicU64,
    /// Messages written through `send_message`.
    messages_sent: AtomicU64,
    /// Frames read by the message task, of any type.
    messages_received: AtomicU64,
    /// Reconnection attempts. NOTE(review): nothing in this file increments it.
    reconnection_attempts: AtomicU64,
}
163
164#[derive(Debug, Default)]
166struct TaskHandles {
167 ping_handle: Option<JoinHandle<()>>,
168 message_handler: Option<JoinHandle<()>>,
169}
170
171pub struct BithumbWebSocketTrader {
176 auth: Arc<BithumbAuth>,
178
179 ws_sink: Arc<AsyncRwLock<Option<WebSocketSink>>>,
181 ws_stream: Arc<AsyncRwLock<Option<WebSocketStream>>>,
182
183 state: Arc<AtomicU8>,
185
186 clock: Clock,
188
189 last_ping_time: Arc<AtomicU64>,
191
192 last_pong_time: Arc<AtomicU64>,
194
195 auth_completed_time: Arc<AtomicU64>,
197
198 task_handles: Arc<AsyncRwLock<TaskHandles>>,
200
201 metrics: Arc<ConnectionMetrics>,
203
204 messages_received: Arc<AtomicU64>,
206 messages_sent: Arc<AtomicU64>,
207 reconnect_count: Arc<AtomicU64>,
208}
209
210impl Clone for BithumbWebSocketTrader {
211 fn clone(&self) -> Self {
212 Self {
213 auth: self.auth.clone(),
214 ws_sink: self.ws_sink.clone(),
215 ws_stream: self.ws_stream.clone(),
216 state: self.state.clone(),
217 clock: self.clock.clone(),
218 last_ping_time: self.last_ping_time.clone(),
219 last_pong_time: self.last_pong_time.clone(),
220 auth_completed_time: self.auth_completed_time.clone(),
221 task_handles: self.task_handles.clone(),
222 metrics: self.metrics.clone(),
223 messages_received: self.messages_received.clone(),
224 messages_sent: self.messages_sent.clone(),
225 reconnect_count: self.reconnect_count.clone(),
226 }
227 }
228}
229
230impl BithumbWebSocketTrader {
231 #[must_use]
244 pub fn new(auth: Arc<BithumbAuth>) -> Self {
245 Self {
246 auth,
247 ws_sink: Arc::new(AsyncRwLock::new(None)),
248 ws_stream: Arc::new(AsyncRwLock::new(None)),
249 state: Arc::new(AtomicU8::new(BithumbConnectionState::Disconnected as u8)),
250 clock: Clock::new(),
251 last_ping_time: Arc::new(AtomicU64::new(0)),
252 last_pong_time: Arc::new(AtomicU64::new(0)),
253 auth_completed_time: Arc::new(AtomicU64::new(0)),
254 task_handles: Arc::new(AsyncRwLock::new(TaskHandles::default())),
255 metrics: Arc::new(ConnectionMetrics::default()),
256 messages_received: Arc::new(AtomicU64::new(0)),
257 messages_sent: Arc::new(AtomicU64::new(0)),
258 reconnect_count: Arc::new(AtomicU64::new(0)),
259 }
260 }
261
262 #[must_use]
264 pub fn connection_state(&self) -> BithumbConnectionState {
265 BithumbConnectionState::from(self.state.load(Ordering::Acquire))
266 }
267
268 fn set_state(&self, new_state: BithumbConnectionState) {
270 let old_state = self.connection_state();
271 self.state.store(new_state as u8, Ordering::Release);
272
273 if old_state != new_state {
274 debug!("Bithumb connection state transition: {old_state:?} -> {new_state:?}");
275 }
276 }
277
278 #[must_use]
280 pub fn is_connected(&self) -> bool {
281 matches!(
282 self.connection_state(),
283 BithumbConnectionState::Connected | BithumbConnectionState::Authenticated
284 )
285 }
286
287 #[must_use]
289 pub fn is_authenticated(&self) -> bool {
290 matches!(
291 self.connection_state(),
292 BithumbConnectionState::Authenticated
293 )
294 }
295
296 #[must_use]
298 pub fn get_stats(&self) -> (u64, u64, u64) {
299 (
300 self.messages_received.load(Ordering::Relaxed),
301 self.messages_sent.load(Ordering::Relaxed),
302 self.reconnect_count.load(Ordering::Relaxed),
303 )
304 }
305
306 #[must_use]
308 pub fn get_connection_health(&self) -> BithumbConnectionHealth {
309 let now = self.clock.raw();
310 let last_ping = self.last_ping_time.load(Ordering::Acquire);
311 let last_pong = self.last_pong_time.load(Ordering::Acquire);
312 let auth_time = self.auth_completed_time.load(Ordering::Acquire);
313
314 let time_since_last_ping = if last_ping > 0 {
315 Some(Duration::from_nanos(now.saturating_sub(last_ping)))
316 } else {
317 None
318 };
319
320 let time_since_last_pong = if last_pong > 0 {
321 Some(Duration::from_nanos(now.saturating_sub(last_pong)))
322 } else {
323 None
324 };
325
326 let time_since_auth = if auth_time > 0 {
327 Some(Duration::from_nanos(now.saturating_sub(auth_time)))
328 } else {
329 None
330 };
331
332 BithumbConnectionHealth {
333 state: self.connection_state(),
334 is_connected: self.is_connected(),
335 is_authenticated: self.is_authenticated(),
336 time_since_last_ping,
337 time_since_last_pong,
338 time_since_auth,
339 messages_sent: self.metrics.messages_sent.load(Ordering::Relaxed),
340 messages_received: self.metrics.messages_received.load(Ordering::Relaxed),
341 reconnection_attempts: self.metrics.reconnection_attempts.load(Ordering::Relaxed),
342 }
343 }
344
345 pub async fn connect(&self, report_tx: Sender<ExecutionReport>) -> Result<()> {
347 info!("Connecting to Bithumb private WebSocket: {BITHUMB_PRIVATE_WS_URL}");
348
349 self.set_state(BithumbConnectionState::Connecting);
351
352 let headers = self
355 .auth
356 .generate_headers("GET", "/websocket/v1/private", None)?;
357 let auth_header = headers
358 .get(&header_keys::authorization())
359 .ok_or_else(|| anyhow::anyhow!("No authorization header generated"))?
360 .clone();
361
362 let config =
364 WebSocketConfig::builder(Exchange::Bithumb, BITHUMB_PRIVATE_WS_URL.to_string())
365 .connect_timeout(Duration::from_secs(10))
366 .timeout(Duration::from_secs(30))
367 .ping_interval(Duration::from_secs(PING_INTERVAL_SECONDS))
368 .pong_timeout(Duration::from_secs(PONG_TIMEOUT_SECONDS))
369 .max_frame_size(65536) .max_message_size(MAX_MESSAGE_SIZE)
371 .header(
372 header_keys::AUTHORIZATION.to_string(),
373 auth_header.to_string(),
374 )
375 .build();
376
377 let stats = new_shared_stats();
378 let connection_state = Arc::new(RwLock::new(WsConnectionState::Disconnected));
379 let mut connector =
380 WebSocketConnector::new(config, stats.clone(), connection_state.clone());
381
382 let (sink, stream) = connector.connect_with_retry(BITHUMB_PRIVATE_WS_URL).await?;
383
384 *self.ws_sink.write().await = Some(sink);
386 *self.ws_stream.write().await = Some(stream);
387
388 self.set_state(BithumbConnectionState::Connected);
390
391 self.subscribe_to_private_channels().await?;
393
394 self.set_state(BithumbConnectionState::Authenticated);
396 self.auth_completed_time
397 .store(self.clock.raw(), Ordering::Release);
398
399 self.start_ping_task().await;
401 self.start_message_task(report_tx).await;
402
403 info!("Successfully connected and authenticated to Bithumb private WebSocket");
404 Ok(())
405 }
406
407 async fn subscribe_to_private_channels(&self) -> Result<()> {
411 debug!("Subscribing to Bithumb private channels");
412
413 let ticket_id = id_generation::generate_report_id_with_uuid("bithumb");
415
416 let order_subscription = simd_json::json!([
418 {
419 "ticket": ticket_id.clone()
420 },
421 {
422 "type": "myOrder",
423 "codes": [] },
425 {
426 "format": "DEFAULT" }
428 ]);
429
430 let asset_subscription = simd_json::json!([
432 {
433 "ticket": ticket_id
434 },
435 {
436 "type": "myAsset"
437 },
438 {
439 "format": "DEFAULT"
440 }
441 ]);
442
443 let order_msg: SmartString = simd_json::to_string(&order_subscription)?.into();
445 self.send_message(Message::Text(order_msg)).await?;
446
447 let asset_msg: SmartString = simd_json::to_string(&asset_subscription)?.into();
449 self.send_message(Message::Text(asset_msg)).await?;
450
451 debug!("Bithumb private channels subscription completed");
452 Ok(())
453 }
454
455 async fn send_message(&self, message: Message) -> Result<()> {
457 if let Some(sink) = self.ws_sink.write().await.as_mut() {
458 let frame = message.to_frame_view();
459 sink.send(frame).await?;
460 self.metrics.messages_sent.fetch_add(1, Ordering::Relaxed);
461 Ok(())
462 } else {
463 bail!("WebSocket sink not available")
464 }
465 }
466
467 async fn start_ping_task(&self) {
469 let sink = self.ws_sink.clone();
470 let last_ping = self.last_ping_time.clone();
471 let clock = self.clock.clone();
472 let metrics = self.metrics.clone();
473
474 let task = tokio::spawn(async move {
475 let mut interval = tokio::time::interval(Duration::from_secs(PING_INTERVAL_SECONDS));
476
477 loop {
478 interval.tick().await;
479
480 if let Some(sink) = sink.write().await.as_mut() {
482 let ping_message = Message::Ping(Vec::new());
483 let frame = ping_message.to_frame_view();
484 if let Err(e) = sink.send(frame).await {
485 error!("Failed to send ping: {e}");
486 break;
487 }
488
489 last_ping.store(clock.raw(), Ordering::Release);
490 metrics.pings_sent.fetch_add(1, Ordering::Relaxed);
491 } else {
492 break;
493 }
494 }
495 });
496
497 self.task_handles.write().await.ping_handle = Some(task);
498 }
499
500 async fn start_message_task(&self, report_tx: Sender<ExecutionReport>) {
502 let stream = self.ws_stream.clone();
503 let last_pong_time = self.last_pong_time.clone();
504 let clock = self.clock.clone();
505 let metrics = self.metrics.clone();
506
507 let task = tokio::spawn(async move {
508 loop {
509 let mut stream_guard = stream.write().await;
511 if let Some(ws_stream) = stream_guard.as_mut() {
512 if let Some(frame) = ws_stream.next().await {
513 drop(stream_guard); let message = Message::from_frame_view(frame);
516 metrics.messages_received.fetch_add(1, Ordering::Relaxed);
517
518 match message {
519 Message::Text(text) => {
520 debug!("Received text message: {text}");
521 if let Err(e) =
522 Self::process_message(&text, &report_tx, &clock).await
523 {
524 error!("Failed to process message: {e}");
525 }
526 }
527 Message::Pong(_) => {
528 last_pong_time.store(clock.raw(), Ordering::Release);
529 metrics.pongs_received.fetch_add(1, Ordering::Relaxed);
530 }
531 Message::Close(_) => {
532 warn!("Received close message from Bithumb WebSocket");
533 break;
534 }
535 _ => {
536 debug!("Received other message type: {message:?}");
537 }
538 }
539 } else {
540 break; }
542 } else {
543 break; }
545 }
546 });
547
548 self.task_handles.write().await.message_handler = Some(task);
549 }
550
551 async fn process_message(
553 text: &str,
554 report_tx: &Sender<ExecutionReport>,
555 clock: &Clock,
556 ) -> Result<()> {
557 let mut json_bytes = text.as_bytes().to_vec();
560 let json: simd_json::OwnedValue = simd_json::from_slice(&mut json_bytes)?;
561
562 if let Some(msg_type) = json.get("type").and_then(|v| v.as_str()) {
564 match msg_type {
565 "myOrder" => {
566 Self::handle_order_update(&json, report_tx, clock).await?;
567 }
568 "myAsset" => {
569 Self::handle_asset_update(&json).await?;
570 }
571 "auth" => {
572 debug!("Authentication response: {json:?}");
573 }
574 "subscribe" => {
575 debug!("Subscription response: {json:?}");
576 }
577 _ => {
578 debug!("Unknown message type: {msg_type}");
579 }
580 }
581 } else {
582 debug!("Message without type field: {text}");
583 }
584
585 Ok(())
586 }
587
588 async fn handle_order_update(
590 json: &simd_json::OwnedValue,
591 report_tx: &Sender<ExecutionReport>,
592 clock: &Clock,
593 ) -> Result<()> {
594 debug!("Processing Bithumb order update: {json:?}");
595
596 let order_id = json
598 .get("uuid")
599 .and_then(|v| v.as_str())
600 .unwrap_or("unknown")
601 .into();
602
603 let symbol = json
604 .get("code")
605 .and_then(|v| v.as_str())
606 .unwrap_or("UNKNOWN");
607
608 let state_str = json
609 .get("state")
610 .and_then(|v| v.as_str())
611 .unwrap_or("unknown");
612
613 let status = Self::map_bithumb_order_state(state_str);
614
615 let executed_volume = json
616 .get("executed_volume")
617 .and_then(simd_json::prelude::ValueAsScalar::as_f64)
618 .and_then(|f| Decimal::try_from(f).ok())
619 .unwrap_or(Decimal::ZERO);
620
621 let remaining_volume = json
622 .get("remaining_volume")
623 .and_then(simd_json::prelude::ValueAsScalar::as_f64)
624 .and_then(|f| Decimal::try_from(f).ok())
625 .unwrap_or(Decimal::ZERO);
626
627 let execution_price = json
628 .get("price")
629 .and_then(simd_json::prelude::ValueAsScalar::as_f64)
630 .and_then(|f| Decimal::try_from(f).ok());
631
632 let timestamp = json
633 .get("timestamp")
634 .and_then(simd_json::prelude::ValueAsScalar::as_u64)
635 .unwrap_or(0)
636 * 1_000_000; let report = ExecutionReport {
639 id: id_generation::generate_exchange_order_id("bithumb"),
640 order_id,
641 exchange_timestamp: timestamp,
642 system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
643 instrument_id: InstrumentId::new(symbol, Venue::Bithumb),
644 status,
645 filled_quantity: executed_volume,
646 remaining_quantity: remaining_volume,
647 execution_price,
648 reject_reason: None,
649 exchange_execution_id: json
650 .get("trade_uuid")
651 .and_then(|v| v.as_str())
652 .map(std::convert::Into::into),
653 is_final: matches!(
654 status,
655 OrderStatus::Filled | OrderStatus::Cancelled | OrderStatus::Rejected
656 ),
657 };
658
659 if let Err(e) = report_tx.try_send(report) {
660 error!("Failed to send Bithumb order update report: {e}");
661 }
662
663 Ok(())
664 }
665
666 async fn handle_asset_update(json: &simd_json::OwnedValue) -> Result<()> {
668 debug!("Processing Bithumb asset update: {json:?}");
669
670 if let Some(assets) = json.get("assets").and_then(|v| v.as_array()) {
673 for (index, asset) in assets.iter().enumerate() {
674 match (
675 asset.get("currency").and_then(|v| v.as_str()),
676 asset.get("balance").and_then(|v| v.as_str()), asset.get("locked").and_then(|v| v.as_str()), ) {
679 (Some(currency), Some(balance), Some(locked)) => {
680 let balance_f64 = match balance.parse::<f64>() {
682 Ok(val) => val,
683 Err(e) => {
684 warn!("Failed to parse balance for {currency}: '{balance}' - {e}");
685 0.0
686 }
687 };
688 let locked_f64 = match locked.parse::<f64>() {
689 Ok(val) => val,
690 Err(e) => {
691 warn!(
692 "Failed to parse locked amount for {currency}: '{locked}' - {e}"
693 );
694 0.0
695 }
696 };
697
698 info!(
699 "Bithumb asset update - {currency}: balance={balance_f64}, locked={locked_f64}"
700 );
701 }
702 (currency, balance, locked) => {
703 warn!(
704 "Incomplete asset data at index {}: currency={:?}, balance={:?}, locked={:?}",
705 index,
706 currency.is_some(),
707 balance.is_some(),
708 locked.is_some()
709 );
710 }
711 }
712 }
713 } else {
714 warn!("No 'assets' array found in myAsset update or it's not an array");
715 }
716
717 Ok(())
718 }
719
720 fn map_bithumb_order_state(state: &str) -> OrderStatus {
722 map_websocket_order_state(state).unwrap_or(OrderStatus::Unknown)
723 }
724
725 pub async fn disconnect(&self) -> Result<()> {
727 info!("Disconnecting from Bithumb WebSocket");
728
729 self.set_state(BithumbConnectionState::Disconnecting);
730
731 {
733 let mut handles = self.task_handles.write().await;
734 if let Some(handle) = handles.ping_handle.take() {
735 handle.abort();
736 }
737 if let Some(handle) = handles.message_handler.take() {
738 handle.abort();
739 }
740 }
741
742 *self.ws_sink.write().await = None;
744 *self.ws_stream.write().await = None;
745
746 self.set_state(BithumbConnectionState::Disconnected);
747
748 info!("Disconnected from Bithumb WebSocket");
749 Ok(())
750 }
751}
752
753#[async_trait]
754impl crate::execution_engine::Exchange for BithumbWebSocketTrader {
755 fn venue(&self) -> Venue {
756 Venue::Bithumb
757 }
758
759 async fn place_order(
760 &self,
761 _order: Order,
762 _report_sender: Sender<ExecutionReport>,
763 ) -> Result<()> {
764 bail!("Order placement via WebSocket not supported by Bithumb - use REST API")
768 }
769
770 async fn cancel_order(
771 &self,
772 _order_id: SmartString,
773 _report_sender: Sender<ExecutionReport>,
774 ) -> Result<()> {
775 bail!("Order cancellation via WebSocket not supported by Bithumb - use REST API")
777 }
778
779 async fn modify_order(
780 &self,
781 _order_id: SmartString,
782 _new_price: Option<Decimal>,
783 _new_quantity: Option<Decimal>,
784 _report_sender: Sender<ExecutionReport>,
785 ) -> Result<()> {
786 bail!("Order modification not supported by Bithumb")
787 }
788
789 async fn cancel_all_orders(
790 &self,
791 _instrument_id: Option<InstrumentId>,
792 _report_sender: Sender<ExecutionReport>,
793 ) -> Result<()> {
794 bail!("Cancel all orders via WebSocket not supported by Bithumb - use REST API")
795 }
796
797 async fn get_order_status(&self, _order_id: &str) -> Result<OrderStatus> {
798 bail!("Order status query via WebSocket not supported by Bithumb - use REST API")
799 }
800
801 async fn connect(&self, _report_sender: Sender<ExecutionReport>) -> Result<()> {
802 bail!("Generic connect not implemented - use connect(report_tx)")
803 }
804
805 async fn disconnect(&self) -> Result<()> {
806 self.disconnect().await
807 }
808
809 async fn is_connected(&self) -> bool {
810 self.is_connected()
811 }
812
813 async fn get_instruments(&self) -> Result<smallvec::SmallVec<[InstrumentId; 32]>> {
814 bail!("Instrument discovery via WebSocket not supported by Bithumb - use REST API")
816 }
817
818 async fn send_fix_message(&self, _message: Vec<u8>) -> Result<()> {
819 bail!("FIX protocol not supported by Bithumb - use native WebSocket API")
820 }
821
822 async fn receive_fix_message(&self) -> Result<Vec<u8>> {
823 bail!("FIX protocol not supported by Bithumb - use native WebSocket API")
824 }
825}
826
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution_engine::Exchange;
    use rust_decimal::Decimal;
    use rusty_common::auth::exchanges::bithumb::BithumbAuth;
    use rusty_model::enums::OrderSide;

    /// Throwaway credentials; no test in this module opens a network connection.
    fn dummy_auth() -> Arc<BithumbAuth> {
        Arc::new(BithumbAuth::new("test_key".into(), "test_secret".into()))
    }

    /// A freshly constructed, disconnected trader.
    fn dummy_trader() -> BithumbWebSocketTrader {
        BithumbWebSocketTrader::new(dummy_auth())
    }

    #[test]
    fn test_bithumb_websocket_trader_creation() {
        let trader = dummy_trader();

        assert_eq!(
            trader.connection_state(),
            BithumbConnectionState::Disconnected
        );
        assert!(!trader.is_connected());
        assert_eq!(trader.venue(), Venue::Bithumb);
    }

    #[test]
    fn test_connection_state_transitions() {
        let trader = dummy_trader();

        // Connecting: state is visible, nothing else is asserted.
        trader.set_state(BithumbConnectionState::Connecting);
        assert_eq!(
            trader.connection_state(),
            BithumbConnectionState::Connecting
        );

        // Connected: counts as connected.
        trader.set_state(BithumbConnectionState::Connected);
        assert_eq!(trader.connection_state(), BithumbConnectionState::Connected);
        assert!(trader.is_connected());

        // Authenticated: connected and authenticated.
        trader.set_state(BithumbConnectionState::Authenticated);
        assert_eq!(
            trader.connection_state(),
            BithumbConnectionState::Authenticated
        );
        assert!(trader.is_connected());
        assert!(trader.is_authenticated());
    }

    #[test]
    fn test_connection_health() {
        let health = dummy_trader().get_connection_health();

        assert_eq!(health.state, BithumbConnectionState::Disconnected);
        assert!(!health.is_connected);
        assert!(!health.is_authenticated);
        assert!(health.time_since_last_ping.is_none());
        assert_eq!(health.messages_sent, 0);
        assert_eq!(health.reconnection_attempts, 0);
    }

    #[test]
    fn test_order_status_mapping() {
        // `status`-style strings.
        let status_cases = [
            ("placed", OrderStatus::New),
            ("pending", OrderStatus::Open),
            ("partial", OrderStatus::PartiallyFilled),
            ("completed", OrderStatus::Filled),
            ("cancelled", OrderStatus::Cancelled),
            ("rejected", OrderStatus::Rejected),
        ];
        for (raw, expected) in status_cases {
            assert_eq!(map_websocket_order_status(raw).unwrap(), expected);
        }
        assert!(map_websocket_order_status("unknown").is_err());

        // `state`-style strings.
        let state_cases = [
            ("wait", OrderStatus::Open),
            ("trade", OrderStatus::PartiallyFilled),
            ("done", OrderStatus::Filled),
            ("cancel", OrderStatus::Cancelled),
        ];
        for (raw, expected) in state_cases {
            assert_eq!(map_websocket_order_state(raw).unwrap(), expected);
        }
        assert!(map_websocket_order_state("unknown").is_err());
    }

    #[tokio::test]
    async fn test_exchange_trait_methods_return_errors() {
        let trader = dummy_trader();
        let (report_tx, _report_rx) = flume::bounded(100);

        let dummy_order = Order::new(
            Venue::Bithumb,
            "BTC_KRW",
            OrderSide::Buy,
            rusty_model::enums::OrderType::Limit,
            Decimal::ONE,
            Some(Decimal::from(50000)),
            rusty_model::types::ClientId::new("test_client"),
        );

        let placed = trader.place_order(dummy_order, report_tx.clone()).await;
        assert!(placed.is_err());

        let cancelled = trader.cancel_order("test".into(), report_tx.clone()).await;
        assert!(cancelled.is_err());

        let modified = trader
            .modify_order("test".into(), None, None, report_tx)
            .await;
        assert!(modified.is_err());

        assert!(trader.get_order_status("test").await.is_err());
        assert!(trader.get_instruments().await.is_err());
    }

    #[test]
    fn test_subscription_format() {
        let ticket = "test_ticket";

        let order_request = simd_json::json!([
            {"ticket": ticket},
            {"type": "myOrder", "codes": []},
            {"format": "DEFAULT"}
        ]);
        let order_encoded = simd_json::to_string(&order_request).unwrap();
        assert!(order_encoded.contains(r"ticket"));
        assert!(order_encoded.contains(r"myOrder"));
        assert!(order_encoded.starts_with('[') && order_encoded.ends_with(']'));

        let asset_request = simd_json::json!([
            {"ticket": ticket},
            {"type": "myAsset"},
            {"format": "DEFAULT"}
        ]);
        let asset_encoded = simd_json::to_string(&asset_request).unwrap();
        assert!(asset_encoded.contains(r"ticket"));
        assert!(asset_encoded.contains(r"myAsset"));
        assert!(asset_encoded.starts_with('[') && asset_encoded.ends_with(']'));
    }

    #[test]
    fn test_bithumb_order_state_mapping() {
        let cases = [
            ("wait", OrderStatus::Open),
            ("trade", OrderStatus::PartiallyFilled),
            ("done", OrderStatus::Filled),
            ("cancel", OrderStatus::Cancelled),
            ("unknown", OrderStatus::Unknown),
        ];
        for (raw, expected) in cases {
            assert_eq!(
                BithumbWebSocketTrader::map_bithumb_order_state(raw),
                expected
            );
        }
    }

    #[tokio::test]
    async fn test_websocket_headers_include_jwt() {
        let headers = dummy_auth()
            .generate_headers("GET", "/websocket/v1/private", None)
            .unwrap();
        let auth_key: SmartString = "Authorization".into();
        let auth_header = headers.get(&auth_key).unwrap();

        assert!(auth_header.starts_with("Bearer "));

        let jwt = auth_header.strip_prefix("Bearer ").unwrap();
        let segment_count = jwt.split('.').count();
        assert_eq!(segment_count, 3, "JWT should have 3 parts separated by dots");
    }
}