1use std::sync::Arc;
2use std::time::Duration;
3
4use anyhow::{Result, anyhow};
5use async_trait::async_trait;
6use flume::Sender;
8use parking_lot::RwLock;
9use quanta::Clock;
10use reqwest::Client;
11use rust_decimal::Decimal;
12use rusty_common::SmartString;
13use rusty_common::auth::exchanges::upbit::{UpbitAuth, UpbitAuthConfig};
14use rusty_common::utils::id_generation;
15use rusty_common::websocket::connector::{WebSocketSink, WebSocketStream};
16use rusty_model::{
17 enums::{OrderSide, OrderStatus, OrderType},
18 instruments::InstrumentId,
19 trading_order::Order,
20 venues::Venue,
21};
22use serde::{Deserialize, Serialize};
23use simd_json::json;
24use smallvec::SmallVec;
25use tokio::sync::RwLock as AsyncRwLock;
26
27use crate::execution_engine::{Exchange, ExecutionReport};
28
/// Base URL of the Upbit REST API; the request paths used in this file are appended to it.
const API_URL: &str = "https://api.upbit.com";
/// Upbit WebSocket endpoint URL. Not referenced by the code visible in this file.
const WS_URL: &str = "wss://api.upbit.com/websocket/v1";
/// Upbit WebSocket endpoint URL ending in `/private`. Not referenced by the code visible in this file.
const WS_PRIVATE_URL: &str = "wss://api.upbit.com/websocket/v1/private";
33
/// JSON body deserialized from successful order responses of the Upbit REST API.
///
/// In this file it is produced by `place_order`, `cancel_order` and
/// `get_order_status`. Quantities and prices arrive as strings; callers parse
/// the ones they need into `Decimal`.
#[derive(Debug, Serialize, Deserialize)]
struct UpbitOrderResponse {
    /// Exchange-side order identifier; forwarded as `exchange_execution_id` in reports.
    uuid: String,
    /// Order side as reported by the exchange. Not read in this file
    /// (presumably "bid"/"ask", mirroring `map_order_side` — TODO confirm).
    side: String,
    /// Order type as reported by the exchange. Not read in this file.
    ord_type: String,
    /// Order price, if present; parsed into the report's `execution_price`.
    price: Option<String>,
    /// Order state string; translated by `map_order_status`.
    state: String,
    /// Market code; used as the instrument symbol in cancel reports.
    market: String,
    /// Creation time; parsed by `parse_timestamp` into the report's `exchange_timestamp`.
    created_at: String,
    /// Not read in this file (presumably the ordered volume — TODO confirm).
    volume: String,
    /// Parsed into the report's `remaining_quantity`.
    remaining_volume: String,
    /// Not read in this file.
    reserved_fee: String,
    /// Not read in this file.
    remaining_fee: String,
    /// Not read in this file.
    paid_fee: String,
    /// Not read in this file.
    locked: String,
    /// Parsed into the report's `filled_quantity`.
    executed_volume: String,
    /// Not read in this file.
    trades_count: u64,
    /// Optional time-in-force echoed by the exchange. Not read in this file.
    time_in_force: Option<String>,
    /// Optional client-supplied identifier echoed by the exchange. Not read in this file.
    identifier: Option<String>,
}
76
/// JSON body deserialized from non-success responses of the Upbit REST API.
///
/// Wraps a single `error` object whose fields are joined into the error
/// messages produced by the order methods in this file.
#[derive(Debug, Serialize, Deserialize)]
struct UpbitErrorResponse {
    /// The error detail carried by the response.
    error: UpbitError,
}
86
/// Error detail nested inside [`UpbitErrorResponse`].
///
/// Callers in this file format it as `"{name}: {message}"`.
#[derive(Debug, Serialize, Deserialize)]
struct UpbitError {
    /// First component of the formatted error text.
    name: String,
    /// Second component of the formatted error text.
    message: String,
}
98
/// Upbit implementation of the [`Exchange`] trait, backed by the Upbit REST API.
///
/// Every field except `clock` is wrapped in an `Arc`, so clones of this
/// struct share the same authenticator, HTTP client, connection flag and
/// instrument cache.
#[derive(Clone)]
pub struct UpbitExchange {
    /// Produces the JWT bearer tokens attached to authenticated REST requests.
    auth: Arc<UpbitAuth>,

