rusty_feeder/exchange/bithumb/
provider.rs

1//! Bithumb exchange provider implementation
2//!
3//! High-performance provider for Bithumb markets with WebSocket streaming
4//! and REST API support. Optimized for HFT applications with nanosecond precision.
5
6use rusty_common::collections::FxHashMap;
7use std::any::Any;
8use std::str::FromStr;
9use std::sync::Arc;
10use std::time::{Duration, Instant};
11
12use crate::common::rest_utils::{current_timestamp_ms, get_request};
13use crate::exchange::zerocopy_helpers::{deserialize_from_slice_borrowed, deserialize_from_vec};
14use crate::provider::prelude::*;
15use anyhow::{Result, anyhow};
16use async_trait::async_trait;
17use parking_lot::RwLock;
18use quanta::Clock;
19use rust_decimal::Decimal;
20use rusty_common::auth::exchanges::bithumb::BithumbAuth;
21use rusty_common::json;
22use rusty_common::websocket::{
23    Message, MessageHandler, WebSocketClient, WebSocketConfig, WebSocketError, WebSocketResult,
24};
25use rusty_model::{
26    data::{
27        book_snapshot::OrderBookSnapshot,
28        market_trade::MarketTrade,
29        orderbook::PriceLevel,
30        simd_orderbook::{SharedSimdOrderBook, SimdOrderBook},
31    },
32    enums::OrderSide,
33    instruments::{Instrument, InstrumentId},
34    venues::Venue,
35};
36use smallvec::SmallVec;
37use smartstring::alias::String;
38use tokio::sync::{mpsc, watch};
39use tokio::task::JoinHandle;
40
41use super::{
42    data::{
43        orderbook::{Orderbook, OrderbookMessage},
44        subscription::{BithumbSubscription, ResponseFormat, StatusMessage},
45        trade::TradeMessage,
46    },
47    types::{BITHUMB_PRIVATE_WS_URL, BITHUMB_PUBLIC_WS_URL, BITHUMB_RATE_LIMITS, TicketRequest},
48};
49
50/// Bithumb instrument implementation
51#[derive(Debug, Clone)]
52pub struct BithumbInstrument {
53    /// Trading pair symbol (e.g., "KRW-BTC")
54    pub symbol: String,
55    /// Base currency (e.g., "BTC")
56    pub base_currency: String,
57    /// Quote currency (e.g., "KRW")
58    pub quote_currency: String,
59    /// Minimum allowed price for orders
60    pub min_price: Decimal,
61    /// Maximum allowed price for orders
62    pub max_price: Decimal,
63    /// Minimum price increment (tick size)
64    pub price_tick: Decimal,
65    /// Minimum order quantity
66    pub min_quantity: Decimal,
67    /// Maximum order quantity
68    pub max_quantity: Decimal,
69}
70
71impl Instrument for BithumbInstrument {
72    fn id(&self) -> InstrumentId {
73        InstrumentId::new(self.symbol.clone(), Venue::Bithumb)
74    }
75
76    fn symbol(&self) -> String {
77        self.symbol.clone()
78    }
79
80    fn venue(&self) -> Venue {
81        Venue::Bithumb
82    }
83
84    fn as_any(&self) -> &dyn Any {
85        self
86    }
87
88    fn clone_box(&self) -> Box<dyn Instrument> {
89        Box::new(self.clone())
90    }
91}
92
93/// Thread-safe wrapper for BithumbInstrument
94#[derive(Debug, Clone)]
95struct ThreadSafeInstrument {
96    symbol: String,
97    base_currency: String,
98    quote_currency: String,
99    min_price: Decimal,
100    max_price: Decimal,
101    price_tick: Decimal,
102    min_quantity: Decimal,
103    max_quantity: Decimal,
104}
105
106impl ThreadSafeInstrument {
107    fn from_bithumb_instrument(instrument: &BithumbInstrument) -> Self {
108        Self {
109            symbol: instrument.symbol.clone(),
110            base_currency: instrument.base_currency.clone(),
111            quote_currency: instrument.quote_currency.clone(),
112            min_price: instrument.min_price,
113            max_price: instrument.max_price,
114            price_tick: instrument.price_tick,
115            min_quantity: instrument.min_quantity,
116            max_quantity: instrument.max_quantity,
117        }
118    }
119
120    fn to_bithumb_instrument(&self) -> BithumbInstrument {
121        BithumbInstrument {
122            symbol: self.symbol.clone(),
123            base_currency: self.base_currency.clone(),
124            quote_currency: self.quote_currency.clone(),
125            min_price: self.min_price,
126            max_price: self.max_price,
127            price_tick: self.price_tick,
128            min_quantity: self.min_quantity,
129            max_quantity: self.max_quantity,
130        }
131    }
132}
133
134// This is safe to share between threads
135unsafe impl Send for ThreadSafeInstrument {}
136unsafe impl Sync for ThreadSafeInstrument {}
137
138/// Bithumb market data provider
139#[derive(Debug)]
140pub struct BithumbProvider {
141    /// Connection configuration
142    config: ConnectionConfig,
143
144    /// WebSocket connection status
145    connection_status: Arc<RwLock<ConnectionState>>,
146
147    /// Connection statistics
148    stats: Arc<RwLock<ConnectionStats>>,
149
150    /// Shared clock for time synchronization
151    clock: Clock,
152
153    /// Active subscriptions
154    subscriptions: Arc<RwLock<FxHashMap<String, watch::Sender<bool>>>>,
155
156    /// Cached instruments
157    instruments: Arc<RwLock<FxHashMap<String, ThreadSafeInstrument>>>,
158
159    /// Task handles for WebSocket connections
160    ws_handles: Arc<RwLock<FxHashMap<String, JoinHandle<()>>>>,
161
162    /// Last connection attempt timestamp
163    last_connection_attempt: Arc<RwLock<Instant>>,
164
165    /// HTTP client for REST API requests
166    http_client: reqwest::Client,
167
168    /// Authentication handler (optional, for private endpoints)
169    auth: Option<BithumbAuth>,
170}
171
172impl Default for BithumbProvider {
173    fn default() -> Self {
174        Self::new()
175    }
176}
177
178impl BithumbProvider {
179    /// Create a new Bithumb provider with default configuration
180    #[must_use]
181    pub fn new() -> Self {
182        Self::with_config(ConnectionConfig::default())
183    }
184
185    /// Create a new Bithumb provider with custom configuration
186    #[must_use]
187    pub fn with_config(config: ConnectionConfig) -> Self {
188        let clock = config.clock.clone();
189
190        // Create auth if credentials are provided
191        let auth =
192            if config.auth_config.api_key.is_some() && config.auth_config.secret_key.is_some() {
193                let auth_config = BithumbAuth::new(
194                    config.auth_config.api_key.clone().unwrap(),
195                    config.auth_config.secret_key.clone().unwrap(),
196                );
197                Some(auth_config)
198            } else {
199                None
200            };
201
202        // Create HTTP client with appropriate settings
203        let mut headers = reqwest::header::HeaderMap::new();
204        headers.insert(
205            "User-Agent",
206            reqwest::header::HeaderValue::from_str(&config.rest_config.user_agent)
207                .unwrap_or_else(|_| reqwest::header::HeaderValue::from_static("RustyHFT/1.0")),
208        );
209
210        let http_client = reqwest::Client::builder()
211            .timeout(Duration::from_millis(
212                config.rest_config.timeout_milliseconds,
213            ))
214            .connect_timeout(Duration::from_millis(
215                config.rest_config.timeout_milliseconds / 2,
216            ))
217            .pool_max_idle_per_host(config.rest_config.connection_pool_size)
218            .pool_idle_timeout(Duration::from_millis(
219                config.rest_config.keep_alive_milliseconds,
220            ))
221            .default_headers(headers)
222            .build()
223            .unwrap_or_default();
224
225        Self {
226            config,
227            connection_status: Arc::new(RwLock::new(ConnectionState::Disconnected)),
228            stats: Arc::new(RwLock::new(ConnectionStats::default())),
229            clock,
230            subscriptions: Arc::new(RwLock::new(FxHashMap::default())),
231            instruments: Arc::new(RwLock::new(FxHashMap::default())),
232            ws_handles: Arc::new(RwLock::new(FxHashMap::default())),
233            last_connection_attempt: Arc::new(RwLock::new(Instant::now())),
234            http_client,
235            auth,
236        }
237    }
238
239    /// Create WebSocket configuration
240    fn create_ws_config(&self, private: bool) -> WebSocketConfig {
241        let url = if private {
242            BITHUMB_PRIVATE_WS_URL.to_string()
243        } else {
244            BITHUMB_PUBLIC_WS_URL.to_string()
245        };
246
247        WebSocketConfig::builder(rusty_common::types::Exchange::Bithumb, url)
248            .ping_interval(Duration::from_millis(
249                self.config.websocket_config.ping_interval_milliseconds,
250            ))
251            .timeout(Duration::from_millis(
252                self.config.websocket_config.timeout_milliseconds,
253            ))
254            .compression(rusty_common::websocket::CompressionConfig::disabled())
255            .build()
256    }
257
258    /// Create a WebSocket connection for trade data
259    async fn create_trade_connection(
260        &self,
261        symbols: SmallVec<[String; 8]>,
262        tx: mpsc::Sender<TradeMessage>,
263    ) -> Result<JoinHandle<()>> {
264        let connection_id = format!("trade-{}", symbols.join(","));
265
266        // Create subscription messages using Bithumb format
267        let ticket = TicketRequest {
268            ticket: String::from(uuid::Uuid::new_v4().to_string()),
269        };
270        let subscription = BithumbSubscription::trade(symbols.to_vec());
271        let format = ResponseFormat::default();
272
273        // Create a batch subscription
274        let batch = simd_json::OwnedValue::Array(Box::new(vec![
275            simd_json::serde::to_owned_value(&ticket)?,
276            simd_json::serde::to_owned_value(&subscription)?,
277            simd_json::serde::to_owned_value(&format)?,
278        ]));
279
280        // Create stop signal
281        let (stop_tx, stop_rx) = watch::channel(false);
282        self.subscriptions
283            .write()
284            .insert(connection_id.clone().into(), stop_tx);
285
286        // Clone parameters for the task
287        let clock = self.clock.clone();
288        let stats = self.stats.clone();
289        let connection_status = self.connection_status.clone();
290        let ws_config = self.create_ws_config(false); // Public connection for trades
291
292        // Spawn a task for the WebSocket connection
293        let handle = tokio::spawn(async move {
294            loop {
295                // Check for stop signal
296                if *stop_rx.borrow() {
297                    break;
298                }
299
300                // Create message handler
301                let handler = BithumbMessageHandler::new_trade_handler(
302                    clock.clone(),
303                    stats.clone(),
304                    tx.clone(),
305                );
306
307                // Create WebSocket client
308                let mut client = WebSocketClient::new(ws_config.clone());
309
310                // Update connection status
311                *connection_status.write() = ConnectionState::Connecting;
312
313                // Run the WebSocket client
314                if let Err(e) = client.run(handler).await {
315                    log::error!("WebSocket client error: {e}");
316                    *connection_status.write() = ConnectionState::Error;
317                }
318
319                // Check for stop signal before reconnecting
320                if *stop_rx.borrow() {
321                    break;
322                }
323
324                // Reconnect after a delay
325                tokio::time::sleep(Duration::from_millis(1000)).await;
326            }
327        });
328
329        Ok(handle)
330    }
331
332    /// Create a WebSocket connection for orderbook data
333    async fn create_orderbook_connection(
334        &self,
335        symbols: SmallVec<[String; 8]>,
336        tx: mpsc::Sender<OrderbookMessage>,
337    ) -> Result<JoinHandle<()>> {
338        let connection_id = format!("orderbook:{}", symbols.join(","));
339
340        // Create subscription messages using Bithumb format
341        let ticket = TicketRequest {
342            ticket: String::from(uuid::Uuid::new_v4().to_string()),
343        };
344        let subscription = BithumbSubscription::orderbook(symbols.to_vec(), None);
345        let format = ResponseFormat::default();
346
347        // Create a batch subscription
348        let batch = simd_json::OwnedValue::Array(Box::new(vec![
349            simd_json::serde::to_owned_value(&ticket)?,
350            simd_json::serde::to_owned_value(&subscription)?,
351            simd_json::serde::to_owned_value(&format)?,
352        ]));
353
354        // Create stop signal
355        let (stop_tx, stop_rx) = watch::channel(false);
356        self.subscriptions
357            .write()
358            .insert(connection_id.clone().into(), stop_tx);
359
360        // Clone parameters for the task
361        let clock = self.clock.clone();
362        let stats = self.stats.clone();
363        let connection_status = self.connection_status.clone();
364        let ws_config = self.create_ws_config(false); // Public connection for orderbook
365
366        // Spawn a task for the WebSocket connection
367        let handle = tokio::spawn(async move {
368            loop {
369                // Check for stop signal
370                if *stop_rx.borrow() {
371                    break;
372                }
373
374                // Create message handler
375                let handler = BithumbMessageHandler::new_orderbook_handler(
376                    clock.clone(),
377                    stats.clone(),
378                    tx.clone(),
379                );
380
381                // Create WebSocket client
382                let mut client = WebSocketClient::new(ws_config.clone());
383
384                // Update connection status
385                *connection_status.write() = ConnectionState::Connecting;
386
387                // Run the WebSocket client
388                if let Err(e) = client.run(handler).await {
389                    log::error!("WebSocket client error: {e}");
390                    *connection_status.write() = ConnectionState::Error;
391                }
392
393                // Check for stop signal before reconnecting
394                if *stop_rx.borrow() {
395                    break;
396                }
397
398                // Reconnect after a delay
399                tokio::time::sleep(Duration::from_millis(1000)).await;
400            }
401        });
402
403        Ok(handle)
404    }
405
406    /// Convert exchange timestamp to nanosecond precision
407    #[inline]
408    const fn convert_exchange_timestamp(&self, timestamp_ms: u64) -> u64 {
409        timestamp_ms * 1_000_000 // Convert ms to ns
410    }
411}
412
413/// Message handler for Bithumb WebSocket
414struct BithumbMessageHandler {
415    clock: Clock,
416    stats: Arc<RwLock<ConnectionStats>>,
417    trade_tx: Option<mpsc::Sender<TradeMessage>>,
418    orderbook_tx: Option<mpsc::Sender<OrderbookMessage>>,
419}
420
421#[async_trait]
422impl MessageHandler for BithumbMessageHandler {
423    async fn on_connected(&mut self) -> WebSocketResult<()> {
424        log::info!("Bithumb WebSocket connected");
425        Ok(())
426    }
427
428    async fn on_message(&mut self, message: Message) -> WebSocketResult<()> {
429        match message {
430            Message::Text(text) => {
431                // Zero-copy parsing: parse directly from text bytes
432                match deserialize_from_slice_borrowed::<json::Value>(text.as_bytes()) {
433                    Ok(value) => {
434                        if let Err(e) = self.handle_market_data(&value).await {
435                            log::error!("Failed to handle market data: {e}");
436                        }
437                    }
438                    Err(e) => {
439                        log::error!("Failed to parse WebSocket message: {e}");
440                    }
441                }
442            }
443            Message::Binary(data) => {
444                // Zero-copy parsing: parse directly from binary data without string conversion
445                match deserialize_from_vec::<json::Value>(data) {
446                    Ok(value) => {
447                        if let Err(e) = self.handle_market_data(&value).await {
448                            log::error!("Failed to handle market data: {e}");
449                        }
450                    }
451                    Err(e) => {
452                        log::error!("Failed to parse binary WebSocket message: {e}");
453                    }
454                }
455            }
456            _ => {}
457        }
458        Ok(())
459    }
460
461    async fn on_error(&mut self, error: WebSocketError) -> WebSocketResult<()> {
462        log::error!("Bithumb WebSocket error: {}", error);
463        Ok(())
464    }
465
466    async fn on_disconnected(&mut self) -> WebSocketResult<()> {
467        log::info!("Bithumb WebSocket disconnected");
468        Ok(())
469    }
470}
471
472impl BithumbMessageHandler {
473    const fn new_trade_handler(
474        clock: Clock,
475        stats: Arc<RwLock<ConnectionStats>>,
476        tx: mpsc::Sender<TradeMessage>,
477    ) -> Self {
478        Self {
479            clock,
480            stats,
481            trade_tx: Some(tx),
482            orderbook_tx: None,
483        }
484    }
485
486    const fn new_orderbook_handler(
487        clock: Clock,
488        stats: Arc<RwLock<ConnectionStats>>,
489        tx: mpsc::Sender<OrderbookMessage>,
490    ) -> Self {
491        Self {
492            clock,
493            stats,
494            trade_tx: None,
495            orderbook_tx: Some(tx),
496        }
497    }
498
499    async fn handle_market_data(&self, value: &json::Value) -> Result<()> {
500        // Try to parse as different message types
501        // First check if it's a trade message
502        if let Ok(trade_msg) = json::from_value::<TradeMessage>(value.clone()) {
503            return self.handle_trade_message(trade_msg).await;
504        }
505
506        // Check if it's an orderbook message
507        if let Ok(orderbook_msg) = json::from_value::<OrderbookMessage>(value.clone()) {
508            return self.handle_orderbook_message(orderbook_msg).await;
509        }
510
511        // Check if it's a status message
512        if let Ok(_status) = json::from_value::<StatusMessage>(value.clone()) {
513            log::debug!("Received status message");
514            return Ok(());
515        }
516
517        log::debug!("Unhandled message type");
518        Ok(())
519    }
520
521    async fn handle_trade_message(&self, msg: TradeMessage) -> Result<()> {
522        // Update stats
523        self.stats.write().messages_received += 1;
524
525        // Forward to trade handler if available
526        if let Some(tx) = &self.trade_tx
527            && let Err(e) = tx.send(msg).await
528        {
529            log::error!("Failed to send trade message: {e}");
530        }
531
532        Ok(())
533    }
534
535    async fn handle_orderbook_message(&self, msg: OrderbookMessage) -> Result<()> {
536        // Update stats
537        self.stats.write().messages_received += 1;
538
539        // Forward to orderbook handler if available
540        if let Some(tx) = &self.orderbook_tx
541            && let Err(e) = tx.send(msg).await
542        {
543            log::error!("Failed to send orderbook message: {e}");
544        }
545
546        Ok(())
547    }
548}
549
550#[async_trait]
551impl Provider for BithumbProvider {
552    type TradeMessage = TradeMessage;
553    type DepthMessage = OrderbookMessage;
554    type InstrumentMessage = BithumbInstrument;
555
556    fn name(&self) -> &'static str {
557        "Bithumb"
558    }
559
560    fn venue(&self) -> rusty_model::venues::Venue {
561        rusty_model::venues::Venue::Bithumb
562    }
563
564    fn config(&self) -> &ConnectionConfig {
565        &self.config
566    }
567
568    async fn init(&mut self) -> Result<()> {
569        // Initialize the Bithumb provider
570        // Fetch initial instruments to populate cache
571        match self.get_instruments().await {
572            Ok(_) => Ok(()),
573            Err(e) => Err(anyhow!("Failed to initialize Bithumb provider: {}", e)),
574        }
575    }
576
577    async fn shutdown(&mut self) -> Result<()> {
578        // Unsubscribe from all active subscriptions
579        let mut keys_to_remove = Vec::new();
580
581        for (key, _) in self.subscriptions.read().iter() {
582            keys_to_remove.push(key.clone());
583        }
584
585        // Send stop signal for each subscription
586        for key in keys_to_remove {
587            if let Some(tx) = self.subscriptions.write().remove(&key) {
588                let _ = tx.send(true);
589            }
590
591            // Abort task
592            if let Some(handle) = self.ws_handles.write().remove(&key) {
593                handle.abort();
594            }
595        }
596
597        // Set connection status to disconnected
598        *self.connection_status.write() = ConnectionState::Disconnected;
599
600        Ok(())
601    }
602
603    async fn get_realtime_orderbook(&self, symbol: &str) -> Result<SharedSimdOrderBook> {
604        // Create a shared orderbook
605        let instrument_id = InstrumentId::new(symbol, Venue::Bithumb);
606        let ob = rusty_model::data::orderbook::OrderBook::<64>::new_empty(instrument_id);
607        let orderbook = SharedSimdOrderBook::from_orderbook(&ob);
608
609        let (_tx, mut rx) = mpsc::channel(100);
610        let mut symbol_list = SmallVec::<[String; 8]>::new();
611        symbol_list.push(symbol.into());
612        let _depth_rx = self.subscribe_orderbook(symbol_list).await?;
613
614        // Subscribe to the orderbook updates
615        let orderbook_clone = orderbook.clone();
616        let clock = self.clock.clone();
617
618        tokio::spawn(async move {
619            while let Some(msg) = rx.recv().await {
620                let now = clock.raw();
621                let ob = Orderbook::from_orderbook_message(&msg, now);
622
623                // Convert to OrderBook format and update
624                let bids_vec: SmallVec<[PriceLevel; 64]> = ob
625                    .bids
626                    .iter()
627                    .map(|level| PriceLevel::new(level.price, level.quantity))
628                    .collect();
629                let asks_vec: SmallVec<[PriceLevel; 64]> = ob
630                    .asks
631                    .iter()
632                    .map(|level| PriceLevel::new(level.price, level.quantity))
633                    .collect();
634
635                let orderbook = rusty_model::data::orderbook::OrderBook::new(
636                    ob.code.clone(),
637                    ob.timestamp_ns,
638                    now,
639                    bids_vec,
640                    asks_vec,
641                );
642                orderbook_clone.write(|book| {
643                    *book = SimdOrderBook::from_orderbook(&orderbook);
644                });
645            }
646        });
647
648        Ok(orderbook)
649    }
650
651    async fn subscribe_trades(
652        &self,
653        symbols: SmallVec<[String; 8]>,
654    ) -> Result<mpsc::Receiver<Self::TradeMessage>> {
655        // Create channel for trade messages
656        let (tx, rx) = mpsc::channel(1024);
657
658        // Create WebSocket connection
659        let handle = self.create_trade_connection(symbols.clone(), tx).await?;
660
661        // Save task handle
662        let connection_id = format!("trade-{}", symbols.join(","));
663        self.ws_handles.write().insert(connection_id.into(), handle);
664
665        Ok(rx)
666    }
667
668    async fn unsubscribe_trades(&self) -> Result<()> {
669        // Find all trade subscriptions
670        let mut keys_to_remove = Vec::new();
671
672        for (key, _) in self.subscriptions.read().iter() {
673            if key.starts_with("trade-") {
674                keys_to_remove.push(key.clone());
675            }
676        }
677
678        // Send stop signal for each subscription
679        for key in keys_to_remove {
680            if let Some(tx) = self.subscriptions.write().remove(&key) {
681                let _ = tx.send(true);
682            }
683
684            // Abort task
685            if let Some(handle) = self.ws_handles.write().remove(&key) {
686                handle.abort();
687            }
688        }
689
690        Ok(())
691    }
692
693    async fn subscribe_orderbook(
694        &self,
695        symbols: SmallVec<[String; 8]>,
696    ) -> Result<mpsc::Receiver<Self::DepthMessage>> {
697        // Create channel for orderbook messages
698        let (tx, rx) = mpsc::channel(1024);
699
700        // Create WebSocket connection
701        let handle = self
702            .create_orderbook_connection(symbols.clone(), tx)
703            .await?;
704
705        // Save task handle
706        let connection_id = format!("orderbook:{}", symbols.join(","));
707        self.ws_handles.write().insert(connection_id.into(), handle);
708
709        Ok(rx)
710    }
711
712    async fn unsubscribe_orderbook(&self) -> Result<()> {
713        // Find all orderbook subscriptions
714        let mut keys_to_remove = Vec::new();
715
716        for (key, _) in self.subscriptions.read().iter() {
717            if key.starts_with("orderbook:") {
718                keys_to_remove.push(key.clone());
719            }
720        }
721
722        // Send stop signal for each subscription
723        for key in keys_to_remove {
724            if let Some(tx) = self.subscriptions.write().remove(&key) {
725                let _ = tx.send(true);
726            }
727
728            // Abort task
729            if let Some(handle) = self.ws_handles.write().remove(&key) {
730                handle.abort();
731            }
732        }
733
734        Ok(())
735    }
736
737    async fn get_instruments(&self) -> Result<Vec<Box<dyn Instrument>>> {
738        // First, check if we have cached instruments
739        {
740            let instruments = self.instruments.read();
741            if !instruments.is_empty() {
742                // Convert the ThreadSafeInstrument to Box<dyn Instrument>
743                let boxed_instruments: Vec<Box<dyn Instrument>> = instruments
744                    .values()
745                    .map(|ts_instrument| {
746                        let bithumb_instrument = ts_instrument.to_bithumb_instrument();
747                        Box::new(bithumb_instrument) as Box<dyn Instrument>
748                    })
749                    .collect();
750                return Ok(boxed_instruments);
751            }
752        }
753
754        // If not, fetch from API
755        let client = &self.http_client;
756
757        #[derive(serde::Deserialize)]
758        struct TickerResponse {
759            status: String,
760            data: FxHashMap<String, TickerData>,
761        }
762
763        #[derive(serde::Deserialize)]
764        #[allow(dead_code)]
765        struct TickerData {
766            opening_price: String,
767            closing_price: String,
768            min_price: String,
769            max_price: String,
770            units_traded: String,
771            acc_trade_value: String,
772            prev_closing_price: String,
773            #[serde(rename = "units_traded_24H")]
774            units_traded_24h: String,
775            #[serde(rename = "acc_trade_value_24H")]
776            acc_trade_value_24h: String,
777            #[serde(rename = "fluctate_24H")]
778            fluctate_24h: String,
779            #[serde(rename = "fluctate_rate_24H")]
780            fluctate_rate_24h: String,
781        }
782
783        // Get all tickers
784        let endpoint = "/public/ticker/ALL_KRW";
785        let url = format!("https://api.bithumb.com{endpoint}");
786        let tickers: TickerResponse = get_request(client, &url).await?;
787
788        if tickers.status != "0000" {
789            return Err(anyhow!("Failed to fetch tickers: {}", tickers.status));
790        }
791
792        // Create a vector to collect our thread-safe instruments and Box<dyn Instrument> result
793        let mut instrument_boxes: SmallVec<[Box<dyn Instrument>; 128]> =
794            SmallVec::<[Box<dyn Instrument>; 128]>::new();
795        let mut thread_safe_instruments = SmallVec::<[(String, ThreadSafeInstrument); 128]>::new();
796
797        // Process the ticker data
798        for (symbol, ticker_data) in tickers.data.iter().filter(|(symbol, _)| *symbol != "date") {
799            // Try to extract min/max price from ticker data
800            let min_price = ticker_data
801                .min_price
802                .parse::<Decimal>()
803                .unwrap_or(Decimal::ZERO);
804            let max_price = ticker_data
805                .max_price
806                .parse::<Decimal>()
807                .unwrap_or(Decimal::ZERO);
808
809            // For price_tick, we'll use a reasonable default based on the price range
810            // This is an approximation as Bithumb doesn't directly provide tick size
811            let price_tick = if max_price > Decimal::ZERO {
812                // For higher priced assets, use larger tick size
813                if max_price > Decimal::from(1_000_000) {
814                    Decimal::from(100)
815                } else if max_price > Decimal::from(100_000) {
816                    Decimal::from(10)
817                } else if max_price > Decimal::from(10_000) {
818                    Decimal::from(1)
819                } else if max_price > Decimal::from(1_000) {
820                    Decimal::from_str("0.1").unwrap_or(Decimal::ONE)
821                } else if max_price > Decimal::from(100) {
822                    Decimal::from_str("0.01").unwrap_or(Decimal::ONE)
823                } else {
824                    Decimal::from_str("0.001").unwrap_or(Decimal::ONE)
825                }
826            } else {
827                Decimal::ONE
828            };
829
830            // For min/max quantity, we'll use reasonable defaults
831            // Bithumb typically has minimum order sizes but they're not in the API
832            let min_quantity = Decimal::from_str("0.0001").unwrap_or(Decimal::ZERO); // Common minimum for crypto
833            let max_quantity = Decimal::from(10000); // Large default
834
835            // Create a BithumbInstrument
836            let bithumb_instrument = BithumbInstrument {
837                symbol: format!("KRW-{symbol}").into(), // Format to match WebSocket format
838                base_currency: symbol.clone(),
839                quote_currency: "KRW".into(),
840                min_price,
841                max_price,
842                price_tick,
843                min_quantity,
844                max_quantity,
845            };
846
847            // Create a thread-safe version
848            let thread_safe = ThreadSafeInstrument::from_bithumb_instrument(&bithumb_instrument);
849
850            // Store both versions
851            thread_safe_instruments.push((bithumb_instrument.symbol.clone(), thread_safe));
852            instrument_boxes.push(Box::new(bithumb_instrument) as Box<dyn Instrument>);
853        }
854
855        // Cache the thread-safe instruments
856        let mut cache = self.instruments.write();
857        for (symbol, instrument) in thread_safe_instruments {
858            cache.insert(symbol, instrument);
859        }
860
861        Ok(instrument_boxes.into_vec())
862    }
863
864    async fn get_historical_trades(
865        &self,
866        symbol: &str,
867        limit: Option<u32>,
868    ) -> Result<Vec<MarketTrade>> {
869        let client = &self.http_client;
870        let mut params: FxHashMap<String, String> = FxHashMap::default();
871
872        // Parse symbol to get base currency (KRW-BTC -> BTC)
873        let parts: SmallVec<[&str; 4]> = symbol.split('-').collect();
874        let base_currency = if parts.len() >= 2 { parts[1] } else { symbol };
875        let quote_currency = if parts.len() >= 2 { parts[0] } else { "KRW" };
876
877        params.insert("order_currency".into(), base_currency.into());
878        params.insert("payment_currency".into(), quote_currency.into());
879
880        if let Some(limit) = limit {
881            params.insert("count".into(), limit.to_string().into());
882        }
883
884        let endpoint = "/public/transaction_history";
885
886        #[derive(serde::Deserialize)]
887        struct TransactionHistoryResponse {
888            status: String,
889            data: Vec<TransactionHistoryEntry>,
890        }
891
892        #[derive(serde::Deserialize)]
893        #[allow(dead_code)]
894        struct TransactionHistoryEntry {
895            transaction_date: String,
896            r#type: String, // "bid" for buy, "ask" for sell
897            units_traded: String,
898            price: String,
899            total: String,
900        }
901
902        // Build URL with query parameters
903        let mut url = format!("https://api.bithumb.com{endpoint}");
904        let query_string = params
905            .iter()
906            .map(|(k, v)| format!("{k}={v}"))
907            .collect::<SmallVec<[_; 8]>>()
908            .join("&");
909        if !query_string.is_empty() {
910            url.push('?');
911            url.push_str(&query_string);
912        }
913
914        let trades: TransactionHistoryResponse = get_request(client, &url).await?;
915
916        if trades.status != "0000" {
917            return Err(anyhow!("Failed to fetch trades: {}", trades.status));
918        }
919
920        // Convert to our Trade type
921        let clock = Clock::new();
922        let result = trades
923            .data
924            .into_iter()
925            .filter_map(|t| {
926                let price = match t.price.parse() {
927                    Ok(p) => p,
928                    Err(_) => return None,
929                };
930
931                let quantity = match t.units_traded.parse() {
932                    Ok(q) => q,
933                    Err(_) => return None,
934                };
935
936                let direction = match t.r#type.as_str() {
937                    "bid" => OrderSide::Buy,
938                    "ask" => OrderSide::Sell,
939                    _ => return None,
940                };
941
942                // Parse the transaction date to get timestamp
943                let exchange_time = match t.transaction_date.parse::<u64>() {
944                    Ok(ts) => ts * 1_000_000, // Convert from ms to ns
945                    Err(_) => clock.raw(),    // Fallback to current time
946                };
947
948                Some(MarketTrade {
949                    timestamp: clock.now(),
950                    exchange_time_ns: exchange_time,
951                    price,
952                    quantity,
953                    direction,
954                    instrument_id: InstrumentId::new(symbol, Venue::Bithumb),
955                })
956            })
957            .collect();
958
959        Ok(result)
960    }
961
962    async fn get_orderbook_snapshot(
963        &self,
964        symbol: &str,
965        depth: Option<u32>,
966    ) -> Result<OrderBookSnapshot> {
967        let client = &self.http_client;
968        let mut params: FxHashMap<String, String> = FxHashMap::default();
969
970        // Parse symbol to get base currency (KRW-BTC -> BTC)
971        let parts: SmallVec<[&str; 4]> = symbol.split('-').collect();
972        let base_currency = if parts.len() >= 2 { parts[1] } else { symbol };
973        let quote_currency = if parts.len() >= 2 { parts[0] } else { "KRW" };
974
975        params.insert("order_currency".into(), base_currency.into());
976        params.insert("payment_currency".into(), quote_currency.into());
977
978        if let Some(depth) = depth {
979            params.insert("count".into(), depth.to_string().into());
980        }
981
982        let endpoint = "/public/orderbook";
983
984        #[derive(serde::Deserialize)]
985        struct OrderbookRestResponse {
986            status: String,
987            data: OrderbookRestData,
988        }
989
990        #[derive(serde::Deserialize)]
991        #[allow(dead_code)]
992        struct OrderbookRestData {
993            timestamp: String,
994            order_currency: String,
995            payment_currency: String,
996            bids: Vec<OrderbookRestEntry>,
997            asks: Vec<OrderbookRestEntry>,
998        }
999
1000        #[derive(serde::Deserialize)]
1001        struct OrderbookRestEntry {
1002            quantity: String,
1003            price: String,
1004        }
1005
1006        // Build URL with query parameters
1007        let mut url = format!("https://api.bithumb.com{endpoint}");
1008        let query_string = params
1009            .iter()
1010            .map(|(k, v)| format!("{k}={v}"))
1011            .collect::<SmallVec<[_; 8]>>()
1012            .join("&");
1013        if !query_string.is_empty() {
1014            url.push('?');
1015            url.push_str(&query_string);
1016        }
1017
1018        let orderbook: OrderbookRestResponse = get_request(client, &url).await?;
1019
1020        if orderbook.status != "0000" {
1021            return Err(anyhow!("Failed to fetch orderbook: {}", orderbook.status));
1022        }
1023
1024        // Convert to our OrderBookSnapshot type
1025        let mut bids: SmallVec<[PriceLevel; 64]> =
1026            SmallVec::with_capacity(orderbook.data.bids.len());
1027        let mut asks: SmallVec<[PriceLevel; 64]> =
1028            SmallVec::with_capacity(orderbook.data.asks.len());
1029
1030        for bid in &orderbook.data.bids {
1031            let price = match bid.price.parse() {
1032                Ok(p) => p,
1033                Err(_) => continue,
1034            };
1035
1036            let quantity = match bid.quantity.parse() {
1037                Ok(q) => q,
1038                Err(_) => continue,
1039            };
1040
1041            bids.push(PriceLevel::new(price, quantity));
1042        }
1043
1044        for ask in &orderbook.data.asks {
1045            let price = match ask.price.parse() {
1046                Ok(p) => p,
1047                Err(_) => continue,
1048            };
1049
1050            let quantity = match ask.quantity.parse() {
1051                Ok(q) => q,
1052                Err(_) => continue,
1053            };
1054
1055            asks.push(PriceLevel::new(price, quantity));
1056        }
1057
1058        // Sort bids in descending order, asks in ascending order
1059        bids.sort_by(|a, b| {
1060            b.price
1061                .partial_cmp(&a.price)
1062                .unwrap_or(std::cmp::Ordering::Equal)
1063        });
1064        asks.sort_by(|a, b| {
1065            a.price
1066                .partial_cmp(&b.price)
1067                .unwrap_or(std::cmp::Ordering::Equal)
1068        });
1069
1070        let timestamp_ms = match orderbook.data.timestamp.parse::<u64>() {
1071            Ok(t) => t,
1072            Err(_) => current_timestamp_ms(),
1073        };
1074
1075        // Convert millisecond timestamp to nanoseconds for HFT precision
1076        let exchange_time = self.convert_exchange_timestamp(timestamp_ms);
1077
1078        let book = OrderBookSnapshot::new(
1079            rusty_model::instruments::InstrumentId::new(symbol, Venue::Bithumb),
1080            bids,
1081            asks,
1082            0, // No sequence number from Bithumb
1083            exchange_time,
1084            self.clock.raw(),
1085        );
1086
1087        Ok(book)
1088    }
1089
1090    async fn is_connected(&self) -> bool {
1091        *self.connection_status.read() == ConnectionState::Connected
1092    }
1093
1094    async fn connection_status(&self) -> ConnectionState {
1095        *self.connection_status.read()
1096    }
1097
1098    async fn get_stats(&self) -> ConnectionStats {
1099        self.stats.read().clone()
1100    }
1101
1102    async fn ping(&self) -> Result<u64> {
1103        // For WebSocketClient pattern, ping/pong is handled automatically
1104        // Return a simple network latency test via HTTP
1105        let start = self.clock.raw();
1106
1107        let response = self
1108            .http_client
1109            .get("https://api.bithumb.com/public/ticker/ALL_KRW")
1110            .send()
1111            .await?;
1112
1113        if response.status().is_success() {
1114            Ok(self.clock.raw().saturating_sub(start))
1115        } else {
1116            Err(anyhow!("Ping failed with status: {}", response.status()))
1117        }
1118    }
1119
1120    async fn reset_connection(&self) -> Result<()> {
1121        // First, unsubscribe all active subscriptions
1122        let mut keys_to_remove = Vec::new();
1123
1124        for (key, _) in self.subscriptions.read().iter() {
1125            keys_to_remove.push(key.clone());
1126        }
1127
1128        // Send stop signal for each subscription
1129        for key in &keys_to_remove {
1130            if let Some(tx) = self.subscriptions.write().remove(key) {
1131                let _ = tx.send(true);
1132            }
1133
1134            // Abort task
1135            if let Some(handle) = self.ws_handles.write().remove(key) {
1136                handle.abort();
1137            }
1138        }
1139
1140        // Set status to disconnected
1141        *self.connection_status.write() = ConnectionState::Disconnected;
1142
1143        // Reset stats
1144        *self.stats.write() = ConnectionStats::default();
1145
1146        Ok(())
1147    }
1148
1149    fn get_rate_limits(&self) -> Vec<RateLimit> {
1150        BITHUMB_RATE_LIMITS.to_vec()
1151    }
1152}