rusty_oms/exchanges/
binance.rs

1//! Binance exchange implementation
2//!
3//! This module provides order management functionality for the Binance exchange,
4//! supporting both REST API and WebSocket connections.
5
6use std::time::{SystemTime, UNIX_EPOCH};
7
8pub mod websocket;
9
10use crate::execution_engine::Exchange;
11use async_trait::async_trait;
12use hmac::{Hmac, Mac};
13use log::{debug, error, info};
14use parking_lot::RwLock;
15use reqwest::{Client, StatusCode};
16use rusty_model::{
17    enums::{OrderSide, OrderStatus, OrderType, TimeInForce},
18    trading_order::Order,
19};
20use serde::{Deserialize, Serialize};
21use sha2::Sha256;
22use smallvec::SmallVec;
23use std::sync::Arc;
24use tokio::sync::OnceCell;
25
26use self::websocket::{
27    BinanceWebSocketTradingClient, OrderCancelRequest, OrderPlaceRequest, WebSocketConfig,
28};
29
30type HmacSha256 = Hmac<Sha256>;
31
/// Default base URL for Binance REST requests; `BinanceExchange::new` starts from this value.
const API_URL: &str = "https://api.binance.com";
34
35/// Response structure for Binance order API
36#[derive(Debug, Serialize, Deserialize)]
37struct BinanceOrderResponse {
38    /// Exchange-assigned order ID
39    #[serde(rename = "orderId")]
40    order_id: u64,
41    /// Client-assigned order ID
42    #[serde(rename = "clientOrderId")]
43    client_order_id: std::string::String,
44    /// Trading symbol
45    symbol: std::string::String,
46    /// Transaction timestamp
47    #[serde(rename = "transactTime")]
48    transaction_time: u64,
49    /// Order price
50    price: std::string::String,
51    /// Original order quantity
52    #[serde(rename = "origQty")]
53    original_qty: std::string::String,
54    /// Executed quantity
55    #[serde(rename = "executedQty")]
56    executed_qty: std::string::String,
57    /// Cumulative quote quantity
58    #[serde(rename = "cummulativeQuoteQty")]
59    cumulative_quote_qty: std::string::String,
60    /// Order status
61    status: std::string::String,
62    /// Time in force
63    #[serde(rename = "timeInForce")]
64    time_in_force: std::string::String,
65    /// Order type
66    #[serde(rename = "type")]
67    order_type: std::string::String,
68    /// Order side
69    side: std::string::String,
70}
71
72/// Error response from Binance API
73#[derive(Debug, Serialize, Deserialize)]
74struct BinanceError {
75    /// Error code
76    code: i32,
77    /// Error message
78    msg: std::string::String,
79}
80
81/// Binance exchange implementation
82pub struct BinanceExchange {
83    /// API key for authentication
84    pub api_key: std::string::String,
85    /// Secret key for signing requests
86    pub secret_key: std::string::String,
87    /// HTTP client for REST API calls
88    client: Client,
89    /// Base URL for API requests
90    api_url: std::string::String,
91    /// WebSocket trading client (lazy initialized)
92    ws_client: Arc<OnceCell<BinanceWebSocketTradingClient>>,
93    /// Prefer WebSocket for order management
94    prefer_websocket: Arc<RwLock<bool>>,
95}
96
97impl BinanceExchange {
98    /// Create a new Binance exchange instance
99    #[must_use]
100    pub fn new(api_key: std::string::String, secret_key: std::string::String) -> Self {
101        Self {
102            api_key,
103            secret_key,
104            client: Client::new(),
105            api_url: API_URL.into(),
106            ws_client: Arc::new(OnceCell::new()),
107            prefer_websocket: Arc::new(RwLock::new(false)), // Default to REST API
108        }
109    }
110
111    /// Create a new Binance exchange with custom API URL
112    #[must_use]
113    pub fn with_api_url(mut self, api_url: std::string::String) -> Self {
114        self.api_url = api_url;
115        self
116    }
117
118    /// Generate signature for Binance API requests
119    fn generate_signature(&self, query_string: &str) -> std::string::String {
120        let mut mac = HmacSha256::new_from_slice(self.secret_key.as_bytes())
121            .expect("HMAC can take key of any size");
122        mac.update(query_string.as_bytes());
123        let result = mac.finalize();
124        hex::encode(result.into_bytes())
125    }
126
127    /// Get current timestamp in milliseconds
128    fn get_timestamp() -> u64 {
129        SystemTime::now()
130            .duration_since(UNIX_EPOCH)
131            .expect("Time went backwards")
132            .as_millis() as u64
133    }
134
135    /// Enable WebSocket trading
136    pub fn enable_websocket(&self, enable: bool) {
137        *self.prefer_websocket.write() = enable;
138    }
139
140    /// Initialize WebSocket client if not already initialized
141    async fn ensure_ws_client(
142        &self,
143    ) -> Result<&BinanceWebSocketTradingClient, std::string::String> {
144        self.ws_client
145            .get_or_try_init(|| async {
146                info!("Initializing Binance WebSocket trading client");
147
148                // Create auth instance for WebSocket (Ed25519 required for WebSocket)
149                let auth = rusty_common::auth::exchanges::binance::BinanceAuth::new_ed25519(
150                    self.api_key.clone().into(),
151                    self.secret_key.clone().into(),
152                )
153                .map_err(|e| format!("Failed to create Binance auth: {e}"))?;
154
155                // Create WebSocket config
156                let config = if self.api_url.contains("testnet") {
157                    WebSocketConfig::testnet()
158                } else {
159                    WebSocketConfig::default()
160                };
161
162                // Create and connect client
163                let client = BinanceWebSocketTradingClient::new(config, auth);
164                client.connect().await.map_err(|e| {
165                    error!("Failed to connect WebSocket: {e}");
166                    e
167                })?;
168
169                info!("Binance WebSocket trading client connected");
170                Ok(client)
171            })
172            .await
173    }
174
175    /// Get WebSocket client for advanced usage
176    pub async fn websocket_client(
177        &self,
178    ) -> Result<&BinanceWebSocketTradingClient, std::string::String> {
179        self.ensure_ws_client().await
180    }
181
182    /// Create WebSocket order placement request
183    fn create_ws_order_request(&self, order: &Order) -> OrderPlaceRequest {
184        let timestamp = Self::get_timestamp();
185
186        // Create signature for order
187        let mut params = vec![
188            format!("symbol={}", order.symbol),
189            format!("side={}", Self::map_order_side(order.side)),
190            format!("type={}", Self::map_order_type(order.order_type)),
191            format!("quantity={}", order.quantity),
192            format!("timestamp={}", timestamp),
193        ];
194
195        // Add optional parameters
196        if order.order_type == OrderType::Limit || order.order_type == OrderType::StopLimit {
197            if let Some(p) = order.price {
198                params.insert(3, format!("price={p}"));
199            }
200            params.insert(
201                3,
202                format!(
203                    "timeInForce={}",
204                    Self::map_time_in_force(self.get_time_in_force(order.order_type))
205                ),
206            );
207        }
208
209        params.push(format!("newClientOrderId={}", order.id));
210
211        let query = params.join("&");
212        let signature = self.generate_signature(&query);
213
214        OrderPlaceRequest {
215            symbol: order.symbol.clone(),
216            side: Self::map_order_side(order.side).into(),
217            order_type: Self::map_order_type(order.order_type).into(),
218            time_in_force: Some(
219                Self::map_time_in_force(self.get_time_in_force(order.order_type)).into(),
220            ),
221            quantity: Some(order.quantity),
222            quote_order_qty: None,
223            price: order.price,
224            new_client_order_id: Some(order.id.to_string().into()),
225            new_order_resp_type: Some("FULL".into()),
226            stop_price: order.stop_price,
227            working_type: None,
228            iceberg_qty: None,
229            self_trade_prevention_mode: Some("EXPIRE_MAKER".into()),
230            timestamp,
231            signature: signature.into(),
232        }
233    }
234
235    /// Convert internal `OrderType` to Binance order type
236    const fn map_order_type(order_type: OrderType) -> &'static str {
237        match order_type {
238            OrderType::Market => "MARKET",
239            OrderType::Limit => "LIMIT",
240            OrderType::Stop => "STOP_LOSS", // Maps to stop loss market
241            OrderType::StopLimit => "STOP_LOSS_LIMIT",
242            OrderType::FillOrKill => "LIMIT", // Binance doesn't directly support FOK
243            OrderType::ImmediateOrCancel => "LIMIT", // IOC behavior is set via time_in_force
244            OrderType::PostOnly => "LIMIT",   // Post only behavior is set via additional flag
245        }
246    }
247
248    /// Convert internal `OrderSide` to Binance side
249    const fn map_order_side(side: OrderSide) -> &'static str {
250        match side {
251            OrderSide::Buy => "BUY",
252            OrderSide::Sell => "SELL",
253        }
254    }
255
256    /// Convert internal `TimeInForce` to Binance TIF
257    const fn map_time_in_force(tif: TimeInForce) -> &'static str {
258        match tif {
259            TimeInForce::GTC => "GTC",
260            TimeInForce::IOC => "IOC",
261            TimeInForce::FOK => "FOK",
262            _ => "GTC", // Default to GTC for unsupported types
263        }
264    }
265
266    /// Convert Binance order status to internal `OrderStatus`
267    #[allow(dead_code)]
268    fn map_order_status(status: &str) -> OrderStatus {
269        match status {
270            "NEW" => OrderStatus::New,
271            "PARTIALLY_FILLED" => OrderStatus::PartiallyFilled,
272            "FILLED" => OrderStatus::Filled,
273            "CANCELED" | "CANCELLED" => OrderStatus::Cancelled, // Use the correct variant spelling
274            "REJECTED" => OrderStatus::Rejected,
275            "EXPIRED" => OrderStatus::Expired,
276            _ => OrderStatus::New,
277        }
278    }
279
280    /// Function to extract `time_in_force` from `order_type`
281    const fn get_time_in_force(&self, order_type: OrderType) -> TimeInForce {
282        match order_type {
283            OrderType::Market => TimeInForce::IOC,
284            OrderType::Limit => TimeInForce::GTC,
285            OrderType::Stop => TimeInForce::GTC,
286            OrderType::StopLimit => TimeInForce::GTC,
287            OrderType::FillOrKill => TimeInForce::FOK,
288            OrderType::ImmediateOrCancel => TimeInForce::IOC,
289            OrderType::PostOnly => TimeInForce::GTC,
290        }
291    }
292}
293
294#[async_trait]
295impl Exchange for BinanceExchange {
296    async fn send_order(&self, order: Order) -> crate::Result<()> {
297        // Check if WebSocket is preferred and available
298        if *self.prefer_websocket.read() {
299            match self.ensure_ws_client().await {
300                Ok(ws_client) => {
301                    // Use WebSocket for order placement
302                    let ws_request = self.create_ws_order_request(&order);
303                    match ws_client.place_order(ws_request).await {
304                        Ok(response) => {
305                            info!(
306                                "Order placed via WebSocket: {} (exchange ID: {})",
307                                response.client_order_id, response.order_id
308                            );
309                            return Ok(());
310                        }
311                        Err(e) => {
312                            error!("WebSocket order placement failed: {e}, falling back to REST");
313                            // Fall through to REST API
314                        }
315                    }
316                }
317                Err(e) => {
318                    error!("Failed to initialize WebSocket client: {e}, using REST API");
319                    // Fall through to REST API
320                }
321            }
322        }
323
324        // Original REST API implementation
325        let symbol = order.symbol.clone();
326        let side = Self::map_order_side(order.side);
327        let order_type = Self::map_order_type(order.order_type);
328        let time_in_force = Self::map_time_in_force(self.get_time_in_force(order.order_type));
329        let quantity = order.quantity.to_string();
330        let _price = order.price.map(|p| p.to_string()).unwrap_or_default();
331        let client_order_id = order.id.to_string();
332
333        let timestamp = Self::get_timestamp();
334        let mut query_params = vec![
335            format!("symbol={}", symbol),
336            format!("side={}", side),
337            format!("type={}", order_type),
338            format!("timeInForce={}", time_in_force),
339            format!("quantity={}", quantity),
340            format!("newClientOrderId={}", client_order_id),
341            format!("timestamp={}", timestamp),
342        ];
343
344        // Only add price for limit orders
345        if (order.order_type == OrderType::Limit || order.order_type == OrderType::StopLimit)
346            && let Some(p) = order.price
347        {
348            query_params.insert(5, format!("price={p}"));
349        }
350
351        let query = query_params.join("&");
352
353        let signature = self.generate_signature(&query);
354        let query_with_signature = format!("{query}&signature={signature}");
355
356        let url = format!("{}/api/v3/order", self.api_url);
357        let response = match self
358            .client
359            .post(&url)
360            .header("X-MBX-APIKEY", self.api_key.as_str())
361            .header("Content-Type", "application/x-www-form-urlencoded")
362            .body(query_with_signature)
363            .send()
364            .await
365        {
366            Ok(resp) => resp,
367            Err(e) => {
368                return Err(crate::OmsError::Exchange(
369                    format!("Request error: {e}").into(),
370                ));
371            }
372        };
373
374        if response.status() != StatusCode::OK {
375            let error_body = match response.text().await {
376                Ok(body) => body,
377                Err(e) => {
378                    return Err(crate::OmsError::Exchange(
379                        format!("Failed to read error response: {e}").into(),
380                    ));
381                }
382            };
383
384            let mut error_bytes = error_body.clone().into_bytes();
385            let error_json: Result<BinanceError, _> = simd_json::from_slice(&mut error_bytes);
386
387            let error_message = if let Ok(error) = error_json {
388                format!("Binance error: {} (code: {})", error.msg, error.code)
389            } else {
390                format!("Binance error: {error_body}")
391            };
392
393            error!("{error_message}");
394            return Err(crate::OmsError::Exchange(error_message.into()));
395        }
396
397        debug!("Order placed successfully: {}", client_order_id);
398        Ok(())
399    }
400
401    async fn cancel_order(&self, order_id: std::string::String) -> crate::Result<()> {
402        // Check if WebSocket is preferred and available
403        if *self.prefer_websocket.read() {
404            match self.ensure_ws_client().await {
405                Ok(ws_client) => {
406                    // Parse order ID format: SYMBOL:CLIENT_ORDER_ID
407                    let parts: SmallVec<[&str; 3]> = order_id.split(':').collect();
408                    if parts.len() >= 2 {
409                        let symbol = parts[0];
410                        let client_order_id = parts[1];
411
412                        let timestamp = Self::get_timestamp();
413                        let query = format!(
414                            "symbol={symbol}&origClientOrderId={client_order_id}&timestamp={timestamp}"
415                        );
416                        let signature = self.generate_signature(&query);
417
418                        let cancel_request = OrderCancelRequest {
419                            symbol: symbol.into(),
420                            order_id: None,
421                            orig_client_order_id: Some(client_order_id.into()),
422                            new_client_order_id: None,
423                            timestamp,
424                            signature: signature.into(),
425                        };
426
427                        match ws_client.cancel_order(cancel_request).await {
428                            Ok(response) => {
429                                info!(
430                                    "Order cancelled via WebSocket: {}",
431                                    response.client_order_id
432                                );
433                                return Ok(());
434                            }
435                            Err(e) => {
436                                error!(
437                                    "WebSocket order cancellation failed: {e}, falling back to REST"
438                                );
439                                // Fall through to REST API
440                            }
441                        }
442                    }
443                }
444                Err(e) => {
445                    error!("Failed to initialize WebSocket client: {e}, using REST API");
446                    // Fall through to REST API
447                }
448            }
449        }
450
451        // Original REST API implementation
452        // In a real implementation, we would need to know the symbol
453        // For simplicity, we'll assume the order_id contains the symbol information
454        // Format: SYMBOL:CLIENT_ORDER_ID
455        let parts: SmallVec<[&str; 3]> = order_id.split(':').collect();
456        if parts.len() < 2 {
457            return Err(crate::OmsError::Exchange(
458                "Invalid order_id format. Expected SYMBOL:CLIENT_ORDER_ID".into(),
459            ));
460        }
461
462        let symbol = parts[0];
463        let client_order_id = parts[1];
464
465        let timestamp = Self::get_timestamp();
466        let query =
467            format!("symbol={symbol}&origClientOrderId={client_order_id}&timestamp={timestamp}");
468
469        let signature = self.generate_signature(&query);
470        let query_with_signature = format!("{query}&signature={signature}");
471
472        let url = format!("{}/api/v3/order", self.api_url);
473        let response = match self
474            .client
475            .delete(&url)
476            .header("X-MBX-APIKEY", self.api_key.as_str())
477            .header("Content-Type", "application/x-www-form-urlencoded")
478            .body(query_with_signature)
479            .send()
480            .await
481        {
482            Ok(resp) => resp,
483            Err(e) => {
484                return Err(crate::OmsError::Exchange(
485                    format!("Request error: {e}").into(),
486                ));
487            }
488        };
489
490        if response.status() != StatusCode::OK {
491            let error_body = match response.text().await {
492                Ok(body) => body,
493                Err(e) => {
494                    return Err(crate::OmsError::Exchange(
495                        format!("Failed to read error response: {e}").into(),
496                    ));
497                }
498            };
499
500            let error_message = format!("Order cancellation failed: {error_body}");
501            error!("{error_message}");
502            return Err(crate::OmsError::Exchange(error_message.into()));
503        }
504
505        debug!("Order cancelled successfully: {}", order_id);
506        Ok(())
507    }
508
509    async fn get_order_status(
510        &self,
511        order_id: std::string::String,
512    ) -> crate::Result<std::string::String> {
513        // In a real implementation, we would need to know the symbol
514        // For simplicity, we'll assume the order_id contains the symbol information
515        // Format: SYMBOL:CLIENT_ORDER_ID
516        let parts: SmallVec<[&str; 3]> = order_id.split(':').collect();
517        if parts.len() < 2 {
518            return Err(crate::OmsError::Exchange(
519                "Invalid order_id format. Expected SYMBOL:CLIENT_ORDER_ID".into(),
520            ));
521        }
522
523        let symbol = parts[0];
524        let client_order_id = parts[1];
525
526        let timestamp = Self::get_timestamp();
527        let query =
528            format!("symbol={symbol}&origClientOrderId={client_order_id}&timestamp={timestamp}");
529
530        let signature = self.generate_signature(&query);
531
532        let url = format!("{}/api/v3/order", self.api_url);
533        let response = match self
534            .client
535            .get(&url)
536            .header("X-MBX-APIKEY", self.api_key.as_str())
537            .query(&[
538                ("symbol", symbol),
539                ("origClientOrderId", client_order_id),
540                ("timestamp", &timestamp.to_string()),
541                ("signature", &signature),
542            ])
543            .send()
544            .await
545        {
546            Ok(resp) => resp,
547            Err(e) => {
548                return Err(crate::OmsError::Exchange(
549                    format!("Request error: {e}").into(),
550                ));
551            }
552        };
553
554        if response.status() != StatusCode::OK {
555            let error_body = match response.text().await {
556                Ok(body) => body,
557                Err(e) => {
558                    return Err(crate::OmsError::Exchange(
559                        format!("Failed to read error response: {e}").into(),
560                    ));
561                }
562            };
563
564            let error_message = format!("Failed to get order status: {error_body}");
565            error!("{error_message}");
566            return Err(crate::OmsError::Exchange(error_message.into()));
567        }
568
569        let bytes = response.bytes().await.map_err(|e| {
570            crate::OmsError::Exchange(format!("Failed to read response: {e}").into())
571        })?;
572        let mut bytes_vec = bytes.to_vec();
573        // bytes_vec is already mutable and UTF-8 validated from response.bytes()
574        let order_response: BinanceOrderResponse = match simd_json::from_slice(&mut bytes_vec) {
575            Ok(resp) => resp,
576            Err(e) => {
577                return Err(crate::OmsError::Exchange(
578                    format!("Failed to parse response: {e}").into(),
579                ));
580            }
581        };
582
583        Ok(order_response.status)
584    }
585}