    /// HTTP client used for all REST calls.
    client: Arc<Client>,

    /// WebSocket sink slot; initialised to `None` and never assigned in the code visible here.
    ws_sink: Arc<AsyncRwLock<Option<WebSocketSink>>>,
    /// WebSocket stream slot; initialised to `None` and never assigned in the code visible here.
    ws_stream: Arc<AsyncRwLock<Option<WebSocketStream>>>,

    /// Connection flag set by `connect`/`disconnect` and checked before order REST calls.
    connected: Arc<RwLock<bool>>,

    /// Last-resort timestamp source (`Clock::raw`) when other timestamp helpers fail.
    clock: Clock,

    /// Instruments returned by the last successful `get_instruments` fetch; empty until then.
    instruments_cache: Arc<RwLock<SmallVec<[InstrumentId; 32]>>>,
}
131
132impl UpbitExchange {
133 #[must_use]
135 pub fn new(api_key: String, secret_key: String) -> Self {
136 let client = Client::builder()
137 .timeout(Duration::from_secs(10))
138 .build()
139 .expect("Failed to build HTTP client");
140
141 let auth_config = UpbitAuthConfig::new(api_key.into(), secret_key.into());
142 let auth = Arc::new(UpbitAuth::new(auth_config));
143
144 Self {
145 auth,
146 client: Arc::new(client),
147 ws_sink: Arc::new(AsyncRwLock::new(None)),
148 ws_stream: Arc::new(AsyncRwLock::new(None)),
149 connected: Arc::new(RwLock::new(false)),
150 clock: Clock::new(),
151 instruments_cache: Arc::new(RwLock::new(SmallVec::new())),
152 }
153 }
154
155 fn map_order_status(status: &str) -> OrderStatus {
157 match status {
158 "wait" => OrderStatus::New,
159 "watch" => OrderStatus::New,
160 "done" => OrderStatus::Filled,
161 "cancel" => OrderStatus::Cancelled,
162 _ => OrderStatus::Rejected, }
164 }
165
166 const fn map_order_type(order_type: OrderType) -> &'static str {
168 match order_type {
169 OrderType::Market => "price", OrderType::Limit => "limit",
171 OrderType::PostOnly => "best", _ => "limit", }
174 }
175
176 const fn map_time_in_force(order_type: OrderType) -> Option<&'static str> {
178 match order_type {
179 OrderType::ImmediateOrCancel => Some("ioc"),
180 OrderType::FillOrKill => Some("fok"),
181 _ => None,
182 }
183 }
184
185 const fn map_order_side(side: OrderSide) -> &'static str {
187 match side {
188 OrderSide::Buy => "bid",
189 OrderSide::Sell => "ask",
190 }
191 }
192
193 fn parse_timestamp(timestamp_str: &str) -> Result<u64> {
204 rusty_common::time::parse_rfc3339_timestamp(timestamp_str).map_err(|e| {
206 anyhow!(
207 "Failed to parse RFC3339 timestamp '{}': {}",
208 timestamp_str,
209 e
210 )
211 })
212 }
213}
214
215#[async_trait]
216impl Exchange for UpbitExchange {
    /// Identifies the venue served by this adapter; always [`Venue::Upbit`].
    fn venue(&self) -> Venue {
        Venue::Upbit
    }
220
221 async fn place_order(&self, order: Order, report_tx: Sender<ExecutionReport>) -> Result<()> {
222 if !*self.connected.read() {
223 return Err(anyhow!("Not connected to Upbit"));
224 }
225
226 let market = order.symbol.clone();
227 let side = Self::map_order_side(order.side);
228 let ord_type = Self::map_order_type(order.order_type);
229
230 let mut body = json!({
232 "market": market,
233 "side": side,
234 "ord_type": ord_type,
235 });
236
237 if ord_type == "limit" {
239 body["price"] = json!(
240 order
241 .price
242 .map_or_else(|| "0".to_string(), |p| p.to_string())
243 );
244 body["volume"] = json!(order.quantity.to_string());
245 } else if ord_type == "price" {
246 body["price"] = json!(
248 order
249 .price
250 .map_or_else(|| "0".to_string(), |p| p.to_string())
251 );
252 } else if ord_type == "market" {
253 body["volume"] = json!(order.quantity.to_string());
255 } else if ord_type == "best" {
256 if let Some(tif) = Self::map_time_in_force(order.order_type) {
258 body["time_in_force"] = json!(tif);
259
260 if side == "bid" {
262 body["price"] = json!(
264 order
265 .price
266 .map_or_else(|| "0".to_string(), |p| p.to_string())
267 );
268 } else {
269 body["volume"] = json!(order.quantity.to_string());
271 }
272 } else {
273 return Err(anyhow!("Best order type requires IOC or FOK order type"));
274 }
275 }
276
277 if ord_type == "limit"
279 && let Some(tif) = Self::map_time_in_force(order.order_type)
280 {
281 body["time_in_force"] = json!(tif);
282 }
283
284 if !order.client_id.as_str().is_empty() {
286 body["identifier"] = json!(order.client_id.as_str());
287 }
288
289 let body_str = simd_json::to_string(&body)?;
291 let token = self.auth.generate_rest_jwt_post(&body_str)?;
292
293 let url = format!("{API_URL}/v1/orders");
295 let response = self
296 .client
297 .post(&url)
298 .header("Authorization", format!("Bearer {token}"))
299 .header("Content-Type", "application/json")
300 .json(&body)
301 .send()
302 .await?;
303
304 if response.status().is_success() {
305 let order_response: UpbitOrderResponse = response.json().await?;
306
307 let report = ExecutionReport {
309 id: id_generation::generate_ack_id(&order.id.to_string()),
310 order_id: order.id.to_string().into(),
311 exchange_timestamp: Self::parse_timestamp(&order_response.created_at)
312 .unwrap_or_else(|e| {
313 log::warn!("Failed to parse Upbit timestamp: {e}");
314 rusty_common::time::get_timestamp_ns_result()
315 .unwrap_or_else(|_| self.clock.raw())
316 }),
317 system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
318 instrument_id: InstrumentId::new(order.symbol.clone(), order.venue),
319 status: Self::map_order_status(&order_response.state),
320 filled_quantity: Decimal::from_str_exact(&order_response.executed_volume)
321 .unwrap_or(Decimal::ZERO),
322 remaining_quantity: Decimal::from_str_exact(&order_response.remaining_volume)
323 .unwrap_or(Decimal::ZERO),
324 execution_price: order_response
325 .price
326 .and_then(|p| Decimal::from_str_exact(&p).ok()),
327 reject_reason: None,
328 exchange_execution_id: Some(order_response.uuid.into()),
329 is_final: false,
330 };
331
332 report_tx.send_async(report).await?;
333 } else {
334 let error_response: UpbitErrorResponse = response.json().await?;
335 let error_msg = format!(
336 "{}: {}",
337 error_response.error.name, error_response.error.message
338 );
339
340 let report = ExecutionReport {
342 id: id_generation::generate_rejection_id(&order.id.to_string()),
343 order_id: order.id.to_string().into(),
344 exchange_timestamp: rusty_common::time::get_timestamp_ns_result()
345 .unwrap_or_else(|_| self.clock.raw()),
346 system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
347 instrument_id: InstrumentId::new(order.symbol.clone(), order.venue),
348 status: OrderStatus::Rejected,
349 filled_quantity: Decimal::ZERO,
350 remaining_quantity: order.quantity,
351 execution_price: None,
352 reject_reason: Some(error_msg.clone().into()),
353 exchange_execution_id: None,
354 is_final: true,
355 };
356
357 report_tx.send_async(report).await?;
358 return Err(anyhow!("Order placement failed: {}", error_msg));
359 }
360
361 Ok(())
362 }
363
364 async fn cancel_order(
365 &self,
366 order_id: SmartString,
367 report_tx: Sender<ExecutionReport>,
368 ) -> Result<()> {
369 if !*self.connected.read() {
370 return Err(anyhow!("Not connected to Upbit"));
371 }
372
373 let query_params = Some(("uuid", order_id.as_str()));
375 let query_string = format!("uuid={order_id}");
376
377 let token = self
379 .auth
380 .generate_rest_jwt_get(query_params.as_ref().map(std::slice::from_ref))?;
381
382 let url = format!("{API_URL}/v1/order?{query_string}");
384 let response = self
385 .client
386 .delete(&url)
387 .header("Authorization", format!("Bearer {token}"))
388 .send()
389 .await?;
390
391 if response.status().is_success() {
392 let order_response: UpbitOrderResponse = response.json().await?;
393
394 let instrument = InstrumentId {
396 symbol: order_response.market.into(),
397 venue: Venue::Upbit,
398 };
399
400 let report = ExecutionReport {
402 id: id_generation::generate_cancel_id(&order_id),
403 order_id: order_id.clone(),
404 exchange_timestamp: Self::parse_timestamp(&order_response.created_at)
405 .unwrap_or_else(|e| {
406 log::warn!("Failed to parse Upbit timestamp: {e}");
407 rusty_common::time::get_timestamp_ns_result()
408 .unwrap_or_else(|_| self.clock.raw())
409 }),
410 system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
411 instrument_id: instrument,
412 status: OrderStatus::Cancelled,
413 filled_quantity: Decimal::from_str_exact(&order_response.executed_volume)
414 .unwrap_or(Decimal::ZERO),
415 remaining_quantity: Decimal::from_str_exact(&order_response.remaining_volume)
416 .unwrap_or(Decimal::ZERO),
417 execution_price: order_response
418 .price
419 .and_then(|p| Decimal::from_str_exact(&p).ok()),
420 reject_reason: None,
421 exchange_execution_id: Some(order_response.uuid.into()),
422 is_final: true,
423 };
424
425 report_tx.send_async(report).await?;
426 } else {
427 let error_response: UpbitErrorResponse = response.json().await?;
428 let error_msg = format!(
429 "{}: {}",
430 error_response.error.name, error_response.error.message
431 );
432 return Err(anyhow!("Order cancellation failed: {}", error_msg));
433 }
434
435 Ok(())
436 }
437
438 async fn modify_order(
439 &self,
440 _order_id: SmartString,
441 _new_price: Option<Decimal>,
442 _new_quantity: Option<Decimal>,
443 _report_tx: Sender<ExecutionReport>,
444 ) -> Result<()> {
445 Err(anyhow!(
450 "Order modification not supported for Upbit. Cancel and place a new order instead."
451 ))
452 }
453
454 async fn cancel_all_orders(
455 &self,
456 _instrument_id: Option<InstrumentId>,
457 __report_tx: Sender<ExecutionReport>,
458 ) -> Result<()> {
459 Err(anyhow!("Cancel all orders not implemented for Upbit"))
464 }
465
466 async fn get_order_status(&self, order_id: &str) -> Result<OrderStatus> {
467 if !*self.connected.read() {
468 return Err(anyhow!("Not connected to Upbit"));
469 }
470
471 let query_params = Some(("uuid", order_id));
473 let query_string = format!("uuid={order_id}");
474
475 let token = self
477 .auth
478 .generate_rest_jwt_get(query_params.as_ref().map(std::slice::from_ref))?;
479
480 let url = format!("{API_URL}/v1/order?{query_string}");
482 let response = self
483 .client
484 .get(&url)
485 .header("Authorization", format!("Bearer {token}"))
486 .send()
487 .await?;
488
489 if response.status().is_success() {
490 let order_response: UpbitOrderResponse = response.json().await?;
491 return Ok(Self::map_order_status(&order_response.state));
492 } else {
493 let error_response: UpbitErrorResponse = response.json().await?;
494 let error_msg = format!(
495 "{}: {}",
496 error_response.error.name, error_response.error.message
497 );
498 return Err(anyhow!("Failed to get order status: {}", error_msg));
499 }
500 }
501
    /// Marks the adapter as connected.
    ///
    /// Only the `connected` flag is set; the code visible here opens no
    /// network connection and does not use `_report_sender`. Always returns `Ok(())`.
    async fn connect(&self, _report_sender: Sender<ExecutionReport>) -> Result<()> {
        *self.connected.write() = true;
        Ok(())
    }
508
    /// Marks the adapter as disconnected by clearing the `connected` flag.
    ///
    /// Always returns `Ok(())`.
    async fn disconnect(&self) -> Result<()> {
        *self.connected.write() = false;
        Ok(())
    }
514
    /// Returns the current value of the `connected` flag set by `connect`/`disconnect`.
    async fn is_connected(&self) -> bool {
        *self.connected.read()
    }
518
519 async fn get_instruments(&self) -> Result<SmallVec<[InstrumentId; 32]>> {
520 {
522 let cache = self.instruments_cache.read();
523 if !cache.is_empty() {
524 return Ok(cache.clone());
525 }
526 }
527
528 let url = format!("{API_URL}/v1/market/all");
530 let response = self.client.get(&url).send().await?;
531
532 if response.status().is_success() {
533 #[derive(Debug, Deserialize)]
534 struct Market {
535 market: String,
536 korean_name: String,
537 english_name: String,
538 }
539
540 let markets: Vec<Market> = response.json().await?;
541 let mut instruments = SmallVec::new();
542
543 for market in markets {
544 let instrument_id = InstrumentId {
545 symbol: market.market.into(),
546 venue: Venue::Upbit,
547 };
548 instruments.push(instrument_id);
549 }
550
551 *self.instruments_cache.write() = instruments.clone();
553
554 return Ok(instruments);
555 }
556 return Err(anyhow!("Failed to get instruments from Upbit"));
557 }
558
559 async fn send_fix_message(&self, _message: Vec<u8>) -> Result<()> {
560 anyhow::bail!("FIX protocol not supported on Upbit")
562 }
563
564 async fn receive_fix_message(&self) -> Result<Vec<u8>> {
565 anyhow::bail!("FIX protocol not supported on Upbit")
567 }
568}
569
#[cfg(test)]
mod tests {
    use super::*;
    use rusty_model::enums::OrderType;

