1use rusty_common::collections::FxHashMap;
7use std::any::Any;
8use std::str::FromStr;
9use std::sync::Arc;
10use std::time::{Duration, Instant};
11
12use crate::common::rest_utils::{current_timestamp_ms, get_request};
13use crate::exchange::zerocopy_helpers::{deserialize_from_slice_borrowed, deserialize_from_vec};
14use crate::provider::prelude::*;
15use anyhow::{Result, anyhow};
16use async_trait::async_trait;
17use parking_lot::RwLock;
18use quanta::Clock;
19use rust_decimal::Decimal;
20use rusty_common::auth::exchanges::bithumb::BithumbAuth;
21use rusty_common::json;
22use rusty_common::websocket::{
23 Message, MessageHandler, WebSocketClient, WebSocketConfig, WebSocketError, WebSocketResult,
24};
25use rusty_model::{
26 data::{
27 book_snapshot::OrderBookSnapshot,
28 market_trade::MarketTrade,
29 orderbook::PriceLevel,
30 simd_orderbook::{SharedSimdOrderBook, SimdOrderBook},
31 },
32 enums::OrderSide,
33 instruments::{Instrument, InstrumentId},
34 venues::Venue,
35};
36use smallvec::SmallVec;
37use smartstring::alias::String;
38use tokio::sync::{mpsc, watch};
39use tokio::task::JoinHandle;
40
41use super::{
42 data::{
43 orderbook::{Orderbook, OrderbookMessage},
44 subscription::{BithumbSubscription, ResponseFormat, StatusMessage},
45 trade::TradeMessage,
46 },
47 types::{BITHUMB_PRIVATE_WS_URL, BITHUMB_PUBLIC_WS_URL, BITHUMB_RATE_LIMITS, TicketRequest},
48};
49
50#[derive(Debug, Clone)]
52pub struct BithumbInstrument {
53 pub symbol: String,
55 pub base_currency: String,
57 pub quote_currency: String,
59 pub min_price: Decimal,
61 pub max_price: Decimal,
63 pub price_tick: Decimal,
65 pub min_quantity: Decimal,
67 pub max_quantity: Decimal,
69}
70
71impl Instrument for BithumbInstrument {
72 fn id(&self) -> InstrumentId {
73 InstrumentId::new(self.symbol.clone(), Venue::Bithumb)
74 }
75
76 fn symbol(&self) -> String {
77 self.symbol.clone()
78 }
79
80 fn venue(&self) -> Venue {
81 Venue::Bithumb
82 }
83
84 fn as_any(&self) -> &dyn Any {
85 self
86 }
87
88 fn clone_box(&self) -> Box<dyn Instrument> {
89 Box::new(self.clone())
90 }
91}
92
93#[derive(Debug, Clone)]
95struct ThreadSafeInstrument {
96 symbol: String,
97 base_currency: String,
98 quote_currency: String,
99 min_price: Decimal,
100 max_price: Decimal,
101 price_tick: Decimal,
102 min_quantity: Decimal,
103 max_quantity: Decimal,
104}
105
106impl ThreadSafeInstrument {
107 fn from_bithumb_instrument(instrument: &BithumbInstrument) -> Self {
108 Self {
109 symbol: instrument.symbol.clone(),
110 base_currency: instrument.base_currency.clone(),
111 quote_currency: instrument.quote_currency.clone(),
112 min_price: instrument.min_price,
113 max_price: instrument.max_price,
114 price_tick: instrument.price_tick,
115 min_quantity: instrument.min_quantity,
116 max_quantity: instrument.max_quantity,
117 }
118 }
119
120 fn to_bithumb_instrument(&self) -> BithumbInstrument {
121 BithumbInstrument {
122 symbol: self.symbol.clone(),
123 base_currency: self.base_currency.clone(),
124 quote_currency: self.quote_currency.clone(),
125 min_price: self.min_price,
126 max_price: self.max_price,
127 price_tick: self.price_tick,
128 min_quantity: self.min_quantity,
129 max_quantity: self.max_quantity,
130 }
131 }
132}
133
134unsafe impl Send for ThreadSafeInstrument {}
136unsafe impl Sync for ThreadSafeInstrument {}
137
138#[derive(Debug)]
140pub struct BithumbProvider {
141 config: ConnectionConfig,
143
144 connection_status: Arc<RwLock<ConnectionState>>,
146
147 stats: Arc<RwLock<ConnectionStats>>,
149
150 clock: Clock,
152
153 subscriptions: Arc<RwLock<FxHashMap<String, watch::Sender<bool>>>>,
155
156 instruments: Arc<RwLock<FxHashMap<String, ThreadSafeInstrument>>>,
158
159 ws_handles: Arc<RwLock<FxHashMap<String, JoinHandle<()>>>>,
161
162 last_connection_attempt: Arc<RwLock<Instant>>,
164
165 http_client: reqwest::Client,
167
168 auth: Option<BithumbAuth>,
170}
171
172impl Default for BithumbProvider {
173 fn default() -> Self {
174 Self::new()
175 }
176}
177
178impl BithumbProvider {
179 #[must_use]
181 pub fn new() -> Self {
182 Self::with_config(ConnectionConfig::default())
183 }
184
185 #[must_use]
187 pub fn with_config(config: ConnectionConfig) -> Self {
188 let clock = config.clock.clone();
189
190 let auth =
192 if config.auth_config.api_key.is_some() && config.auth_config.secret_key.is_some() {
193 let auth_config = BithumbAuth::new(
194 config.auth_config.api_key.clone().unwrap(),
195 config.auth_config.secret_key.clone().unwrap(),
196 );
197 Some(auth_config)
198 } else {
199 None
200 };
201
202 let mut headers = reqwest::header::HeaderMap::new();
204 headers.insert(
205 "User-Agent",
206 reqwest::header::HeaderValue::from_str(&config.rest_config.user_agent)
207 .unwrap_or_else(|_| reqwest::header::HeaderValue::from_static("RustyHFT/1.0")),
208 );
209
210 let http_client = reqwest::Client::builder()
211 .timeout(Duration::from_millis(
212 config.rest_config.timeout_milliseconds,
213 ))
214 .connect_timeout(Duration::from_millis(
215 config.rest_config.timeout_milliseconds / 2,
216 ))
217 .pool_max_idle_per_host(config.rest_config.connection_pool_size)
218 .pool_idle_timeout(Duration::from_millis(
219 config.rest_config.keep_alive_milliseconds,
220 ))
221 .default_headers(headers)
222 .build()
223 .unwrap_or_default();
224
225 Self {
226 config,
227 connection_status: Arc::new(RwLock::new(ConnectionState::Disconnected)),
228 stats: Arc::new(RwLock::new(ConnectionStats::default())),
229 clock,
230 subscriptions: Arc::new(RwLock::new(FxHashMap::default())),
231 instruments: Arc::new(RwLock::new(FxHashMap::default())),
232 ws_handles: Arc::new(RwLock::new(FxHashMap::default())),
233 last_connection_attempt: Arc::new(RwLock::new(Instant::now())),
234 http_client,
235 auth,
236 }
237 }
238
239 fn create_ws_config(&self, private: bool) -> WebSocketConfig {
241 let url = if private {
242 BITHUMB_PRIVATE_WS_URL.to_string()
243 } else {
244 BITHUMB_PUBLIC_WS_URL.to_string()
245 };
246
247 WebSocketConfig::builder(rusty_common::types::Exchange::Bithumb, url)
248 .ping_interval(Duration::from_millis(
249 self.config.websocket_config.ping_interval_milliseconds,
250 ))
251 .timeout(Duration::from_millis(
252 self.config.websocket_config.timeout_milliseconds,
253 ))
254 .compression(rusty_common::websocket::CompressionConfig::disabled())
255 .build()
256 }
257
258 async fn create_trade_connection(
260 &self,
261 symbols: SmallVec<[String; 8]>,
262 tx: mpsc::Sender<TradeMessage>,
263 ) -> Result<JoinHandle<()>> {
264 let connection_id = format!("trade-{}", symbols.join(","));
265
266 let ticket = TicketRequest {
268 ticket: String::from(uuid::Uuid::new_v4().to_string()),
269 };
270 let subscription = BithumbSubscription::trade(symbols.to_vec());
271 let format = ResponseFormat::default();
272
273 let batch = simd_json::OwnedValue::Array(Box::new(vec![
275 simd_json::serde::to_owned_value(&ticket)?,
276 simd_json::serde::to_owned_value(&subscription)?,
277 simd_json::serde::to_owned_value(&format)?,
278 ]));
279
280 let (stop_tx, stop_rx) = watch::channel(false);
282 self.subscriptions
283 .write()
284 .insert(connection_id.clone().into(), stop_tx);
285
286 let clock = self.clock.clone();
288 let stats = self.stats.clone();
289 let connection_status = self.connection_status.clone();
290 let ws_config = self.create_ws_config(false); let handle = tokio::spawn(async move {
294 loop {
295 if *stop_rx.borrow() {
297 break;
298 }
299
300 let handler = BithumbMessageHandler::new_trade_handler(
302 clock.clone(),
303 stats.clone(),
304 tx.clone(),
305 );
306
307 let mut client = WebSocketClient::new(ws_config.clone());
309
310 *connection_status.write() = ConnectionState::Connecting;
312
313 if let Err(e) = client.run(handler).await {
315 log::error!("WebSocket client error: {e}");
316 *connection_status.write() = ConnectionState::Error;
317 }
318
319 if *stop_rx.borrow() {
321 break;
322 }
323
324 tokio::time::sleep(Duration::from_millis(1000)).await;
326 }
327 });
328
329 Ok(handle)
330 }
331
332 async fn create_orderbook_connection(
334 &self,
335 symbols: SmallVec<[String; 8]>,
336 tx: mpsc::Sender<OrderbookMessage>,
337 ) -> Result<JoinHandle<()>> {
338 let connection_id = format!("orderbook:{}", symbols.join(","));
339
340 let ticket = TicketRequest {
342 ticket: String::from(uuid::Uuid::new_v4().to_string()),
343 };
344 let subscription = BithumbSubscription::orderbook(symbols.to_vec(), None);
345 let format = ResponseFormat::default();
346
347 let batch = simd_json::OwnedValue::Array(Box::new(vec![
349 simd_json::serde::to_owned_value(&ticket)?,
350 simd_json::serde::to_owned_value(&subscription)?,
351 simd_json::serde::to_owned_value(&format)?,
352 ]));
353
354 let (stop_tx, stop_rx) = watch::channel(false);
356 self.subscriptions
357 .write()
358 .insert(connection_id.clone().into(), stop_tx);
359
360 let clock = self.clock.clone();
362 let stats = self.stats.clone();
363 let connection_status = self.connection_status.clone();
364 let ws_config = self.create_ws_config(false); let handle = tokio::spawn(async move {
368 loop {
369 if *stop_rx.borrow() {
371 break;
372 }
373
374 let handler = BithumbMessageHandler::new_orderbook_handler(
376 clock.clone(),
377 stats.clone(),
378 tx.clone(),
379 );
380
381 let mut client = WebSocketClient::new(ws_config.clone());
383
384 *connection_status.write() = ConnectionState::Connecting;
386
387 if let Err(e) = client.run(handler).await {
389 log::error!("WebSocket client error: {e}");
390 *connection_status.write() = ConnectionState::Error;
391 }
392
393 if *stop_rx.borrow() {
395 break;
396 }
397
398 tokio::time::sleep(Duration::from_millis(1000)).await;
400 }
401 });
402
403 Ok(handle)
404 }
405
406 #[inline]
408 const fn convert_exchange_timestamp(&self, timestamp_ms: u64) -> u64 {
409 timestamp_ms * 1_000_000 }
411}
412
413struct BithumbMessageHandler {
415 clock: Clock,
416 stats: Arc<RwLock<ConnectionStats>>,
417 trade_tx: Option<mpsc::Sender<TradeMessage>>,
418 orderbook_tx: Option<mpsc::Sender<OrderbookMessage>>,
419}
420
421#[async_trait]
422impl MessageHandler for BithumbMessageHandler {
423 async fn on_connected(&mut self) -> WebSocketResult<()> {
424 log::info!("Bithumb WebSocket connected");
425 Ok(())
426 }
427
428 async fn on_message(&mut self, message: Message) -> WebSocketResult<()> {
429 match message {
430 Message::Text(text) => {
431 match deserialize_from_slice_borrowed::<json::Value>(text.as_bytes()) {
433 Ok(value) => {
434 if let Err(e) = self.handle_market_data(&value).await {
435 log::error!("Failed to handle market data: {e}");
436 }
437 }
438 Err(e) => {
439 log::error!("Failed to parse WebSocket message: {e}");
440 }
441 }
442 }
443 Message::Binary(data) => {
444 match deserialize_from_vec::<json::Value>(data) {
446 Ok(value) => {
447 if let Err(e) = self.handle_market_data(&value).await {
448 log::error!("Failed to handle market data: {e}");
449 }
450 }
451 Err(e) => {
452 log::error!("Failed to parse binary WebSocket message: {e}");
453 }
454 }
455 }
456 _ => {}
457 }
458 Ok(())
459 }
460
461 async fn on_error(&mut self, error: WebSocketError) -> WebSocketResult<()> {
462 log::error!("Bithumb WebSocket error: {}", error);
463 Ok(())
464 }
465
466 async fn on_disconnected(&mut self) -> WebSocketResult<()> {
467 log::info!("Bithumb WebSocket disconnected");
468 Ok(())
469 }
470}
471
472impl BithumbMessageHandler {
473 const fn new_trade_handler(
474 clock: Clock,
475 stats: Arc<RwLock<ConnectionStats>>,
476 tx: mpsc::Sender<TradeMessage>,
477 ) -> Self {
478 Self {
479 clock,
480 stats,
481 trade_tx: Some(tx),
482 orderbook_tx: None,
483 }
484 }
485
486 const fn new_orderbook_handler(
487 clock: Clock,
488 stats: Arc<RwLock<ConnectionStats>>,
489 tx: mpsc::Sender<OrderbookMessage>,
490 ) -> Self {
491 Self {
492 clock,
493 stats,
494 trade_tx: None,
495 orderbook_tx: Some(tx),
496 }
497 }
498
499 async fn handle_market_data(&self, value: &json::Value) -> Result<()> {
500 if let Ok(trade_msg) = json::from_value::<TradeMessage>(value.clone()) {
503 return self.handle_trade_message(trade_msg).await;
504 }
505
506 if let Ok(orderbook_msg) = json::from_value::<OrderbookMessage>(value.clone()) {
508 return self.handle_orderbook_message(orderbook_msg).await;
509 }
510
511 if let Ok(_status) = json::from_value::<StatusMessage>(value.clone()) {
513 log::debug!("Received status message");
514 return Ok(());
515 }
516
517 log::debug!("Unhandled message type");
518 Ok(())
519 }
520
521 async fn handle_trade_message(&self, msg: TradeMessage) -> Result<()> {
522 self.stats.write().messages_received += 1;
524
525 if let Some(tx) = &self.trade_tx
527 && let Err(e) = tx.send(msg).await
528 {
529 log::error!("Failed to send trade message: {e}");
530 }
531
532 Ok(())
533 }
534
535 async fn handle_orderbook_message(&self, msg: OrderbookMessage) -> Result<()> {
536 self.stats.write().messages_received += 1;
538
539 if let Some(tx) = &self.orderbook_tx
541 && let Err(e) = tx.send(msg).await
542 {
543 log::error!("Failed to send orderbook message: {e}");
544 }
545
546 Ok(())
547 }
548}
549
550#[async_trait]
551impl Provider for BithumbProvider {
552 type TradeMessage = TradeMessage;
553 type DepthMessage = OrderbookMessage;
554 type InstrumentMessage = BithumbInstrument;
555
556 fn name(&self) -> &'static str {
557 "Bithumb"
558 }
559
560 fn venue(&self) -> rusty_model::venues::Venue {
561 rusty_model::venues::Venue::Bithumb
562 }
563
564 fn config(&self) -> &ConnectionConfig {
565 &self.config
566 }
567
568 async fn init(&mut self) -> Result<()> {
569 match self.get_instruments().await {
572 Ok(_) => Ok(()),
573 Err(e) => Err(anyhow!("Failed to initialize Bithumb provider: {}", e)),
574 }
575 }
576
577 async fn shutdown(&mut self) -> Result<()> {
578 let mut keys_to_remove = Vec::new();
580
581 for (key, _) in self.subscriptions.read().iter() {
582 keys_to_remove.push(key.clone());
583 }
584
585 for key in keys_to_remove {
587 if let Some(tx) = self.subscriptions.write().remove(&key) {
588 let _ = tx.send(true);
589 }
590
591 if let Some(handle) = self.ws_handles.write().remove(&key) {
593 handle.abort();
594 }
595 }
596
597 *self.connection_status.write() = ConnectionState::Disconnected;
599
600 Ok(())
601 }
602
603 async fn get_realtime_orderbook(&self, symbol: &str) -> Result<SharedSimdOrderBook> {
604 let instrument_id = InstrumentId::new(symbol, Venue::Bithumb);
606 let ob = rusty_model::data::orderbook::OrderBook::<64>::new_empty(instrument_id);
607 let orderbook = SharedSimdOrderBook::from_orderbook(&ob);
608
609 let (_tx, mut rx) = mpsc::channel(100);
610 let mut symbol_list = SmallVec::<[String; 8]>::new();
611 symbol_list.push(symbol.into());
612 let _depth_rx = self.subscribe_orderbook(symbol_list).await?;
613
614 let orderbook_clone = orderbook.clone();
616 let clock = self.clock.clone();
617
618 tokio::spawn(async move {
619 while let Some(msg) = rx.recv().await {
620 let now = clock.raw();
621 let ob = Orderbook::from_orderbook_message(&msg, now);
622
623 let bids_vec: SmallVec<[PriceLevel; 64]> = ob
625 .bids
626 .iter()
627 .map(|level| PriceLevel::new(level.price, level.quantity))
628 .collect();
629 let asks_vec: SmallVec<[PriceLevel; 64]> = ob
630 .asks
631 .iter()
632 .map(|level| PriceLevel::new(level.price, level.quantity))
633 .collect();
634
635 let orderbook = rusty_model::data::orderbook::OrderBook::new(
636 ob.code.clone(),
637 ob.timestamp_ns,
638 now,
639 bids_vec,
640 asks_vec,
641 );
642 orderbook_clone.write(|book| {
643 *book = SimdOrderBook::from_orderbook(&orderbook);
644 });
645 }
646 });
647
648 Ok(orderbook)
649 }
650
651 async fn subscribe_trades(
652 &self,
653 symbols: SmallVec<[String; 8]>,
654 ) -> Result<mpsc::Receiver<Self::TradeMessage>> {
655 let (tx, rx) = mpsc::channel(1024);
657
658 let handle = self.create_trade_connection(symbols.clone(), tx).await?;
660
661 let connection_id = format!("trade-{}", symbols.join(","));
663 self.ws_handles.write().insert(connection_id.into(), handle);
664
665 Ok(rx)
666 }
667
668 async fn unsubscribe_trades(&self) -> Result<()> {
669 let mut keys_to_remove = Vec::new();
671
672 for (key, _) in self.subscriptions.read().iter() {
673 if key.starts_with("trade-") {
674 keys_to_remove.push(key.clone());
675 }
676 }
677
678 for key in keys_to_remove {
680 if let Some(tx) = self.subscriptions.write().remove(&key) {
681 let _ = tx.send(true);
682 }
683
684 if let Some(handle) = self.ws_handles.write().remove(&key) {
686 handle.abort();
687 }
688 }
689
690 Ok(())
691 }
692
693 async fn subscribe_orderbook(
694 &self,
695 symbols: SmallVec<[String; 8]>,
696 ) -> Result<mpsc::Receiver<Self::DepthMessage>> {
697 let (tx, rx) = mpsc::channel(1024);
699
700 let handle = self
702 .create_orderbook_connection(symbols.clone(), tx)
703 .await?;
704
705 let connection_id = format!("orderbook:{}", symbols.join(","));
707 self.ws_handles.write().insert(connection_id.into(), handle);
708
709 Ok(rx)
710 }
711
712 async fn unsubscribe_orderbook(&self) -> Result<()> {
713 let mut keys_to_remove = Vec::new();
715
716 for (key, _) in self.subscriptions.read().iter() {
717 if key.starts_with("orderbook:") {
718 keys_to_remove.push(key.clone());
719 }
720 }
721
722 for key in keys_to_remove {
724 if let Some(tx) = self.subscriptions.write().remove(&key) {
725 let _ = tx.send(true);
726 }
727
728 if let Some(handle) = self.ws_handles.write().remove(&key) {
730 handle.abort();
731 }
732 }
733
734 Ok(())
735 }
736
737 async fn get_instruments(&self) -> Result<Vec<Box<dyn Instrument>>> {
738 {
740 let instruments = self.instruments.read();
741 if !instruments.is_empty() {
742 let boxed_instruments: Vec<Box<dyn Instrument>> = instruments
744 .values()
745 .map(|ts_instrument| {
746 let bithumb_instrument = ts_instrument.to_bithumb_instrument();
747 Box::new(bithumb_instrument) as Box<dyn Instrument>
748 })
749 .collect();
750 return Ok(boxed_instruments);
751 }
752 }
753
754 let client = &self.http_client;
756
757 #[derive(serde::Deserialize)]
758 struct TickerResponse {
759 status: String,
760 data: FxHashMap<String, TickerData>,
761 }
762
763 #[derive(serde::Deserialize)]
764 #[allow(dead_code)]
765 struct TickerData {
766 opening_price: String,
767 closing_price: String,
768 min_price: String,
769 max_price: String,
770 units_traded: String,
771 acc_trade_value: String,
772 prev_closing_price: String,
773 #[serde(rename = "units_traded_24H")]
774 units_traded_24h: String,
775 #[serde(rename = "acc_trade_value_24H")]
776 acc_trade_value_24h: String,
777 #[serde(rename = "fluctate_24H")]
778 fluctate_24h: String,
779 #[serde(rename = "fluctate_rate_24H")]
780 fluctate_rate_24h: String,
781 }
782
783 let endpoint = "/public/ticker/ALL_KRW";
785 let url = format!("https://api.bithumb.com{endpoint}");
786 let tickers: TickerResponse = get_request(client, &url).await?;
787
788 if tickers.status != "0000" {
789 return Err(anyhow!("Failed to fetch tickers: {}", tickers.status));
790 }
791
792 let mut instrument_boxes: SmallVec<[Box<dyn Instrument>; 128]> =
794 SmallVec::<[Box<dyn Instrument>; 128]>::new();
795 let mut thread_safe_instruments = SmallVec::<[(String, ThreadSafeInstrument); 128]>::new();
796
797 for (symbol, ticker_data) in tickers.data.iter().filter(|(symbol, _)| *symbol != "date") {
799 let min_price = ticker_data
801 .min_price
802 .parse::<Decimal>()
803 .unwrap_or(Decimal::ZERO);
804 let max_price = ticker_data
805 .max_price
806 .parse::<Decimal>()
807 .unwrap_or(Decimal::ZERO);
808
809 let price_tick = if max_price > Decimal::ZERO {
812 if max_price > Decimal::from(1_000_000) {
814 Decimal::from(100)
815 } else if max_price > Decimal::from(100_000) {
816 Decimal::from(10)
817 } else if max_price > Decimal::from(10_000) {
818 Decimal::from(1)
819 } else if max_price > Decimal::from(1_000) {
820 Decimal::from_str("0.1").unwrap_or(Decimal::ONE)
821 } else if max_price > Decimal::from(100) {
822 Decimal::from_str("0.01").unwrap_or(Decimal::ONE)
823 } else {
824 Decimal::from_str("0.001").unwrap_or(Decimal::ONE)
825 }
826 } else {
827 Decimal::ONE
828 };
829
830 let min_quantity = Decimal::from_str("0.0001").unwrap_or(Decimal::ZERO); let max_quantity = Decimal::from(10000); let bithumb_instrument = BithumbInstrument {
837 symbol: format!("KRW-{symbol}").into(), base_currency: symbol.clone(),
839 quote_currency: "KRW".into(),
840 min_price,
841 max_price,
842 price_tick,
843 min_quantity,
844 max_quantity,
845 };
846
847 let thread_safe = ThreadSafeInstrument::from_bithumb_instrument(&bithumb_instrument);
849
850 thread_safe_instruments.push((bithumb_instrument.symbol.clone(), thread_safe));
852 instrument_boxes.push(Box::new(bithumb_instrument) as Box<dyn Instrument>);
853 }
854
855 let mut cache = self.instruments.write();
857 for (symbol, instrument) in thread_safe_instruments {
858 cache.insert(symbol, instrument);
859 }
860
861 Ok(instrument_boxes.into_vec())
862 }
863
864 async fn get_historical_trades(
865 &self,
866 symbol: &str,
867 limit: Option<u32>,
868 ) -> Result<Vec<MarketTrade>> {
869 let client = &self.http_client;
870 let mut params: FxHashMap<String, String> = FxHashMap::default();
871
872 let parts: SmallVec<[&str; 4]> = symbol.split('-').collect();
874 let base_currency = if parts.len() >= 2 { parts[1] } else { symbol };
875 let quote_currency = if parts.len() >= 2 { parts[0] } else { "KRW" };
876
877 params.insert("order_currency".into(), base_currency.into());
878 params.insert("payment_currency".into(), quote_currency.into());
879
880 if let Some(limit) = limit {
881 params.insert("count".into(), limit.to_string().into());
882 }
883
884 let endpoint = "/public/transaction_history";
885
886 #[derive(serde::Deserialize)]
887 struct TransactionHistoryResponse {
888 status: String,
889 data: Vec<TransactionHistoryEntry>,
890 }
891
892 #[derive(serde::Deserialize)]
893 #[allow(dead_code)]
894 struct TransactionHistoryEntry {
895 transaction_date: String,
896 r#type: String, units_traded: String,
898 price: String,
899 total: String,
900 }
901
902 let mut url = format!("https://api.bithumb.com{endpoint}");
904 let query_string = params
905 .iter()
906 .map(|(k, v)| format!("{k}={v}"))
907 .collect::<SmallVec<[_; 8]>>()
908 .join("&");
909 if !query_string.is_empty() {
910 url.push('?');
911 url.push_str(&query_string);
912 }
913
914 let trades: TransactionHistoryResponse = get_request(client, &url).await?;
915
916 if trades.status != "0000" {
917 return Err(anyhow!("Failed to fetch trades: {}", trades.status));
918 }
919
920 let clock = Clock::new();
922 let result = trades
923 .data
924 .into_iter()
925 .filter_map(|t| {
926 let price = match t.price.parse() {
927 Ok(p) => p,
928 Err(_) => return None,
929 };
930
931 let quantity = match t.units_traded.parse() {
932 Ok(q) => q,
933 Err(_) => return None,
934 };
935
936 let direction = match t.r#type.as_str() {
937 "bid" => OrderSide::Buy,
938 "ask" => OrderSide::Sell,
939 _ => return None,
940 };
941
942 let exchange_time = match t.transaction_date.parse::<u64>() {
944 Ok(ts) => ts * 1_000_000, Err(_) => clock.raw(), };
947
948 Some(MarketTrade {
949 timestamp: clock.now(),
950 exchange_time_ns: exchange_time,
951 price,
952 quantity,
953 direction,
954 instrument_id: InstrumentId::new(symbol, Venue::Bithumb),
955 })
956 })
957 .collect();
958
959 Ok(result)
960 }
961
962 async fn get_orderbook_snapshot(
963 &self,
964 symbol: &str,
965 depth: Option<u32>,
966 ) -> Result<OrderBookSnapshot> {
967 let client = &self.http_client;
968 let mut params: FxHashMap<String, String> = FxHashMap::default();
969
970 let parts: SmallVec<[&str; 4]> = symbol.split('-').collect();
972 let base_currency = if parts.len() >= 2 { parts[1] } else { symbol };
973 let quote_currency = if parts.len() >= 2 { parts[0] } else { "KRW" };
974
975 params.insert("order_currency".into(), base_currency.into());
976 params.insert("payment_currency".into(), quote_currency.into());
977
978 if let Some(depth) = depth {
979 params.insert("count".into(), depth.to_string().into());
980 }
981
982 let endpoint = "/public/orderbook";
983
984 #[derive(serde::Deserialize)]
985 struct OrderbookRestResponse {
986 status: String,
987 data: OrderbookRestData,
988 }
989
990 #[derive(serde::Deserialize)]
991 #[allow(dead_code)]
992 struct OrderbookRestData {
993 timestamp: String,
994 order_currency: String,
995 payment_currency: String,
996 bids: Vec<OrderbookRestEntry>,
997 asks: Vec<OrderbookRestEntry>,
998 }
999
1000 #[derive(serde::Deserialize)]
1001 struct OrderbookRestEntry {
1002 quantity: String,
1003 price: String,
1004 }
1005
1006 let mut url = format!("https://api.bithumb.com{endpoint}");
1008 let query_string = params
1009 .iter()
1010 .map(|(k, v)| format!("{k}={v}"))
1011 .collect::<SmallVec<[_; 8]>>()
1012 .join("&");
1013 if !query_string.is_empty() {
1014 url.push('?');
1015 url.push_str(&query_string);
1016 }
1017
1018 let orderbook: OrderbookRestResponse = get_request(client, &url).await?;
1019
1020 if orderbook.status != "0000" {
1021 return Err(anyhow!("Failed to fetch orderbook: {}", orderbook.status));
1022 }
1023
1024 let mut bids: SmallVec<[PriceLevel; 64]> =
1026 SmallVec::with_capacity(orderbook.data.bids.len());
1027 let mut asks: SmallVec<[PriceLevel; 64]> =
1028 SmallVec::with_capacity(orderbook.data.asks.len());
1029
1030 for bid in &orderbook.data.bids {
1031 let price = match bid.price.parse() {
1032 Ok(p) => p,
1033 Err(_) => continue,
1034 };
1035
1036 let quantity = match bid.quantity.parse() {
1037 Ok(q) => q,
1038 Err(_) => continue,
1039 };
1040
1041 bids.push(PriceLevel::new(price, quantity));
1042 }
1043
1044 for ask in &orderbook.data.asks {
1045 let price = match ask.price.parse() {
1046 Ok(p) => p,
1047 Err(_) => continue,
1048 };
1049
1050 let quantity = match ask.quantity.parse() {
1051 Ok(q) => q,
1052 Err(_) => continue,
1053 };
1054
1055 asks.push(PriceLevel::new(price, quantity));
1056 }
1057
1058 bids.sort_by(|a, b| {
1060 b.price
1061 .partial_cmp(&a.price)
1062 .unwrap_or(std::cmp::Ordering::Equal)
1063 });
1064 asks.sort_by(|a, b| {
1065 a.price
1066 .partial_cmp(&b.price)
1067 .unwrap_or(std::cmp::Ordering::Equal)
1068 });
1069
1070 let timestamp_ms = match orderbook.data.timestamp.parse::<u64>() {
1071 Ok(t) => t,
1072 Err(_) => current_timestamp_ms(),
1073 };
1074
1075 let exchange_time = self.convert_exchange_timestamp(timestamp_ms);
1077
1078 let book = OrderBookSnapshot::new(
1079 rusty_model::instruments::InstrumentId::new(symbol, Venue::Bithumb),
1080 bids,
1081 asks,
1082 0, exchange_time,
1084 self.clock.raw(),
1085 );
1086
1087 Ok(book)
1088 }
1089
1090 async fn is_connected(&self) -> bool {
1091 *self.connection_status.read() == ConnectionState::Connected
1092 }
1093
1094 async fn connection_status(&self) -> ConnectionState {
1095 *self.connection_status.read()
1096 }
1097
1098 async fn get_stats(&self) -> ConnectionStats {
1099 self.stats.read().clone()
1100 }
1101
1102 async fn ping(&self) -> Result<u64> {
1103 let start = self.clock.raw();
1106
1107 let response = self
1108 .http_client
1109 .get("https://api.bithumb.com/public/ticker/ALL_KRW")
1110 .send()
1111 .await?;
1112
1113 if response.status().is_success() {
1114 Ok(self.clock.raw().saturating_sub(start))
1115 } else {
1116 Err(anyhow!("Ping failed with status: {}", response.status()))
1117 }
1118 }
1119
1120 async fn reset_connection(&self) -> Result<()> {
1121 let mut keys_to_remove = Vec::new();
1123
1124 for (key, _) in self.subscriptions.read().iter() {
1125 keys_to_remove.push(key.clone());
1126 }
1127
1128 for key in &keys_to_remove {
1130 if let Some(tx) = self.subscriptions.write().remove(key) {
1131 let _ = tx.send(true);
1132 }
1133
1134 if let Some(handle) = self.ws_handles.write().remove(key) {
1136 handle.abort();
1137 }
1138 }
1139
1140 *self.connection_status.write() = ConnectionState::Disconnected;
1142
1143 *self.stats.write() = ConnectionStats::default();
1145
1146 Ok(())
1147 }
1148
1149 fn get_rate_limits(&self) -> Vec<RateLimit> {
1150 BITHUMB_RATE_LIMITS.to_vec()
1151 }
1152}