1use std::time::{SystemTime, UNIX_EPOCH};
7
8pub mod websocket;
9
10use crate::execution_engine::Exchange;
11use async_trait::async_trait;
12use hmac::{Hmac, Mac};
13use log::{debug, error, info};
14use parking_lot::RwLock;
15use reqwest::{Client, StatusCode};
16use rusty_model::{
17 enums::{OrderSide, OrderStatus, OrderType, TimeInForce},
18 trading_order::Order,
19};
20use serde::{Deserialize, Serialize};
21use sha2::Sha256;
22use smallvec::SmallVec;
23use std::sync::Arc;
24use tokio::sync::OnceCell;
25
26use self::websocket::{
27 BinanceWebSocketTradingClient, OrderCancelRequest, OrderPlaceRequest, WebSocketConfig,
28};
29
30type HmacSha256 = Hmac<Sha256>;
31
32const API_URL: &str = "https://api.binance.com";
34
35#[derive(Debug, Serialize, Deserialize)]
37struct BinanceOrderResponse {
38 #[serde(rename = "orderId")]
40 order_id: u64,
41 #[serde(rename = "clientOrderId")]
43 client_order_id: std::string::String,
44 symbol: std::string::String,
46 #[serde(rename = "transactTime")]
48 transaction_time: u64,
49 price: std::string::String,
51 #[serde(rename = "origQty")]
53 original_qty: std::string::String,
54 #[serde(rename = "executedQty")]
56 executed_qty: std::string::String,
57 #[serde(rename = "cummulativeQuoteQty")]
59 cumulative_quote_qty: std::string::String,
60 status: std::string::String,
62 #[serde(rename = "timeInForce")]
64 time_in_force: std::string::String,
65 #[serde(rename = "type")]
67 order_type: std::string::String,
68 side: std::string::String,
70}
71
72#[derive(Debug, Serialize, Deserialize)]
74struct BinanceError {
75 code: i32,
77 msg: std::string::String,
79}
80
81pub struct BinanceExchange {
83 pub api_key: std::string::String,
85 pub secret_key: std::string::String,
87 client: Client,
89 api_url: std::string::String,
91 ws_client: Arc<OnceCell<BinanceWebSocketTradingClient>>,
93 prefer_websocket: Arc<RwLock<bool>>,
95}
96
97impl BinanceExchange {
98 #[must_use]
100 pub fn new(api_key: std::string::String, secret_key: std::string::String) -> Self {
101 Self {
102 api_key,
103 secret_key,
104 client: Client::new(),
105 api_url: API_URL.into(),
106 ws_client: Arc::new(OnceCell::new()),
107 prefer_websocket: Arc::new(RwLock::new(false)), }
109 }
110
111 #[must_use]
113 pub fn with_api_url(mut self, api_url: std::string::String) -> Self {
114 self.api_url = api_url;
115 self
116 }
117
118 fn generate_signature(&self, query_string: &str) -> std::string::String {
120 let mut mac = HmacSha256::new_from_slice(self.secret_key.as_bytes())
121 .expect("HMAC can take key of any size");
122 mac.update(query_string.as_bytes());
123 let result = mac.finalize();
124 hex::encode(result.into_bytes())
125 }
126
127 fn get_timestamp() -> u64 {
129 SystemTime::now()
130 .duration_since(UNIX_EPOCH)
131 .expect("Time went backwards")
132 .as_millis() as u64
133 }
134
135 pub fn enable_websocket(&self, enable: bool) {
137 *self.prefer_websocket.write() = enable;
138 }
139
140 async fn ensure_ws_client(
142 &self,
143 ) -> Result<&BinanceWebSocketTradingClient, std::string::String> {
144 self.ws_client
145 .get_or_try_init(|| async {
146 info!("Initializing Binance WebSocket trading client");
147
148 let auth = rusty_common::auth::exchanges::binance::BinanceAuth::new_ed25519(
150 self.api_key.clone().into(),
151 self.secret_key.clone().into(),
152 )
153 .map_err(|e| format!("Failed to create Binance auth: {e}"))?;
154
155 let config = if self.api_url.contains("testnet") {
157 WebSocketConfig::testnet()
158 } else {
159 WebSocketConfig::default()
160 };
161
162 let client = BinanceWebSocketTradingClient::new(config, auth);
164 client.connect().await.map_err(|e| {
165 error!("Failed to connect WebSocket: {e}");
166 e
167 })?;
168
169 info!("Binance WebSocket trading client connected");
170 Ok(client)
171 })
172 .await
173 }
174
175 pub async fn websocket_client(
177 &self,
178 ) -> Result<&BinanceWebSocketTradingClient, std::string::String> {
179 self.ensure_ws_client().await
180 }
181
182 fn create_ws_order_request(&self, order: &Order) -> OrderPlaceRequest {
184 let timestamp = Self::get_timestamp();
185
186 let mut params = vec![
188 format!("symbol={}", order.symbol),
189 format!("side={}", Self::map_order_side(order.side)),
190 format!("type={}", Self::map_order_type(order.order_type)),
191 format!("quantity={}", order.quantity),
192 format!("timestamp={}", timestamp),
193 ];
194
195 if order.order_type == OrderType::Limit || order.order_type == OrderType::StopLimit {
197 if let Some(p) = order.price {
198 params.insert(3, format!("price={p}"));
199 }
200 params.insert(
201 3,
202 format!(
203 "timeInForce={}",
204 Self::map_time_in_force(self.get_time_in_force(order.order_type))
205 ),
206 );
207 }
208
209 params.push(format!("newClientOrderId={}", order.id));
210
211 let query = params.join("&");
212 let signature = self.generate_signature(&query);
213
214 OrderPlaceRequest {
215 symbol: order.symbol.clone(),
216 side: Self::map_order_side(order.side).into(),
217 order_type: Self::map_order_type(order.order_type).into(),
218 time_in_force: Some(
219 Self::map_time_in_force(self.get_time_in_force(order.order_type)).into(),
220 ),
221 quantity: Some(order.quantity),
222 quote_order_qty: None,
223 price: order.price,
224 new_client_order_id: Some(order.id.to_string().into()),
225 new_order_resp_type: Some("FULL".into()),
226 stop_price: order.stop_price,
227 working_type: None,
228 iceberg_qty: None,
229 self_trade_prevention_mode: Some("EXPIRE_MAKER".into()),
230 timestamp,
231 signature: signature.into(),
232 }
233 }
234
235 const fn map_order_type(order_type: OrderType) -> &'static str {
237 match order_type {
238 OrderType::Market => "MARKET",
239 OrderType::Limit => "LIMIT",
240 OrderType::Stop => "STOP_LOSS", OrderType::StopLimit => "STOP_LOSS_LIMIT",
242 OrderType::FillOrKill => "LIMIT", OrderType::ImmediateOrCancel => "LIMIT", OrderType::PostOnly => "LIMIT", }
246 }
247
248 const fn map_order_side(side: OrderSide) -> &'static str {
250 match side {
251 OrderSide::Buy => "BUY",
252 OrderSide::Sell => "SELL",
253 }
254 }
255
256 const fn map_time_in_force(tif: TimeInForce) -> &'static str {
258 match tif {
259 TimeInForce::GTC => "GTC",
260 TimeInForce::IOC => "IOC",
261 TimeInForce::FOK => "FOK",
262 _ => "GTC", }
264 }
265
266 #[allow(dead_code)]
268 fn map_order_status(status: &str) -> OrderStatus {
269 match status {
270 "NEW" => OrderStatus::New,
271 "PARTIALLY_FILLED" => OrderStatus::PartiallyFilled,
272 "FILLED" => OrderStatus::Filled,
273 "CANCELED" | "CANCELLED" => OrderStatus::Cancelled, "REJECTED" => OrderStatus::Rejected,
275 "EXPIRED" => OrderStatus::Expired,
276 _ => OrderStatus::New,
277 }
278 }
279
280 const fn get_time_in_force(&self, order_type: OrderType) -> TimeInForce {
282 match order_type {
283 OrderType::Market => TimeInForce::IOC,
284 OrderType::Limit => TimeInForce::GTC,
285 OrderType::Stop => TimeInForce::GTC,
286 OrderType::StopLimit => TimeInForce::GTC,
287 OrderType::FillOrKill => TimeInForce::FOK,
288 OrderType::ImmediateOrCancel => TimeInForce::IOC,
289 OrderType::PostOnly => TimeInForce::GTC,
290 }
291 }
292}
293
294#[async_trait]
295impl Exchange for BinanceExchange {
296 async fn send_order(&self, order: Order) -> crate::Result<()> {
297 if *self.prefer_websocket.read() {
299 match self.ensure_ws_client().await {
300 Ok(ws_client) => {
301 let ws_request = self.create_ws_order_request(&order);
303 match ws_client.place_order(ws_request).await {
304 Ok(response) => {
305 info!(
306 "Order placed via WebSocket: {} (exchange ID: {})",
307 response.client_order_id, response.order_id
308 );
309 return Ok(());
310 }
311 Err(e) => {
312 error!("WebSocket order placement failed: {e}, falling back to REST");
313 }
315 }
316 }
317 Err(e) => {
318 error!("Failed to initialize WebSocket client: {e}, using REST API");
319 }
321 }
322 }
323
324 let symbol = order.symbol.clone();
326 let side = Self::map_order_side(order.side);
327 let order_type = Self::map_order_type(order.order_type);
328 let time_in_force = Self::map_time_in_force(self.get_time_in_force(order.order_type));
329 let quantity = order.quantity.to_string();
330 let _price = order.price.map(|p| p.to_string()).unwrap_or_default();
331 let client_order_id = order.id.to_string();
332
333 let timestamp = Self::get_timestamp();
334 let mut query_params = vec![
335 format!("symbol={}", symbol),
336 format!("side={}", side),
337 format!("type={}", order_type),
338 format!("timeInForce={}", time_in_force),
339 format!("quantity={}", quantity),
340 format!("newClientOrderId={}", client_order_id),
341 format!("timestamp={}", timestamp),
342 ];
343
344 if (order.order_type == OrderType::Limit || order.order_type == OrderType::StopLimit)
346 && let Some(p) = order.price
347 {
348 query_params.insert(5, format!("price={p}"));
349 }
350
351 let query = query_params.join("&");
352
353 let signature = self.generate_signature(&query);
354 let query_with_signature = format!("{query}&signature={signature}");
355
356 let url = format!("{}/api/v3/order", self.api_url);
357 let response = match self
358 .client
359 .post(&url)
360 .header("X-MBX-APIKEY", self.api_key.as_str())
361 .header("Content-Type", "application/x-www-form-urlencoded")
362 .body(query_with_signature)
363 .send()
364 .await
365 {
366 Ok(resp) => resp,
367 Err(e) => {
368 return Err(crate::OmsError::Exchange(
369 format!("Request error: {e}").into(),
370 ));
371 }
372 };
373
374 if response.status() != StatusCode::OK {
375 let error_body = match response.text().await {
376 Ok(body) => body,
377 Err(e) => {
378 return Err(crate::OmsError::Exchange(
379 format!("Failed to read error response: {e}").into(),
380 ));
381 }
382 };
383
384 let mut error_bytes = error_body.clone().into_bytes();
385 let error_json: Result<BinanceError, _> = simd_json::from_slice(&mut error_bytes);
386
387 let error_message = if let Ok(error) = error_json {
388 format!("Binance error: {} (code: {})", error.msg, error.code)
389 } else {
390 format!("Binance error: {error_body}")
391 };
392
393 error!("{error_message}");
394 return Err(crate::OmsError::Exchange(error_message.into()));
395 }
396
397 debug!("Order placed successfully: {}", client_order_id);
398 Ok(())
399 }
400
401 async fn cancel_order(&self, order_id: std::string::String) -> crate::Result<()> {
402 if *self.prefer_websocket.read() {
404 match self.ensure_ws_client().await {
405 Ok(ws_client) => {
406 let parts: SmallVec<[&str; 3]> = order_id.split(':').collect();
408 if parts.len() >= 2 {
409 let symbol = parts[0];
410 let client_order_id = parts[1];
411
412 let timestamp = Self::get_timestamp();
413 let query = format!(
414 "symbol={symbol}&origClientOrderId={client_order_id}×tamp={timestamp}"
415 );
416 let signature = self.generate_signature(&query);
417
418 let cancel_request = OrderCancelRequest {
419 symbol: symbol.into(),
420 order_id: None,
421 orig_client_order_id: Some(client_order_id.into()),
422 new_client_order_id: None,
423 timestamp,
424 signature: signature.into(),
425 };
426
427 match ws_client.cancel_order(cancel_request).await {
428 Ok(response) => {
429 info!(
430 "Order cancelled via WebSocket: {}",
431 response.client_order_id
432 );
433 return Ok(());
434 }
435 Err(e) => {
436 error!(
437 "WebSocket order cancellation failed: {e}, falling back to REST"
438 );
439 }
441 }
442 }
443 }
444 Err(e) => {
445 error!("Failed to initialize WebSocket client: {e}, using REST API");
446 }
448 }
449 }
450
451 let parts: SmallVec<[&str; 3]> = order_id.split(':').collect();
456 if parts.len() < 2 {
457 return Err(crate::OmsError::Exchange(
458 "Invalid order_id format. Expected SYMBOL:CLIENT_ORDER_ID".into(),
459 ));
460 }
461
462 let symbol = parts[0];
463 let client_order_id = parts[1];
464
465 let timestamp = Self::get_timestamp();
466 let query =
467 format!("symbol={symbol}&origClientOrderId={client_order_id}×tamp={timestamp}");
468
469 let signature = self.generate_signature(&query);
470 let query_with_signature = format!("{query}&signature={signature}");
471
472 let url = format!("{}/api/v3/order", self.api_url);
473 let response = match self
474 .client
475 .delete(&url)
476 .header("X-MBX-APIKEY", self.api_key.as_str())
477 .header("Content-Type", "application/x-www-form-urlencoded")
478 .body(query_with_signature)
479 .send()
480 .await
481 {
482 Ok(resp) => resp,
483 Err(e) => {
484 return Err(crate::OmsError::Exchange(
485 format!("Request error: {e}").into(),
486 ));
487 }
488 };
489
490 if response.status() != StatusCode::OK {
491 let error_body = match response.text().await {
492 Ok(body) => body,
493 Err(e) => {
494 return Err(crate::OmsError::Exchange(
495 format!("Failed to read error response: {e}").into(),
496 ));
497 }
498 };
499
500 let error_message = format!("Order cancellation failed: {error_body}");
501 error!("{error_message}");
502 return Err(crate::OmsError::Exchange(error_message.into()));
503 }
504
505 debug!("Order cancelled successfully: {}", order_id);
506 Ok(())
507 }
508
509 async fn get_order_status(
510 &self,
511 order_id: std::string::String,
512 ) -> crate::Result<std::string::String> {
513 let parts: SmallVec<[&str; 3]> = order_id.split(':').collect();
517 if parts.len() < 2 {
518 return Err(crate::OmsError::Exchange(
519 "Invalid order_id format. Expected SYMBOL:CLIENT_ORDER_ID".into(),
520 ));
521 }
522
523 let symbol = parts[0];
524 let client_order_id = parts[1];
525
526 let timestamp = Self::get_timestamp();
527 let query =
528 format!("symbol={symbol}&origClientOrderId={client_order_id}×tamp={timestamp}");
529
530 let signature = self.generate_signature(&query);
531
532 let url = format!("{}/api/v3/order", self.api_url);
533 let response = match self
534 .client
535 .get(&url)
536 .header("X-MBX-APIKEY", self.api_key.as_str())
537 .query(&[
538 ("symbol", symbol),
539 ("origClientOrderId", client_order_id),
540 ("timestamp", ×tamp.to_string()),
541 ("signature", &signature),
542 ])
543 .send()
544 .await
545 {
546 Ok(resp) => resp,
547 Err(e) => {
548 return Err(crate::OmsError::Exchange(
549 format!("Request error: {e}").into(),
550 ));
551 }
552 };
553
554 if response.status() != StatusCode::OK {
555 let error_body = match response.text().await {
556 Ok(body) => body,
557 Err(e) => {
558 return Err(crate::OmsError::Exchange(
559 format!("Failed to read error response: {e}").into(),
560 ));
561 }
562 };
563
564 let error_message = format!("Failed to get order status: {error_body}");
565 error!("{error_message}");
566 return Err(crate::OmsError::Exchange(error_message.into()));
567 }
568
569 let bytes = response.bytes().await.map_err(|e| {
570 crate::OmsError::Exchange(format!("Failed to read response: {e}").into())
571 })?;
572 let mut bytes_vec = bytes.to_vec();
573 let order_response: BinanceOrderResponse = match simd_json::from_slice(&mut bytes_vec) {
575 Ok(resp) => resp,
576 Err(e) => {
577 return Err(crate::OmsError::Exchange(
578 format!("Failed to parse response: {e}").into(),
579 ));
580 }
581 };
582
583 Ok(order_response.status)
584 }
585}