    /// Text every `parse_timestamp` failure message is expected to contain.
    const PARSE_FAILURE: &str = "Failed to parse RFC3339 timestamp";
    /// Text the FIX stubs are expected to report.
    const FIX_UNSUPPORTED: &str = "FIX protocol not supported on Upbit";

    /// Asserts that `timestamp` parses and yields a strictly positive value.
    fn expect_positive(timestamp: &str) {
        let parsed = UpbitExchange::parse_timestamp(timestamp);
        assert!(parsed.is_ok());
        assert!(parsed.unwrap() > 0);
    }

    /// Asserts that `timestamp` is rejected with the standard failure message.
    fn expect_failure(timestamp: &str) {
        let parsed = UpbitExchange::parse_timestamp(timestamp);
        assert!(parsed.is_err());
        assert!(parsed.unwrap_err().to_string().contains(PARSE_FAILURE));
    }

    /// Builds an adapter with placeholder credentials.
    fn exchange_with_dummy_keys() -> UpbitExchange {
        UpbitExchange::new("test_api_key".to_string(), "test_secret_key".to_string())
    }

    // --- map_time_in_force -------------------------------------------------

    #[test]
    fn test_map_time_in_force_ioc() {
        assert_eq!(
            UpbitExchange::map_time_in_force(OrderType::ImmediateOrCancel),
            Some("ioc")
        );
    }

