1use super::bithumb_config::BithumbConfig;
4use super::bithumb_errors::{BithumbError, BithumbResult, map_order_status, validate_symbol};
5use crate::execution_engine::{Exchange, ExecutionReport};
6use anyhow::{Result, anyhow};
7
/// Counters describing the requests issued by the Bithumb adapter.
///
/// Every field starts at zero (the derived [`Default`]).
#[derive(Clone, Debug, Default)]
pub struct HttpRequestMetrics {
    /// Number of operations recorded so far, whether they succeeded or failed.
    pub total_requests: u64,
    /// Number of recorded operations that were flagged as successful.
    pub successful_requests: u64,
    /// Number of recorded operations that were flagged as failed.
    pub failed_requests: u64,
    /// Running mean of the recorded durations, in milliseconds (integer division).
    pub avg_response_time_ms: u64,
    /// NOTE(review): never written in this file; unit and meaning to be confirmed.
    pub last_request_time: u64,
    /// NOTE(review): never written in this file; unit and meaning to be confirmed.
    pub connection_uptime: u64,
}
24
25impl HttpRequestMetrics {
26 #[must_use]
28 pub fn new() -> Self {
29 Self::default()
30 }
31}
32use async_trait::async_trait;
33use flume::Sender;
34use log::{debug, error, info, warn};
35use parking_lot::RwLock;
36use quanta::Clock;
37use reqwest::Client;
38use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
39use rust_decimal::Decimal;
40use rust_decimal::prelude::ToPrimitive;
41use rusty_common::SmartString;
42use rusty_common::auth::exchanges::bithumb::BithumbAuth;
43use rusty_common::collections::FxHashMap;
44use rusty_common::utils::id_generation;
45use rusty_model::{
46 OrderSide, OrderStatus, OrderType, instruments::InstrumentId, trading_order::Order,
47 venues::Venue,
48};
49use serde::{Deserialize, Serialize};
50use simd_json::prelude::ValueAsObject;
51use simd_json::value::owned::Value as JsonValue;
52use smallvec::SmallVec;
53use std::str::FromStr;
54use std::sync::Arc;
55use tokio::time::timeout;
56
57type OrderCacheEntry = (String, String, OrderSide);
59
60#[derive(Debug, Serialize, Deserialize)]
61struct BithumbOrderResponse {
62 status: String,
63 message: Option<String>,
64 data: Option<BithumbOrderData>,
65}
66
67#[derive(Debug, Serialize, Deserialize)]
68struct BithumbOrderData {
69 _order_id: String,
70 #[serde(default)]
71 units: String,
72 #[serde(default)]
73 price: String,
74 #[serde(default)]
75 order_status: String,
76}
77
78#[derive(Debug, Clone)]
80pub struct BithumbExchange {
81 auth: Arc<BithumbAuth>,
83
84 client: Arc<Client>,
86
87 config: BithumbConfig,
89
90 connected: Arc<RwLock<bool>>,
92
93 clock: Clock,
95
96 instruments_cache: Arc<RwLock<SmallVec<[InstrumentId; 32]>>>,
98
99 order_cache: Arc<RwLock<FxHashMap<String, OrderCacheEntry>>>, http_metrics: Arc<RwLock<HttpRequestMetrics>>,
104}
105
106impl BithumbExchange {
107 fn build_param(params: &[(&str, &str)]) -> BithumbResult<String> {
109 BithumbAuth::build_param_string_optimized(params)
110 .map(|s| s.to_string())
111 .map_err(|e| BithumbError::internal(format!("Parameter building failed: {e}")))
112 }
113
114 #[must_use]
116 pub fn get_health_metrics(&self) -> (bool, HttpRequestMetrics) {
117 let connected = *self.connected.read();
118 let metrics = self.http_metrics.read().clone();
119 (connected, metrics)
120 }
121
122 #[must_use]
124 pub const fn get_config(&self) -> &BithumbConfig {
125 &self.config
126 }
127
128 #[must_use]
130 pub fn supports_symbol(&self, symbol: &str) -> bool {
131 self.config.is_symbol_supported(symbol)
132 }
133
134 #[must_use]
136 pub fn get_min_order_amount(&self, symbol: &str) -> Option<u64> {
137 self.config.get_min_order_amount(symbol)
138 }
139
140 #[must_use]
147 pub fn new(api_key: String, secret_key: String) -> Self {
148 Self::try_new(api_key, secret_key).expect("Failed to create Bithumb exchange instance")
149 }
150
151 pub fn try_new(api_key: String, secret_key: String) -> BithumbResult<Self> {
155 Self::with_config(api_key, secret_key, BithumbConfig::production())
156 }
157
158 pub fn with_config(
160 api_key: String,
161 secret_key: String,
162 config: BithumbConfig,
163 ) -> BithumbResult<Self> {
164 config.validate().map_err(BithumbError::configuration)?;
166
167 let client = Client::builder()
168 .timeout(config.get_http_timeout())
169 .user_agent(config.user_agent.as_str())
170 .pool_max_idle_per_host(config.connection_pool_size)
171 .pool_idle_timeout(Some(config.keep_alive_timeout))
172 .build()
173 .map_err(|e| BithumbError::connection(format!("Failed to build HTTP client: {e}")))?;
174
175 let auth = Arc::new(BithumbAuth::new(api_key.into(), secret_key.into()));
176
177 Ok(Self {
178 auth,
179 client: Arc::new(client),
180 config,
181 connected: Arc::new(RwLock::new(false)),
182 clock: Clock::new(),
183 instruments_cache: Arc::new(RwLock::new(SmallVec::new())),
184 order_cache: Arc::new(RwLock::new(FxHashMap::default())),
185 http_metrics: Arc::new(RwLock::new(HttpRequestMetrics::new())),
186 })
187 }
188
189 pub fn high_frequency(api_key: String, secret_key: String) -> BithumbResult<Self> {
191 Self::with_config(api_key, secret_key, BithumbConfig::high_frequency())
192 }
193
194 const fn map_order_type(order_type: OrderType) -> &'static str {
196 match order_type {
197 OrderType::Limit => "limit",
198 OrderType::Market => "market",
199 _ => "limit", }
201 }
202
203 const fn map_order_side(side: OrderSide) -> &'static str {
205 match side {
206 OrderSide::Buy => "bid",
207 OrderSide::Sell => "ask",
208 }
209 }
210
211 fn parse_symbol(symbol: &str) -> BithumbResult<(SmartString, SmartString)> {
213 validate_symbol(symbol)
214 }
215
216 fn validate_order(&self, order: &Order) -> BithumbResult<()> {
218 let (base, quote) = Self::parse_symbol(&order.symbol)?;
220
221 if !self.config.is_symbol_supported(&order.symbol) {
223 return Err(BithumbError::invalid_symbol(format!(
224 "Unsupported symbol: {}",
225 order.symbol
226 )));
227 }
228
229 if let Some(min_amount) = self.config.get_min_order_amount(&order.symbol) {
231 let order_value = if let Some(price) = order.price {
232 order.quantity * price
233 } else {
234 return Ok(());
237 };
238
239 if order_value.to_u64().unwrap_or(0) < min_amount {
240 return Err(BithumbError::order_operation(
241 "place_order",
242 format!(
243 "Order value {} KRW is below minimum {} KRW for {}",
244 order_value, min_amount, order.symbol
245 ),
246 ));
247 }
248 }
249
250 if order.quantity <= Decimal::ZERO {
252 return Err(BithumbError::order_operation(
253 "place_order",
254 "Order quantity must be positive",
255 ));
256 }
257
258 if order.order_type == OrderType::Limit && order.price.is_none() {
260 return Err(BithumbError::order_operation(
261 "place_order",
262 "Limit orders must have a price",
263 ));
264 }
265
266 Ok(())
267 }
268
269 fn update_metrics(&self, operation: &str, success: bool, duration_ms: u64) {
271 if let Some(mut metrics) = self.http_metrics.try_write() {
272 if success {
273 metrics.total_requests += 1;
274 metrics.successful_requests += 1;
275 } else {
276 metrics.total_requests += 1;
277 metrics.failed_requests += 1;
278 }
279
280 if metrics.total_requests == 1 {
282 metrics.avg_response_time_ms = duration_ms;
283 } else {
284 metrics.avg_response_time_ms =
285 (metrics.avg_response_time_ms * (metrics.total_requests - 1) + duration_ms)
286 / metrics.total_requests;
287 }
288
289 if self.config.enable_logging {
290 debug!(
291 "Updated metrics for {operation}: success={success}, duration={duration_ms}ms"
292 );
293 }
294 }
295 }
296
297 fn parse_headers(
298 &self,
299 endpoint: &str,
300 params: &Vec<(&str, &str)>,
301 ) -> BithumbResult<(SmartString, String, HeaderMap)> {
302 let headers = self
303 .auth
304 .generate_headers("POST", endpoint, Some(params.as_slice()))
305 .map_err(|e| BithumbError::auth(format!("Header generation failed: {e}")))?;
306
307 let url = format!("{}{}", self.config.get_api_url()?, endpoint);
308 let body = Self::build_param(params)
309 .map_err(|e| BithumbError::internal(format!("Parameter building failed: {e}")))?;
310
311 let mut header_map = HeaderMap::new();
312
313 for (key, value) in &headers {
314 match (HeaderName::from_str(key), HeaderValue::from_str(value)) {
315 (Ok(header_name), Ok(header_value)) => {
316 header_map.insert(header_name, header_value);
317 }
318 (Err(e), _) => {
319 error!("Failed to parse header name '{key}': {e}");
320 return Err(BithumbError::internal(format!(
321 "Invalid header name: {key}"
322 )));
323 }
324 (_, Err(e)) => {
325 error!("Failed to parse header value '{value}': {e}");
326 return Err(BithumbError::internal(format!(
327 "Invalid header value: {value}"
328 )));
329 }
330 }
331 }
332 Ok((url.into(), body, header_map))
333 }
334}
335
336#[async_trait]
337impl Exchange for BithumbExchange {
338 fn venue(&self) -> Venue {
339 Venue::Bithumb
340 }
341
342 async fn place_order(&self, order: Order, report_tx: Sender<ExecutionReport>) -> Result<()> {
343 let start_time = self.clock.raw();
344
345 if !*self.connected.read() {
347 let err = BithumbError::connection("Not connected to Bithumb");
348 self.update_metrics("place_order", false, 0);
349 return Err(anyhow!(err));
350 }
351
352 if let Err(e) = self.validate_order(&order) {
354 let duration_ms = (self.clock.raw() - start_time) / 1_000_000;
355 self.update_metrics("place_order", false, duration_ms);
356
357 let report = ExecutionReport {
359 id: id_generation::generate_rejection_id(&order.id.to_string()),
360 order_id: order.id.to_string().into(),
361 exchange_timestamp: 0,
362 system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
363 instrument_id: InstrumentId::new(order.symbol.clone(), order.venue),
364 status: OrderStatus::Rejected,
365 filled_quantity: Decimal::ZERO,
366 remaining_quantity: order.quantity,
367 execution_price: None,
368 reject_reason: Some(e.to_string().into()),
369 exchange_execution_id: None,
370 is_final: true,
371 };
372
373 if let Err(send_err) = report_tx.send_async(report).await {
374 error!("Failed to send rejection report: {send_err}");
375 }
376
377 return Err(e.into());
378 }
379
380 let endpoint = "/trade/place";
381
382 let (base_currency, quote_currency) = match Self::parse_symbol(&order.symbol) {
384 Ok(currencies) => currencies,
385 Err(e) => {
386 let duration_ms = (self.clock.raw() - start_time) / 1_000_000;
387 self.update_metrics("place_order", false, duration_ms);
388 return Err(e.into());
389 }
390 };
391
392 let units_str: String = order.quantity.to_string();
396 let price_str: String = order
397 .price
398 .map_or_else(|| "0".to_string(), |p| p.to_string());
399
400 let params = vec![
401 ("order_currency", base_currency.as_str()),
402 ("payment_currency", quote_currency.as_str()),
403 ("units", units_str.as_str()),
404 ("price", price_str.as_str()),
405 ("type", Self::map_order_side(order.side)),
406 ("order_type", Self::map_order_type(order.order_type)),
407 ];
408
409 let (url, body, header_map) = match self.parse_headers(endpoint, ¶ms) {
411 Ok(result) => result,
412 Err(e) => {
413 let duration_ms = (self.clock.raw() - start_time) / 1_000_000;
414 self.update_metrics("place_order", false, duration_ms);
415 return Err(e.into());
416 }
417 };
418
419 let response = match timeout(
420 self.config.get_http_timeout(),
421 self.client
422 .post(url.as_str())
423 .headers(header_map)
424 .body(body)
425 .send(),
426 )
427 .await
428 {
429 Ok(Ok(response)) => response,
430 Ok(Err(e)) => {
431 let duration_ms = (self.clock.raw() - start_time) / 1_000_000;
432 self.update_metrics("place_order", false, duration_ms);
433 let err = BithumbError::connection(format!("HTTP request failed: {e}"));
434 return Err(anyhow!(err));
435 }
436 Err(_) => {
437 let duration_ms = (self.clock.raw() - start_time) / 1_000_000;
438 self.update_metrics("place_order", false, duration_ms);
439 let err = BithumbError::timeout(self.config.get_http_timeout().as_millis() as u64);
440 return Err(anyhow!(err));
441 }
442 };
443
444 let timestamp = BithumbAuth::generate_timestamp();
445 let duration_ms = (self.clock.raw() - start_time) / 1_000_000;
446 let status = response.status();
447
448 if status.is_success() {
449 let order_response: BithumbOrderResponse = match response.json().await {
450 Ok(resp) => resp,
451 Err(e) => {
452 self.update_metrics("place_order", false, duration_ms);
453 let err = BithumbError::json_parsing(format!("Failed to parse response: {e}"));
454 return Err(anyhow!(err));
455 }
456 };
457
458 if order_response.status == "0000" {
459 if let Some(data) = order_response.data {
460 self.order_cache.write().insert(
462 order.id.to_string(),
463 (
464 base_currency.to_string(),
465 quote_currency.to_string(),
466 order.side,
467 ),
468 );
469
470 self.update_metrics("place_order", true, duration_ms);
471
472 let report = ExecutionReport {
473 id: id_generation::generate_ack_id(&order.id.to_string()),
474 order_id: order.id.to_string().into(),
475 exchange_timestamp: timestamp * 1_000_000, system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
477 instrument_id: InstrumentId::new(order.symbol.clone(), order.venue),
478 status: OrderStatus::New,
479 filled_quantity: Decimal::ZERO,
480 remaining_quantity: order.quantity,
481 execution_price: None,
482 reject_reason: None,
483 exchange_execution_id: Some(data._order_id.into()),
484 is_final: false,
485 };
486
487 if let Err(e) = report_tx.send_async(report).await {
488 error!("Failed to send acknowledgment report: {e}");
489 }
490 return Ok(());
491 }
492 } else {
493 self.update_metrics("place_order", false, duration_ms);
494
495 let error_msg = order_response
496 .message
497 .unwrap_or_else(|| "Unknown error".into());
498
499 if self.config.enable_logging {
500 error!("Bithumb order placement failed: {error_msg}");
501 }
502
503 let report = ExecutionReport {
504 id: id_generation::generate_rejection_id(&order.id.to_string()),
505 order_id: order.id.to_string().into(),
506 exchange_timestamp: 0,
507 system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
508 instrument_id: InstrumentId::new(order.symbol.clone(), order.venue),
509 status: OrderStatus::Rejected,
510 filled_quantity: Decimal::ZERO,
511 remaining_quantity: order.quantity,
512 execution_price: None,
513 reject_reason: Some(error_msg.clone().into()),
514 exchange_execution_id: None,
515 is_final: true,
516 };
517
518 if let Err(e) = report_tx.send_async(report).await {
519 error!("Failed to send rejection report: {e}");
520 }
521
522 let err = BithumbError::api(order_response.status, error_msg);
523 return Err(anyhow!(err));
524 }
525 }
526
527 self.update_metrics("place_order", false, duration_ms);
528 let error_msg = format!("HTTP request failed with status: {status}");
529
530 let report = ExecutionReport {
531 id: id_generation::generate_rejection_id(&order.id.to_string()),
532 order_id: order.id.to_string().into(),
533 exchange_timestamp: 0,
534 system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
535 instrument_id: InstrumentId::new(order.symbol.clone(), order.venue),
536 status: OrderStatus::Rejected,
537 filled_quantity: Decimal::ZERO,
538 remaining_quantity: order.quantity,
539 execution_price: None,
540 reject_reason: Some(error_msg.clone().into()),
541 exchange_execution_id: None,
542 is_final: true,
543 };
544
545 if let Err(e) = report_tx.send_async(report).await {
546 error!("Failed to send rejection report: {e}");
547 }
548
549 let err = BithumbError::connection(error_msg);
550 Err(anyhow!(err))
551 }
552
553 async fn cancel_order(
554 &self,
555 order_id: SmartString,
556 report_tx: Sender<ExecutionReport>,
557 ) -> Result<()> {
558 let start_time = self.clock.raw();
559
560 if !*self.connected.read() {
562 let err = BithumbError::connection("Not connected to Bithumb");
563 self.update_metrics("cancel_order", false, 0);
564 return Err(anyhow!(err));
565 }
566
567 let endpoint = "/trade/cancel";
568
569 let (base_currency, quote_currency, order_side) = {
571 let cache = self.order_cache.read();
572 if let Some(cached_data) = cache.get(order_id.as_str()).cloned() {
573 cached_data
574 } else {
575 let duration_ms = (self.clock.raw() - start_time) / 1_000_000;
576 self.update_metrics("cancel_order", false, duration_ms);
577 let err = BithumbError::order_operation(
578 "cancel_order",
579 format!("Order {order_id} not found in cache"),
580 );
581 return Err(anyhow!(err));
582 }
583 };
584
585 let order_type = Self::map_order_side(order_side);
586
587 let params = vec![
588 ("type", order_type),
589 ("order_id", &order_id),
590 ("order_currency", base_currency.as_str()),
591 ("payment_currency", quote_currency.as_str()),
592 ];
593
594 let (url, body, header_map) = match self.parse_headers(endpoint, ¶ms) {
596 Ok(result) => result,
597 Err(e) => {
598 let duration_ms = (self.clock.raw() - start_time) / 1_000_000;
599 self.update_metrics("cancel_order", false, duration_ms);
600 return Err(e.into());
601 }
602 };
603
604 let response = match timeout(
605 self.config.get_http_timeout(),
606 self.client
607 .post(url.as_str())
608 .headers(header_map)
609 .body(body)
610 .send(),
611 )
612 .await
613 {
614 Ok(Ok(response)) => response,
615 Ok(Err(e)) => {
616 let duration_ms = (self.clock.raw() - start_time) / 1_000_000;
617 self.update_metrics("cancel_order", false, duration_ms);
618 let err = BithumbError::connection(format!("HTTP request failed: {e}"));
619 return Err(anyhow!(err));
620 }
621 Err(_) => {
622 let duration_ms = (self.clock.raw() - start_time) / 1_000_000;
623 self.update_metrics("cancel_order", false, duration_ms);
624 let err = BithumbError::timeout(self.config.get_http_timeout().as_millis() as u64);
625 return Err(anyhow!(err));
626 }
627 };
628
629 let duration_ms = (self.clock.raw() - start_time) / 1_000_000;
630 let status = response.status();
631
632 if status.is_success() {
633 let cancel_response: BithumbOrderResponse = match response.json().await {
634 Ok(resp) => resp,
635 Err(e) => {
636 self.update_metrics("cancel_order", false, duration_ms);
637 let err = BithumbError::json_parsing(format!("Failed to parse response: {e}"));
638 return Err(anyhow!(err));
639 }
640 };
641
642 if cancel_response.status == "0000" {
643 self.update_metrics("cancel_order", true, duration_ms);
644
645 let instrument = InstrumentId {
647 symbol: format!("{base_currency}_{quote_currency}").into(),
648 venue: Venue::Bithumb,
649 };
650
651 let timestamp = BithumbAuth::generate_timestamp();
652
653 let report = ExecutionReport {
654 id: id_generation::generate_cancel_id(&order_id),
655 order_id: order_id.clone(),
656 exchange_timestamp: timestamp * 1_000_000, system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
658 instrument_id: instrument,
659 status: OrderStatus::Cancelled,
660 filled_quantity: Decimal::ZERO,
661 remaining_quantity: Decimal::ZERO, execution_price: None,
663 reject_reason: None,
664 exchange_execution_id: None,
665 is_final: true,
666 };
667
668 if let Err(e) = report_tx.send_async(report).await {
669 error!("Failed to send cancellation report: {e}");
670 }
671 return Ok(());
672 } else {
673 self.update_metrics("cancel_order", false, duration_ms);
674
675 let error_msg = cancel_response
676 .message
677 .unwrap_or_else(|| "Unknown error".into());
678
679 if self.config.enable_logging {
680 error!("Bithumb order cancellation failed: {error_msg}");
681 }
682
683 let err = BithumbError::api(cancel_response.status, error_msg);
684 return Err(anyhow!(err));
685 }
686 }
687
688 self.update_metrics("cancel_order", false, duration_ms);
689 let err = BithumbError::connection(format!("HTTP request failed with status: {status}"));
690 Err(anyhow!(err))
691 }
692
693 async fn modify_order(
694 &self,
695 _order_id: SmartString,
696 _new_price: Option<Decimal>,
697 _new_quantity: Option<Decimal>,
698 _report_tx: Sender<ExecutionReport>,
699 ) -> Result<()> {
700 self.update_metrics("modify_order", false, 0);
703
704 let err = BithumbError::order_operation(
705 "modify_order",
706 "Order modification not supported for Bithumb. Cancel and place a new order instead.",
707 );
708 Err(anyhow!(err))
709 }
710
711 async fn cancel_all_orders(
712 &self,
713 _instrument_id: Option<InstrumentId>,
714 _report_tx: Sender<ExecutionReport>,
715 ) -> Result<()> {
716 self.update_metrics("cancel_all_orders", false, 0);
723
724 let err = BithumbError::order_operation(
725 "cancel_all_orders",
726 "Cancel all orders not implemented for Bithumb. Use individual order cancellation.",
727 );
728 Err(anyhow!(err))
729 }
730
731 async fn get_order_status(&self, order_id: &str) -> Result<OrderStatus> {
732 let start_time = self.clock.raw();
733
734 if !*self.connected.read() {
736 let err = BithumbError::connection("Not connected to Bithumb");
737 self.update_metrics("get_order_status", false, 0);
738 return Err(anyhow!(err));
739 }
740
741 let endpoint = "/info/order_detail";
742
743 let (base_currency, quote_currency, order_side) = {
745 let cache = self.order_cache.read();
746 if let Some(cached_data) = cache.get(order_id).cloned() {
747 cached_data
748 } else {
749 let duration_ms = (self.clock.raw() - start_time) / 1_000_000;
750 self.update_metrics("get_order_status", false, duration_ms);
751 let err = BithumbError::order_operation(
752 "get_order_status",
753 format!("Order {order_id} not found in cache"),
754 );
755 return Err(anyhow!(err));
756 }
757 };
758
759 let order_type = Self::map_order_side(order_side);
760
761 let params = vec![
762 ("order_id", order_id),
763 ("type", order_type),
764 ("order_currency", base_currency.as_str()),
765 ("payment_currency", quote_currency.as_str()),
766 ];
767
768 let (url, body, header_map) = match self.parse_headers(endpoint, ¶ms) {
770 Ok(result) => result,
771 Err(e) => {
772 let duration_ms = (self.clock.raw() - start_time) / 1_000_000;
773 self.update_metrics("get_order_status", false, duration_ms);
774 return Err(e.into());
775 }
776 };
777
778 let response = match timeout(
779 self.config.get_http_timeout(),
780 self.client
781 .post(url.as_str())
782 .headers(header_map)
783 .body(body)
784 .send(),
785 )
786 .await
787 {
788 Ok(Ok(response)) => response,
789 Ok(Err(e)) => {
790 let duration_ms = (self.clock.raw() - start_time) / 1_000_000;
791 self.update_metrics("get_order_status", false, duration_ms);
792 let err = BithumbError::connection(format!("HTTP request failed: {e}"));
793 return Err(anyhow!(err));
794 }
795 Err(_) => {
796 let duration_ms = (self.clock.raw() - start_time) / 1_000_000;
797 self.update_metrics("get_order_status", false, duration_ms);
798 let err = BithumbError::timeout(self.config.get_http_timeout().as_millis() as u64);
799 return Err(anyhow!(err));
800 }
801 };
802
803 let duration_ms = (self.clock.raw() - start_time) / 1_000_000;
804 let status = response.status();
805
806 if status.is_success() {
807 let order_response: BithumbOrderResponse = match response.json().await {
808 Ok(resp) => resp,
809 Err(e) => {
810 self.update_metrics("get_order_status", false, duration_ms);
811 let err = BithumbError::json_parsing(format!("Failed to parse response: {e}"));
812 return Err(anyhow!(err));
813 }
814 };
815
816 if order_response.status == "0000" {
817 if let Some(data) = order_response.data {
818 self.update_metrics("get_order_status", true, duration_ms);
819
820 return match map_order_status(&data.order_status) {
821 Ok(status) => Ok(status),
822 Err(e) => {
823 self.update_metrics("get_order_status", false, duration_ms);
824 Err(e.into())
825 }
826 };
827 }
828 } else {
829 self.update_metrics("get_order_status", false, duration_ms);
830 let error_msg = order_response
831 .message
832 .unwrap_or_else(|| "Unknown error".into());
833 let err = BithumbError::api(order_response.status, error_msg);
834 return Err(anyhow!(err));
835 }
836 }
837
838 self.update_metrics("get_order_status", false, duration_ms);
839 let err = BithumbError::connection(format!("HTTP request failed with status: {status}"));
840 Err(anyhow!(err))
841 }
842
843 async fn connect(&self, _report_sender: Sender<ExecutionReport>) -> Result<()> {
844 let start_time = self.clock.raw();
845
846 if self.config.enable_logging {
847 info!("Connecting to Bithumb exchange...");
848 }
849
850 let test_url = format!("{}/public/ticker/BTC_KRW", self.config.get_api_url()?);
852
853 let response = match timeout(
854 self.config.get_http_timeout(),
855 self.client.get(&test_url).send(),
856 )
857 .await
858 {
859 Ok(Ok(response)) => response,
860 Ok(Err(e)) => {
861 let duration_ms = (self.clock.raw() - start_time) / 1_000_000;
862 self.update_metrics("connect", false, duration_ms);
863 let err = BithumbError::connection(format!("Connection test failed: {e}"));
864 return Err(anyhow!(err));
865 }
866 Err(_) => {
867 let duration_ms = (self.clock.raw() - start_time) / 1_000_000;
868 self.update_metrics("connect", false, duration_ms);
869 let err = BithumbError::timeout(self.config.get_http_timeout().as_millis() as u64);
870 return Err(anyhow!(err));
871 }
872 };
873
874 let duration_ms = (self.clock.raw() - start_time) / 1_000_000;
875 let status = response.status();
876
877 if status.is_success() {
878 *self.connected.write() = true;
879 self.update_metrics("connect", true, duration_ms);
880
881 if self.config.enable_logging {
882 info!("Successfully connected to Bithumb exchange");
883 }
884 Ok(())
885 } else {
886 self.update_metrics("connect", false, duration_ms);
887 let err =
888 BithumbError::connection(format!("Connection test failed with status: {status}"));
889 Err(anyhow!(err))
890 }
891 }
892
893 async fn disconnect(&self) -> Result<()> {
894 if self.config.enable_logging {
895 info!("Disconnecting from Bithumb exchange...");
896 }
897
898 *self.connected.write() = false;
899
900 self.instruments_cache.write().clear();
902 self.order_cache.write().clear();
903
904 if self.config.enable_logging {
905 info!("Successfully disconnected from Bithumb exchange");
906 }
907
908 Ok(())
909 }
910
911 async fn is_connected(&self) -> bool {
912 *self.connected.read()
913 }
914
915 async fn get_instruments(&self) -> Result<SmallVec<[InstrumentId; 32]>> {
916 let start_time = self.clock.raw();
917
918 {
920 let cache = self.instruments_cache.read();
921 if !cache.is_empty() {
922 if self.config.enable_logging {
923 debug!("Returning cached instruments (count: {})", cache.len());
924 }
925 return Ok(cache.clone());
926 }
927 }
928
929 if self.config.enable_logging {
930 info!("Fetching instruments from Bithumb API...");
931 }
932
933 let url = format!("{}/public/ticker/ALL_KRW", self.config.get_api_url()?);
934
935 let response = match timeout(self.config.get_http_timeout(), self.client.get(&url).send())
936 .await
937 {
938 Ok(Ok(response)) => response,
939 Ok(Err(e)) => {
940 let duration_ms = (self.clock.raw() - start_time) / 1_000_000;
941 self.update_metrics("get_instruments", false, duration_ms);
942 let err = BithumbError::connection(format!("HTTP request failed: {e}"));
943 return Err(anyhow!(err));
944 }
945 Err(_) => {
946 let duration_ms = (self.clock.raw() - start_time) / 1_000_000;
947 self.update_metrics("get_instruments", false, duration_ms);
948 let err = BithumbError::timeout(self.config.get_http_timeout().as_millis() as u64);
949 return Err(anyhow!(err));
950 }
951 };
952
953 let duration_ms = (self.clock.raw() - start_time) / 1_000_000;
954 let status = response.status();
955
956 if status.is_success() {
957 let json: JsonValue = match response.json().await {
958 Ok(json) => json,
959 Err(e) => {
960 self.update_metrics("get_instruments", false, duration_ms);
961 let err = BithumbError::json_parsing(format!("Failed to parse response: {e}"));
962 return Err(anyhow!(err));
963 }
964 };
965
966 let mut instruments = SmallVec::new();
967
968 if let Some(data) = json.as_object() {
969 for (key, value) in data {
970 if key == "status" || key == "date" {
972 continue;
973 }
974
975 if value.as_object().is_some() {
977 let symbol = format!("{key}_KRW");
978
979 if validate_symbol(&symbol).is_ok() {
981 let instrument_id = InstrumentId {
982 symbol: symbol.into(),
983 venue: Venue::Bithumb,
984 };
985 instruments.push(instrument_id);
986 } else if self.config.enable_logging {
987 warn!("Skipping invalid symbol format: {symbol}");
988 }
989 }
990 }
991
992 *self.instruments_cache.write() = instruments.clone();
994 self.update_metrics("get_instruments", true, duration_ms);
995
996 if self.config.enable_logging {
997 info!(
998 "Successfully fetched {} instruments from Bithumb",
999 instruments.len()
1000 );
1001 }
1002
1003 return Ok(instruments);
1004 } else {
1005 self.update_metrics("get_instruments", false, duration_ms);
1006 let err = BithumbError::json_parsing(
1007 "Response does not contain expected object structure",
1008 );
1009 return Err(anyhow!(err));
1010 }
1011 }
1012
1013 self.update_metrics("get_instruments", false, duration_ms);
1014 let err = BithumbError::connection(format!("HTTP request failed with status: {status}"));
1015 Err(anyhow!(err))
1016 }
1017
1018 async fn send_fix_message(&self, message: Vec<u8>) -> Result<()> {
1019 log::warn!(
1020 "BithumbExchange: send_fix_message not implemented. Message: {:?}",
1021 message
1022 );
1023 Err(anyhow!("FIX message sending not implemented for Bithumb"))
1024 }
1025
1026 async fn receive_fix_message(&self) -> Result<Vec<u8>> {
1027 log::warn!("BithumbExchange: receive_fix_message not implemented.");
1028 Err(anyhow!("FIX message receiving not implemented for Bithumb"))
1029 }
1030}
1031
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an exchange with placeholder credentials.
    fn exchange_with_dummy_keys() -> BithumbExchange {
        BithumbExchange::new("test_api_key".to_string(), "test_secret_key".to_string())
    }

    /// Sending a FIX message must be rejected with the "not implemented" error.
    #[tokio::test]
    async fn test_send_fix_message_not_implemented() {
        let exchange = exchange_with_dummy_keys();

        let outcome = exchange.send_fix_message(b"TEST".to_vec()).await;

        let err = outcome.expect_err("sending a FIX message should fail");
        let text = err.to_string();
        assert!(text.contains("FIX message sending not implemented for Bithumb"));
    }

    /// Receiving a FIX message must be rejected with the "not implemented" error.
    #[tokio::test]
    async fn test_receive_fix_message_not_implemented() {
        let exchange = exchange_with_dummy_keys();

        let outcome = exchange.receive_fix_message().await;

        let err = outcome.expect_err("receiving a FIX message should fail");
        let text = err.to_string();
        assert!(text.contains("FIX message receiving not implemented for Bithumb"));
    }
}