1use std::sync::Arc;
2
3use anyhow::{Result, anyhow};
4use async_trait::async_trait;
5use flume::Sender;
6
7use crate::utils::time::timestamp_millis;
8use parking_lot::RwLock;
9use quanta::Clock;
10use rust_decimal::Decimal;
11use rusty_common::SmartString;
12use rusty_common::utils::id_generation;
13use rusty_model::{
14 enums::{OrderSide, OrderStatus, OrderType, TimeInForce},
15 instruments::InstrumentId,
16 trading_order::Order,
17 types::ClientId,
18 venues::Venue,
19};
20use smallvec::SmallVec;
21use std::time::{Duration, SystemTime};
22
23use crate::error::EMSError;
24use crate::error::batch_errors::{
25 BatchResult, BatchStatus, BatchSummary, OrderResult, OrderResultMap,
26};
27use crate::exchanges::binance_rest::{
28 BinanceRestClient, OcoOrderParams, PlaceOrderParams, SorOrderParams,
29};
30use crate::exchanges::binance_websocket::BinanceWebSocketClient;
31use crate::execution_engine::{Exchange, ExecutionReport};
32
33fn map_order_status(status: &str) -> OrderStatus {
35 match status {
36 "NEW" => OrderStatus::New,
37 "PARTIALLY_FILLED" => OrderStatus::PartiallyFilled,
38 "FILLED" => OrderStatus::Filled,
39 "CANCELED" | "CANCELLED" => OrderStatus::Cancelled,
40 "REJECTED" => OrderStatus::Rejected,
41 "EXPIRED" => OrderStatus::Expired,
42 "PENDING_CANCEL" => OrderStatus::Pending,
43 "PENDING_NEW" => OrderStatus::Pending,
44 "REPLACED" => OrderStatus::Unknown, _ => {
46 log::error!("Unknown Binance order status: {status}");
47 OrderStatus::Unknown
48 }
49 }
50}
51
52#[derive(Clone, Debug)]
54struct OrderSymbolCacheEntry {
55 symbol: Option<SmartString>,
57 cached_at: SystemTime,
59}
60
61impl OrderSymbolCacheEntry {
62 fn found(symbol: SmartString) -> Self {
64 Self {
65 symbol: Some(symbol),
66 cached_at: SystemTime::now(),
67 }
68 }
69
70 fn not_found() -> Self {
72 Self {
73 symbol: None,
74 cached_at: SystemTime::now(),
75 }
76 }
77
78 fn is_valid(&self, max_age: Duration) -> bool {
80 self.cached_at.elapsed().unwrap_or(max_age) < max_age
81 }
82}
83
84#[derive(Debug, Clone)]
120pub struct BinanceConfig {
121 pub order_lookup_timeout_ms: u64,
123 pub order_lookup_total_timeout_ms: u64,
125 pub major_symbols: SmallVec<[SmartString; 8]>,
127}
128
129impl Default for BinanceConfig {
130 fn default() -> Self {
131 Self {
132 order_lookup_timeout_ms: 2000,
133 order_lookup_total_timeout_ms: 5000,
134 major_symbols: Self::default_major_symbols(),
135 }
136 }
137}
138
139impl BinanceConfig {
140 #[must_use]
142 fn default_major_symbols() -> SmallVec<[SmartString; 8]> {
143 let symbols = ["BTCUSDT", "ETHUSDT", "BNBUSDT", "ADAUSDT", "SOLUSDT"];
144 symbols.iter().map(|&s| s.into()).collect()
145 }
146
147 #[must_use]
149 pub fn high_frequency() -> Self {
150 Self {
151 order_lookup_timeout_ms: 1000, order_lookup_total_timeout_ms: 2000, major_symbols: Self::default_major_symbols(),
154 }
155 }
156}
157
158pub struct BinanceExchange {
160 rest_client: Arc<BinanceRestClient>,
162
163 ws_client: BinanceWebSocketClient,
165
166 clock: Clock,
168
169 instruments_cache: Arc<RwLock<SmallVec<[InstrumentId; 32]>>>,
171
172 order_symbol_cache:
175 Arc<RwLock<rusty_common::collections::FxHashMap<SmartString, OrderSymbolCacheEntry>>>,
176
177 config: BinanceConfig,
179}
180
181impl Clone for BinanceExchange {
182 fn clone(&self) -> Self {
183 Self {
184 rest_client: self.rest_client.clone(),
185 ws_client: self.ws_client.clone(),
186 clock: self.clock.clone(),
187 instruments_cache: self.instruments_cache.clone(),
188 order_symbol_cache: self.order_symbol_cache.clone(),
189 config: self.config.clone(),
190 }
191 }
192}
193
194impl BinanceExchange {
195 #[must_use]
197 pub fn rest_client(&self) -> &BinanceRestClient {
198 &self.rest_client
199 }
200
201 #[must_use]
203 pub const fn websocket_client(&self) -> &BinanceWebSocketClient {
204 &self.ws_client
205 }
206
207 #[must_use]
209 pub fn create_rest_client(api_key: String, secret_key: String) -> BinanceRestClient {
210 use rusty_common::auth::exchanges::binance::BinanceAuth;
211 let auth = BinanceAuth::new_hmac(api_key.into(), secret_key.into());
212 BinanceRestClient::new_with_auth(auth)
213 }
214
215 #[must_use]
217 pub fn create_websocket_client(api_key: String, secret_key: String) -> BinanceWebSocketClient {
218 use rusty_common::auth::exchanges::binance::BinanceAuth;
219 let auth = Arc::new(BinanceAuth::new_hmac(api_key.into(), secret_key.into()));
220 BinanceWebSocketClient::new(auth)
221 }
222 #[must_use]
224 pub fn new(api_key: String, secret_key: String) -> Self {
225 Self::new_with_config(api_key, secret_key, BinanceConfig::default())
226 }
227
228 #[must_use]
230 pub fn new_with_config(api_key: String, secret_key: String, config: BinanceConfig) -> Self {
231 use rusty_common::auth::exchanges::binance::BinanceAuth;
232
233 let auth = Arc::new(BinanceAuth::new_hmac(api_key.into(), secret_key.into()));
234 let rest_client = Arc::new(BinanceRestClient::new_with_auth((*auth).clone()));
235 let ws_client = BinanceWebSocketClient::new(auth);
236
237 Self {
238 rest_client,
239 ws_client,
240 clock: Clock::new(),
241 instruments_cache: Arc::new(RwLock::new(SmallVec::new())),
242 order_symbol_cache: Arc::new(RwLock::new(
243 rusty_common::collections::FxHashMap::default(),
244 )),
245 config,
246 }
247 }
248
249 pub fn new_ed25519(api_key: String, private_key: String) -> Result<Self> {
251 Self::new_ed25519_with_config(api_key, private_key, BinanceConfig::default())
252 }
253
254 pub fn new_ed25519_with_config(
256 api_key: String,
257 private_key: String,
258 config: BinanceConfig,
259 ) -> Result<Self> {
260 use rusty_common::auth::exchanges::binance::BinanceAuth;
261
262 let auth = Arc::new(
263 BinanceAuth::new_ed25519(api_key.into(), private_key.into())
264 .map_err(|e| anyhow!("Failed to create Ed25519 auth: {}", e))?,
265 );
266 let rest_client = Arc::new(BinanceRestClient::new_with_auth((*auth).clone()));
267 let ws_client = BinanceWebSocketClient::new(auth);
268
269 Ok(Self {
270 rest_client,
271 ws_client,
272 clock: Clock::new(),
273 instruments_cache: Arc::new(RwLock::new(SmallVec::new())),
274 order_symbol_cache: Arc::new(RwLock::new(
275 rusty_common::collections::FxHashMap::default(),
276 )),
277 config,
278 })
279 }
280
281 fn cache_order_symbol(&self, order_id: &str, symbol: &str) {
283 let mut cache = self.order_symbol_cache.write();
284
285 const MAX_CACHE_SIZE: usize = 10000;
287 if cache.len() >= MAX_CACHE_SIZE {
288 if let Some(oldest_key) = cache
290 .iter()
291 .min_by_key(|(_, entry)| entry.cached_at)
292 .map(|(k, _)| k.clone())
293 {
294 cache.remove(&oldest_key);
295 }
296 }
297
298 cache.insert(order_id.into(), OrderSymbolCacheEntry::found(symbol.into()));
299 }
300
301 fn cache_order_not_found(&self, order_id: &str) {
303 let mut cache = self.order_symbol_cache.write();
304
305 const MAX_CACHE_SIZE: usize = 10000;
307 if cache.len() >= MAX_CACHE_SIZE
308 && let Some(oldest_key) = cache
309 .iter()
310 .min_by_key(|(_, entry)| entry.cached_at)
311 .map(|(k, _)| k.clone())
312 {
313 cache.remove(&oldest_key);
314 }
315
316 cache.insert(order_id.into(), OrderSymbolCacheEntry::not_found());
317 }
318
319 fn get_cached_symbol(&self, order_id: &str) -> Option<SmartString> {
321 let cache = self.order_symbol_cache.read();
322
323 const CACHE_TTL: Duration = Duration::from_secs(3600);
325
326 cache.get(order_id).and_then(|entry| {
327 if entry.is_valid(CACHE_TTL) {
328 entry.symbol.clone()
329 } else {
330 None
331 }
332 })
333 }
334
335 fn is_order_not_found_cached(&self, order_id: &str) -> bool {
337 let cache = self.order_symbol_cache.read();
338
339 const NEGATIVE_CACHE_TTL: Duration = Duration::from_secs(300);
341
342 cache
343 .get(order_id)
344 .is_some_and(|entry| entry.symbol.is_none() && entry.is_valid(NEGATIVE_CACHE_TTL))
345 }
346
347 fn remove_from_cache(&self, order_id: &str) {
349 let mut cache = self.order_symbol_cache.write();
350 cache.remove(order_id);
351 }
352
353 async fn find_order_symbol(&self, order_id: &str) -> Result<SmartString> {
357 if let Some(symbol) = self.get_cached_symbol(order_id) {
359 return Ok(symbol);
360 }
361
362 if self.is_order_not_found_cached(order_id) {
364 return Err(anyhow!(
365 "Order {} was previously not found (cached negative result). \
366 This avoids repeated API calls for non-existent orders.",
367 order_id
368 ));
369 }
370
371 use futures::future::join_all;
374 use std::time::Duration;
375
376 let per_request_timeout_ms = self.config.order_lookup_timeout_ms;
378 let total_timeout_ms = self.config.order_lookup_total_timeout_ms;
379
380 let lookup_futures = self.config.major_symbols.iter().map(|symbol| {
382 let symbol = symbol.clone();
383 let order_id = order_id.to_string();
384 let rest_client = self.rest_client.clone();
385
386 async move {
387 let result = tokio::time::timeout(
389 Duration::from_millis(per_request_timeout_ms),
390 rest_client.get_order_status(&symbol, &order_id),
391 )
392 .await;
393
394 match result {
395 Ok(Ok(_)) => Some(symbol), _ => None, }
398 }
399 });
400
401 let results = tokio::time::timeout(
403 Duration::from_millis(total_timeout_ms),
404 join_all(lookup_futures),
405 )
406 .await
407 .map_err(|_| {
408 anyhow!(
409 "Timeout while searching for order {} in major symbols. \
410 Consider caching order symbols at placement time for HFT performance.",
411 order_id
412 )
413 })?;
414
415 if let Some(symbol) = results.into_iter().flatten().next() {
417 self.cache_order_symbol(order_id, &symbol);
419 return Ok(symbol);
420 }
421
422 self.cache_order_not_found(order_id);
425
426 Err(anyhow!(
428 "Order {} not found in {} configured major trading pairs. \
429 For HFT systems, consider: 1) Adding the symbol to major_symbols configuration, \
430 2) Ensuring order_id -> symbol mapping is cached when placing orders, \
431 3) Using persistent cache across application restarts, \
432 4) Avoiding lookups for orders placed outside current session.",
433 order_id,
434 self.config.major_symbols.len()
435 ))
436 }
437
438 const fn map_order_type(order_type: OrderType) -> &'static str {
440 match order_type {
441 OrderType::Market => "MARKET",
442 OrderType::Limit => "LIMIT",
443 OrderType::Stop => "STOP_LOSS", OrderType::StopLimit => "STOP_LOSS_LIMIT",
445 OrderType::FillOrKill => "LIMIT", OrderType::ImmediateOrCancel => "LIMIT", OrderType::PostOnly => "LIMIT", }
449 }
450
451 const fn map_order_side(side: OrderSide) -> &'static str {
453 match side {
454 OrderSide::Buy => "BUY",
455 OrderSide::Sell => "SELL",
456 }
457 }
458
459 const fn map_time_in_force(tif: TimeInForce) -> &'static str {
461 match tif {
462 TimeInForce::GTC => "GTC",
463 TimeInForce::IOC => "IOC",
464 TimeInForce::FOK => "FOK",
465 _ => "GTC", }
467 }
468
469 const fn get_time_in_force(&self, order_type: OrderType) -> TimeInForce {
471 match order_type {
472 OrderType::Market => TimeInForce::IOC,
473 OrderType::Limit => TimeInForce::GTC,
474 OrderType::Stop => TimeInForce::GTC,
475 OrderType::StopLimit => TimeInForce::GTC,
476 OrderType::FillOrKill => TimeInForce::FOK,
477 OrderType::ImmediateOrCancel => TimeInForce::IOC,
478 OrderType::PostOnly => TimeInForce::GTC,
479 }
480 }
481
482 #[allow(clippy::too_many_arguments)]
496 pub async fn place_oco_order(
497 &self,
498 symbol: &str,
499 side: OrderSide,
500 quantity: Decimal,
501 above_type: &str,
502 below_type: &str,
503 above_price: Option<Decimal>,
504 below_price: Option<Decimal>,
505 above_stop_price: Option<Decimal>,
506 below_stop_price: Option<Decimal>,
507 report_tx: Sender<ExecutionReport>,
508 ) -> Result<()> {
509 let above_price_str = above_price.map(|p| p.to_string());
510 let below_price_str = below_price.map(|p| p.to_string());
511 let above_stop_price_str = above_stop_price.map(|p| p.to_string());
512 let below_stop_price_str = below_stop_price.map(|p| p.to_string());
513
514 let params = OcoOrderParams {
515 symbol,
516 side: Self::map_order_side(side),
517 quantity: &quantity.to_string(),
518 above_type,
519 below_type,
520 above_price: above_price_str.as_deref(),
521 above_stop_price: above_stop_price_str.as_deref(),
522 below_price: below_price_str.as_deref(),
523 below_stop_price: below_stop_price_str.as_deref(),
524 list_client_order_id: None,
525 };
526
527 match self.rest_client.place_oco_order(params).await {
528 Ok(oco_response) => {
529 for order_report in oco_response.order_reports {
531 let report = ExecutionReport {
532 id: id_generation::generate_report_id("oco", &order_report.client_order_id),
533 order_id: order_report.client_order_id.clone(),
534 exchange_timestamp: oco_response.transaction_time * 1_000_000,
535 system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
536 instrument_id: InstrumentId::new(symbol, Venue::Binance),
537 status: map_order_status(&order_report.status),
538 filled_quantity: Decimal::from_str_exact(&order_report.executed_qty)
539 .unwrap_or(Decimal::ZERO),
540 remaining_quantity: {
541 let original = Decimal::from_str_exact(&order_report.original_qty)
542 .unwrap_or(Decimal::ZERO);
543 let executed = Decimal::from_str_exact(&order_report.executed_qty)
544 .unwrap_or(Decimal::ZERO);
545 original - executed
546 },
547 execution_price: Decimal::from_str_exact(&order_report.price).ok(),
548 reject_reason: None,
549 exchange_execution_id: Some(order_report.order_id.to_string().into()),
550 is_final: false,
551 };
552
553 report_tx.send_async(report).await?;
554 }
555 Ok(())
556 }
557 Err(e) => {
558 let error_report = ExecutionReport {
559 id: id_generation::generate_report_id_with_uuid("oco_rej"),
560 order_id: id_generation::generate_report_id_with_uuid("oco"),
561 exchange_timestamp: 0,
562 system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
563 instrument_id: InstrumentId::new(symbol, Venue::Binance),
564 status: OrderStatus::Rejected,
565 filled_quantity: Decimal::ZERO,
566 remaining_quantity: quantity,
567 execution_price: None,
568 reject_reason: Some(e.to_string().into()),
569 exchange_execution_id: None,
570 is_final: true,
571 };
572
573 report_tx.send_async(error_report).await?;
574 Err(anyhow!("OCO order placement failed: {}", e))
575 }
576 }
577 }
578
579 #[allow(clippy::too_many_arguments)]
590 pub async fn place_sor_order(
591 &self,
592 symbol: &str,
593 side: OrderSide,
594 order_type: OrderType,
595 quantity: Decimal,
596 price: Option<Decimal>,
597 time_in_force: Option<TimeInForce>,
598 report_tx: Sender<ExecutionReport>,
599 ) -> Result<()> {
600 let price_str = price.map(|p| p.to_string());
601 let tif_str = time_in_force.map(Self::map_time_in_force);
602
603 let params = SorOrderParams {
604 symbol,
605 side: Self::map_order_side(side),
606 order_type: Self::map_order_type(order_type),
607 quantity: &quantity.to_string(),
608 price: price_str.as_deref(),
609 time_in_force: tif_str,
610 client_order_id: None,
611 strategy_id: None,
612 strategy_type: None,
613 };
614
615 match self.rest_client.place_sor_order(params).await {
616 Ok(sor_response) => {
617 let report = ExecutionReport {
618 id: id_generation::generate_report_id("sor", &sor_response.client_order_id),
619 order_id: sor_response.client_order_id.clone(),
620 exchange_timestamp: sor_response.transaction_time * 1_000_000,
621 system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
622 instrument_id: InstrumentId::new(symbol, Venue::Binance),
623 status: map_order_status(&sor_response.status),
624 filled_quantity: Decimal::from_str_exact(&sor_response.executed_qty)
625 .unwrap_or(Decimal::ZERO),
626 remaining_quantity: {
627 let original = Decimal::from_str_exact(&sor_response.original_qty)
628 .unwrap_or(Decimal::ZERO);
629 let executed = Decimal::from_str_exact(&sor_response.executed_qty)
630 .unwrap_or(Decimal::ZERO);
631 original - executed
632 },
633 execution_price: Decimal::from_str_exact(&sor_response.price).ok(),
634 reject_reason: None,
635 exchange_execution_id: Some(sor_response.order_id.to_string().into()),
636 is_final: false,
637 };
638
639 report_tx.send_async(report).await?;
640 Ok(())
641 }
642 Err(e) => {
643 let error_report = ExecutionReport {
644 id: id_generation::generate_report_id_with_uuid("sor_rej"),
645 order_id: id_generation::generate_report_id_with_uuid("sor"),
646 exchange_timestamp: 0,
647 system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
648 instrument_id: InstrumentId::new(symbol, Venue::Binance),
649 status: OrderStatus::Rejected,
650 filled_quantity: Decimal::ZERO,
651 remaining_quantity: quantity,
652 execution_price: None,
653 reject_reason: Some(e.to_string().into()),
654 exchange_execution_id: None,
655 is_final: true,
656 };
657
658 report_tx.send_async(error_report).await?;
659 Err(anyhow!("SOR order placement failed: {}", e))
660 }
661 }
662 }
663
664 pub async fn place_batch_orders(
673 &self,
674 orders: Vec<Order>,
675 report_tx: Sender<ExecutionReport>,
676 ) -> Result<BatchResult<()>> {
677 let start_time = self.clock.raw();
678 let total_orders = orders.len();
679
680 use futures::future::join_all;
682
683 let order_futures: Vec<_> = orders
684 .into_iter()
685 .map(|order| {
686 let report_tx = report_tx.clone();
687 let order_id: SmartString = order.id.to_string().into();
688 async move {
689 let result = self.place_order(order.clone(), report_tx).await;
690 (order_id, order, result)
691 }
692 })
693 .collect();
694
695 let results = join_all(order_futures).await;
696
697 let mut order_results: OrderResultMap<()> = OrderResultMap::default();
698 let mut successful_orders = 0;
699 let mut failed_orders = 0;
700 let mut retryable_orders = 0;
701
702 for (order_id, order, result) in results {
703 match result {
704 Ok(()) => {
705 successful_orders += 1;
706 log::debug!("Batch order {order_id} placed successfully");
707 order_results.insert(order_id.clone(), OrderResult::Success(()));
708 }
709 Err(e) => {
710 failed_orders += 1;
711 log::error!("Batch order {order_id} failed: {e}");
712
713 let ems_error = if let Some(specific_error) = e.downcast_ref::<EMSError>() {
715 specific_error.clone()
716 } else {
717 EMSError::OrderSubmissionError(
718 format!("Order {order_id} failed: {e}").into(),
719 )
720 };
721
722 let is_retryable = ems_error.is_recoverable();
724
725 if is_retryable {
726 retryable_orders += 1;
727 }
728
729 order_results.insert(
730 order_id.clone(),
731 OrderResult::Failed {
732 error: ems_error,
733 order: Box::new(order),
734 is_retryable,
735 },
736 );
737 }
738 }
739 }
740
741 let processing_time_ns = self.clock.raw() - start_time;
742
743 let status = if failed_orders == 0 {
745 BatchStatus::AllSucceeded
746 } else if successful_orders == 0 {
747 BatchStatus::AllFailed
748 } else {
749 BatchStatus::PartialSuccess
750 };
751
752 let batch_result = BatchResult {
753 status,
754 order_results,
755 summary: BatchSummary {
756 total_orders,
757 successful_orders,
758 failed_orders,
759 retryable_orders,
760 processing_time_ns,
761 },
762 transport_error: None,
763 };
764
765 Ok(batch_result)
766 }
767
768 pub async fn get_sor_configuration(&self) -> Result<Vec<simd_json::OwnedValue>> {
770 let configs = self.rest_client.get_sor_config().await?;
771
772 let mut json_values = Vec::with_capacity(configs.len());
774 for config in configs {
775 let value = simd_json::serde::to_owned_value(&config)?;
777 json_values.push(value);
778 }
779
780 Ok(json_values)
781 }
782}
783
784#[async_trait]
785impl Exchange for BinanceExchange {
786 fn venue(&self) -> Venue {
787 Venue::Binance
788 }
789
790 async fn place_order(&self, order: Order, report_tx: Sender<ExecutionReport>) -> Result<()> {
791 if !self.ws_client.is_connected() {
793 self.ws_client
794 .start_user_data_stream(report_tx.clone(), None)
795 .await?;
796 }
797
798 let symbol = order.symbol.clone();
799 let side = Self::map_order_side(order.side);
800 let order_type_clone = order.order_type;
801 let tif = self.get_time_in_force(order_type_clone);
802 let order_type = Self::map_order_type(order.order_type);
803 let time_in_force = Self::map_time_in_force(tif);
804
805 let params = PlaceOrderParams {
807 symbol: symbol.clone(),
808 side: side.into(),
809 order_type: order_type.into(),
810 time_in_force: time_in_force.into(),
811 quantity: order.quantity.to_string().into(),
812 price: order
813 .price
814 .map_or_else(|| "0".to_string(), |p| p.to_string())
815 .into(),
816 client_order_id: order.id.to_string().into(),
817 };
818
819 let result = self.rest_client.place_order(params).await;
820
821 match result {
822 Ok(order_response) => {
823 let report = ExecutionReport {
825 id: id_generation::generate_ack_id(&order.id.to_string()),
826 order_id: order.id.to_string().into(),
827 exchange_timestamp: order_response.transaction_time * 1_000_000, system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
829 instrument_id: InstrumentId::new(order.symbol.clone(), order.venue),
830 status: map_order_status(&order_response.status),
831 filled_quantity: Decimal::from_str_exact(&order_response.executed_qty)
832 .unwrap_or(Decimal::ZERO),
833 remaining_quantity: {
834 let original = Decimal::from_str_exact(&order_response.original_qty)
835 .unwrap_or(Decimal::ZERO);
836 let executed = Decimal::from_str_exact(&order_response.executed_qty)
837 .unwrap_or(Decimal::ZERO);
838 original - executed
839 },
840 execution_price: Decimal::from_str_exact(&order_response.price).ok(),
841 reject_reason: None,
842 exchange_execution_id: Some(order_response.order_id.to_string().into()),
843 is_final: false,
844 };
845
846 report_tx.send_async(report).await?;
847
848 self.cache_order_symbol(&order_response.order_id.to_string(), &symbol);
850 }
851 Err(e) => {
852 let report = ExecutionReport {
854 id: id_generation::generate_rejection_id(&order.id.to_string()),
855 order_id: order.id.to_string().into(),
856 exchange_timestamp: 0,
857 system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
858 instrument_id: InstrumentId::new(order.symbol.clone(), order.venue),
859 status: OrderStatus::Rejected,
860 filled_quantity: Decimal::ZERO,
861 remaining_quantity: order.quantity,
862 execution_price: None,
863 reject_reason: Some(e.to_string().into()),
864 exchange_execution_id: None,
865 is_final: true,
866 };
867
868 report_tx.send_async(report).await?;
869 return Err(anyhow!("Order placement failed: {}", e));
870 }
871 }
872
873 Ok(())
874 }
875
876 async fn cancel_order(
877 &self,
878 order_id: SmartString,
879 report_tx: Sender<ExecutionReport>,
880 ) -> Result<()> {
881 let symbol = match self.find_order_symbol(&order_id).await {
883 Ok(symbol) => symbol,
884 Err(e) => {
885 return Err(anyhow!(
886 "Failed to find order symbol for order {}: {}",
887 order_id,
888 e
889 ));
890 }
891 };
892
893 let order_id_str = order_id.to_string();
895
896 let result = self.rest_client.cancel_order(&symbol, &order_id).await;
898
899 match result {
900 Ok(cancel_response) => {
901 let instrument = InstrumentId {
903 symbol: cancel_response.symbol,
904 venue: Venue::Binance,
905 };
906
907 let report = ExecutionReport {
909 id: id_generation::generate_cancel_id(&order_id),
910 order_id,
911 exchange_timestamp: cancel_response.transaction_time * 1_000_000, system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
913 instrument_id: instrument,
914 status: OrderStatus::Cancelled,
915 filled_quantity: Decimal::from_str_exact(&cancel_response.executed_qty)
916 .unwrap_or(Decimal::ZERO),
917 remaining_quantity: {
918 let original = Decimal::from_str_exact(&cancel_response.original_qty)
919 .unwrap_or(Decimal::ZERO);
920 let executed = Decimal::from_str_exact(&cancel_response.executed_qty)
921 .unwrap_or(Decimal::ZERO);
922 original - executed
923 },
924 execution_price: Decimal::from_str_exact(&cancel_response.price).ok(),
925 reject_reason: None,
926 exchange_execution_id: Some(cancel_response.order_id.to_string().into()),
927 is_final: true,
928 };
929
930 report_tx.send_async(report).await?;
931
932 self.remove_from_cache(&order_id_str);
934 }
935 Err(e) => {
936 return Err(anyhow!("Order cancellation failed: {}", e));
937 }
938 }
939
940 Ok(())
941 }
942
943 async fn modify_order(
944 &self,
945 order_id: SmartString,
946 new_price: Option<Decimal>,
947 new_quantity: Option<Decimal>,
948 report_tx: Sender<ExecutionReport>,
949 ) -> Result<()> {
950 let symbol = match self.find_order_symbol(&order_id).await {
954 Ok(symbol) => symbol,
955 Err(e) => {
956 return Err(anyhow!(
957 "Failed to find order symbol for order {}: {}",
958 order_id,
959 e
960 ));
961 }
962 };
963
964 let order_response = {
965 match self.rest_client.get_order_status(&symbol, &order_id).await {
966 Ok(order) => order,
967 Err(e) => {
968 return Err(anyhow!(
969 "Failed to get order status for order {}: {}",
970 order_id,
971 e
972 ));
973 }
974 }
975 };
976
977 if let Some(new_qty) = new_quantity
979 && new_price.is_none()
980 {
981 let original_qty =
982 Decimal::from_str_exact(&order_response.original_qty).unwrap_or(Decimal::ZERO);
983 if new_qty < original_qty {
984 if let Ok(amended_order) = self
986 .rest_client
987 .amend_order_keep_priority(&order_response.symbol, &order_id, new_qty)
988 .await
989 {
990 let report = ExecutionReport {
991 id: id_generation::generate_report_id("amend", &order_id),
992 order_id: order_id.clone(),
993 exchange_timestamp: amended_order.transaction_time * 1_000_000,
994 system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
995 instrument_id: InstrumentId::new(
996 amended_order.symbol.clone(),
997 Venue::Binance,
998 ),
999 status: map_order_status(&amended_order.status),
1000 filled_quantity: Decimal::from_str_exact(&amended_order.executed_qty)
1001 .unwrap_or(Decimal::ZERO),
1002 remaining_quantity: {
1003 let original = Decimal::from_str_exact(&amended_order.original_qty)
1004 .unwrap_or(Decimal::ZERO);
1005 let executed = Decimal::from_str_exact(&amended_order.executed_qty)
1006 .unwrap_or(Decimal::ZERO);
1007 original - executed
1008 },
1009 execution_price: Decimal::from_str_exact(&amended_order.price).ok(),
1010 reject_reason: None,
1011 exchange_execution_id: Some(amended_order.order_id.to_string().into()),
1012 is_final: false,
1013 };
1014
1015 report_tx.send_async(report).await?;
1016 return Ok(());
1017 }
1018 }
1020 }
1021
1022 self.cancel_order(order_id.clone(), report_tx.clone())
1026 .await?;
1027
1028 let instrument = InstrumentId {
1030 symbol: order_response.symbol.clone(),
1031 venue: Venue::Binance,
1032 };
1033
1034 let price = new_price.unwrap_or_else(|| {
1035 Decimal::from_str_exact(&order_response.price).unwrap_or(Decimal::ZERO)
1036 });
1037
1038 let quantity = new_quantity.unwrap_or_else(|| {
1039 Decimal::from_str_exact(&order_response.original_qty).unwrap_or(Decimal::ZERO)
1040 });
1041
1042 let side = match order_response.side.as_str() {
1043 "BUY" => OrderSide::Buy,
1044 _ => OrderSide::Sell,
1045 };
1046
1047 let order_type = match order_response.order_type.as_str() {
1048 "MARKET" => OrderType::Market,
1049 _ => OrderType::Limit,
1050 };
1051
1052 let new_order = Order::new(
1054 instrument.venue,
1055 instrument.symbol.clone(),
1056 side,
1057 order_type,
1058 quantity,
1059 Some(price),
1060 ClientId::new(format!("{}_{}", order_id, timestamp_millis())),
1061 );
1062
1063 self.place_order(new_order, report_tx).await?;
1065
1066 Ok(())
1067 }
1068
1069 async fn cancel_all_orders(
1070 &self,
1071 instrument_id: Option<InstrumentId>,
1072 _report_tx: Sender<ExecutionReport>,
1073 ) -> Result<()> {
1074 if let Some(instrument) = instrument_id {
1075 self.rest_client
1077 .cancel_all_orders(&instrument.symbol)
1078 .await?;
1079 } else {
1080 return Err(anyhow!(
1084 "Cancelling all orders globally is not supported by Binance API directly. Please specify an instrument."
1085 ));
1086 }
1087
1088 Ok(())
1092 }
1093
1094 async fn get_order_status(&self, order_id: &str) -> Result<OrderStatus> {
1095 let symbol = match self.find_order_symbol(order_id).await {
1097 Ok(symbol) => symbol,
1098 Err(e) => {
1099 return Err(anyhow!(
1100 "Failed to find order symbol for order {}: {}",
1101 order_id,
1102 e
1103 ));
1104 }
1105 };
1106
1107 match self.rest_client.get_order_status(&symbol, order_id).await {
1108 Ok(order_response) => Ok(map_order_status(&order_response.status)),
1109 Err(e) => Err(anyhow!("Failed to get order status: {}", e)),
1110 }
1111 }
1112
1113 async fn connect(&self, _report_sender: Sender<ExecutionReport>) -> Result<()> {
1114 match self.rest_client.get_exchange_info().await {
1117 Ok(_) => {
1118 Ok(())
1120 }
1121 Err(e) => Err(anyhow!("Failed to connect to Binance API: {}", e)),
1122 }
1123 }
1124
1125 async fn disconnect(&self) -> Result<()> {
1126 self.ws_client.disconnect().await?;
1128 Ok(())
1129 }
1130
1131 async fn is_connected(&self) -> bool {
1132 self.ws_client.is_connected()
1134 }
1135
1136 async fn get_instruments(&self) -> Result<SmallVec<[InstrumentId; 32]>> {
1137 {
1139 let cached = self.instruments_cache.read();
1140 if !cached.is_empty() {
1141 return Ok(cached.clone());
1142 }
1143 }
1144
1145 let exchange_info = self.rest_client.get_exchange_info().await?;
1147
1148 let mut instruments = SmallVec::new();
1150
1151 for symbol in exchange_info.symbols {
1152 if symbol.status == "TRADING" {
1153 instruments.push(InstrumentId {
1154 symbol: symbol.symbol,
1155 venue: Venue::Binance,
1156 });
1157 }
1158 }
1159
1160 *self.instruments_cache.write() = instruments.clone();
1162
1163 Ok(instruments)
1164 }
1165
1166 async fn send_fix_message(&self, message: Vec<u8>) -> Result<()> {
1167 log::warn!(
1168 "BinanceExchange: send_fix_message not implemented. Message: {:?}",
1169 message
1170 );
1171 Err(anyhow!("FIX message sending not implemented for Binance"))
1172 }
1173
1174 async fn receive_fix_message(&self) -> Result<Vec<u8>> {
1175 log::warn!("BinanceExchange: receive_fix_message not implemented.");
1176 Err(anyhow!("FIX message receiving not implemented for Binance"))
1177 }
1178}
1179
1180#[cfg(test)]
1182mod tests {
1183 use super::*;
1184 use dotenv::dotenv;
1185 use std::env;
1186
1187 #[tokio::test]
1188 async fn test_connect_and_get_instruments() {
1189 dotenv().ok();
1190
1191 let api_key = env::var("BINANCE_API_KEY").unwrap_or_default();
1192 let secret_key = env::var("BINANCE_SECRET_KEY").unwrap_or_default();
1193
1194 if api_key.is_empty() || secret_key.is_empty() {
1195 println!("Skipping test: BINANCE_API_KEY and BINANCE_SECRET_KEY must be set");
1196 return;
1197 }
1198
1199 let exchange = BinanceExchange::new(api_key, secret_key);
1200
1201 let (report_tx, _report_rx) = flume::bounded::<ExecutionReport>(10);
1203 let result = exchange.connect(report_tx).await;
1204 assert!(result.is_ok(), "Failed to connect: {result:?}");
1205
1206 let result = exchange.get_instruments().await;
1208 assert!(result.is_ok(), "Failed to get instruments: {result:?}");
1209
1210 let instruments = result.unwrap();
1211 assert!(!instruments.is_empty(), "No instruments returned");
1212
1213 println!("Found {} instruments", instruments.len());
1214 for (i, instrument) in instruments.iter().take(5).enumerate() {
1215 println!(" {}: {}", i + 1, instrument.symbol);
1216 }
1217
1218 let result = exchange.disconnect().await;
1220 assert!(result.is_ok(), "Failed to disconnect: {result:?}");
1221 }
1222
1223 #[tokio::test]
1224 async fn test_send_fix_message_not_implemented() {
1225 let exchange =
1226 BinanceExchange::new("test_api_key".to_string(), "test_secret_key".to_string());
1227
1228 let message = vec![b'T', b'E', b'S', b'T'];
1229 let result = exchange.send_fix_message(message).await;
1230
1231 assert!(result.is_err());
1232 let err = result.unwrap_err();
1233 assert!(
1234 err.to_string()
1235 .contains("FIX message sending not implemented for Binance")
1236 );
1237 }
1238
1239 #[tokio::test]
1240 async fn test_receive_fix_message_not_implemented() {
1241 let exchange =
1242 BinanceExchange::new("test_api_key".to_string(), "test_secret_key".to_string());
1243
1244 let result = exchange.receive_fix_message().await;
1245
1246 assert!(result.is_err());
1247 let err = result.unwrap_err();
1248 assert!(
1249 err.to_string()
1250 .contains("FIX message receiving not implemented for Binance")
1251 );
1252 }
1253}