    #[test]
    fn test_map_time_in_force_fok() {
        assert_eq!(
            UpbitExchange::map_time_in_force(OrderType::FillOrKill),
            Some("fok")
        );
    }

    #[test]
    fn test_map_time_in_force_market() {
        assert_eq!(UpbitExchange::map_time_in_force(OrderType::Market), None);
    }

    #[test]
    fn test_map_time_in_force_limit() {
        assert_eq!(UpbitExchange::map_time_in_force(OrderType::Limit), None);
    }

    #[test]
    fn test_map_time_in_force_stop() {
        assert_eq!(UpbitExchange::map_time_in_force(OrderType::Stop), None);
    }

    #[test]
    fn test_map_time_in_force_stop_limit() {
        assert_eq!(UpbitExchange::map_time_in_force(OrderType::StopLimit), None);
    }

    #[test]
    fn test_map_time_in_force_post_only() {
        assert_eq!(UpbitExchange::map_time_in_force(OrderType::PostOnly), None);
    }

    // --- parse_timestamp: accepted inputs ----------------------------------

    #[test]
    fn test_parse_timestamp_valid_rfc3339() {
        expect_positive("2023-12-25T12:30:45.123456789Z");
    }

    #[test]
    fn test_parse_timestamp_valid_with_timezone() {
        expect_positive("2023-12-25T12:30:45.123456789+09:00");
    }

    #[test]
    fn test_parse_timestamp_valid_with_negative_timezone() {
        expect_positive("2023-12-25T12:30:45.123456789-05:00");
    }

    #[test]
    fn test_parse_timestamp_valid_without_nanos() {
        expect_positive("2023-12-25T12:30:45Z");
    }

    #[test]
    fn test_parse_timestamp_valid_microseconds() {
        expect_positive("2023-12-25T12:30:45.123456Z");
    }

    #[test]
    fn test_parse_timestamp_valid_milliseconds() {
        expect_positive("2023-12-25T12:30:45.123Z");
    }

    #[test]
    fn test_parse_timestamp_year_boundary() {
        expect_positive("2023-12-31T23:59:59.999999999Z");
    }

    #[test]
    fn test_parse_timestamp_leap_year() {
        expect_positive("2024-02-29T12:30:45.123456789Z");
    }

    // --- parse_timestamp: rejected inputs ----------------------------------

    #[test]
    fn test_parse_timestamp_invalid_format() {
        expect_failure("2023-12-25 12:30:45");
    }

    #[test]
    fn test_parse_timestamp_invalid_date() {
        expect_failure("2023-02-30T12:30:45.123456789Z");
    }

    #[test]
    fn test_parse_timestamp_invalid_time() {
        expect_failure("2023-12-25T25:30:45.123456789Z");
    }

    #[test]
    fn test_parse_timestamp_empty_string() {
        expect_failure("");
    }

    #[test]
    fn test_parse_timestamp_malformed_timezone() {
        expect_failure("2023-12-25T12:30:45.123456789+25:00");
    }

    #[test]
    fn test_parse_timestamp_missing_timezone() {
        expect_failure("2023-12-25T12:30:45.123456789");
    }

    #[test]
    fn test_parse_timestamp_with_whitespace() {
        expect_failure(" 2023-12-25T12:30:45.123456789Z ");
    }

    // --- parse_timestamp: edge cases ---------------------------------------

    #[test]
    fn test_parse_timestamp_precision_edge_cases() {
        expect_positive("2023-12-25T12:30:45.999999999Z");
    }

    #[test]
    fn test_parse_timestamp_error_message_format() {
        let parsed = UpbitExchange::parse_timestamp("invalid_timestamp");
        assert!(parsed.is_err());
        let message = parsed.unwrap_err().to_string();
        assert!(message.contains(PARSE_FAILURE));
        assert!(message.contains("invalid_timestamp"));
    }

    #[test]
    fn test_parse_timestamp_out_of_range() {
        // Either outcome is accepted; only the wording of a failure is checked.
        match UpbitExchange::parse_timestamp("9999-12-31T23:59:59.999999999Z") {
            Ok(_) => {}
            Err(error) => {
                let message = error.to_string();
                assert!(
                    message.contains(PARSE_FAILURE)
                        || message.contains("out of range")
                        || message.contains("overflow")
                );
            }
        }
    }

    #[test]
    fn test_parse_timestamp_return_type() {
        let parsed = UpbitExchange::parse_timestamp("2023-12-25T12:30:45.123456789Z");
        assert!(parsed.is_ok());
        assert!(parsed.unwrap() > 1_000_000_000_000_000_000);
    }

    // --- FIX stubs ---------------------------------------------------------

    #[tokio::test]
    async fn test_send_fix_message_not_supported() {
        let exchange = exchange_with_dummy_keys();

        let payload = vec![b'T', b'E', b'S', b'T'];
        let outcome = exchange.send_fix_message(payload).await;

        assert!(outcome.is_err());
        assert!(outcome.unwrap_err().to_string().contains(FIX_UNSUPPORTED));
    }

    #[tokio::test]
    async fn test_receive_fix_message_not_supported() {
        let exchange = exchange_with_dummy_keys();

        let outcome = exchange.receive_fix_message().await;

        assert!(outcome.is_err());
        assert!(outcome.unwrap_err().to_string().contains(FIX_UNSUPPORTED));
    }
}