1use crate::instrument_registry::InstrumentRegistry;
59use rusty_common::auth::exchanges::bybit::{BybitAuth, BybitWsTradingMessage};
60use rusty_common::collections::FxHashMap;
61use rusty_common::utils::id_generation;
62use std::sync::Arc;
63use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU64, Ordering};
64
65use crate::execution_engine::ExecutionReport;
66use anyhow::{Result, bail};
67use async_trait::async_trait;
68use flume::Sender;
69use futures::{SinkExt, StreamExt};
70use log::{debug, error, info, warn};
71use parking_lot::RwLock;
72use quanta::Clock;
73use rust_decimal::Decimal;
74use rusty_common::types::Exchange;
75use rusty_common::websocket::{
76 Message, WebSocketConfig,
77 client::ConnectionState as WsConnectionState,
78 connector::{WebSocketConnector, WebSocketSink, WebSocketStream},
79 stats::new_shared_stats,
80};
81use rusty_model::{
82 enums::{OrderSide, OrderStatus, OrderType, TimeInForce},
83 instruments::InstrumentId,
84 position::{MarginType, PositionSide, PositionUpdate},
85 trading_order::Order,
86 types::PositionId,
87 venues::Venue,
88};
89use simd_json::prelude::*;
90use simd_json::value::owned::Value as JsonValue;
91use smallvec::SmallVec;
92use smartstring::alias::String as SmartString;
93use std::str::FromStr;
94use std::time::Duration;
95use tokio::sync::RwLock as AsyncRwLock;
96use tokio::sync::oneshot;
97use tokio::task::JoinHandle;
98use tokio::time::interval;
99use uuid::Uuid;
100
/// Bybit v5 WebSocket trade endpoint used when the trader is not in testnet mode.
const WS_MAINNET_URL: &str = "wss://stream.bybit.com/v5/trade";
/// Bybit v5 WebSocket trade endpoint used when the trader is in testnet mode.
const WS_TESTNET_URL: &str = "wss://stream-testnet.bybit.com/v5/trade";

/// Seconds between application-level `ping` messages sent by the ping task.
const PING_INTERVAL_SECONDS: u64 = 20;
/// Pong timeout in seconds.
/// NOTE(review): not referenced in the visible part of this file — confirm it is still needed.
const PONG_TIMEOUT_SECONDS: u64 = 10;
/// NOTE(review): presumably the cap on reconnect attempts; not referenced in the visible code.
const MAX_RECONNECTION_ATTEMPTS: u8 = 10;
/// NOTE(review): presumably the first reconnect back-off delay in milliseconds — confirm.
const INITIAL_BACKOFF_MS: u64 = 1000;
/// NOTE(review): presumably the upper bound for the reconnect back-off in milliseconds — confirm.
const MAX_BACKOFF_MS: u64 = 60000;
111
/// Bybit product category attached to trading requests.
///
/// `BybitCategory::as_str` maps each variant to the lowercase string that is
/// placed in the `category` field of outgoing order payloads.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BybitCategory {
    /// Serialized as `"spot"`.
    Spot,
    /// Serialized as `"linear"`.
    Linear,
    /// Serialized as `"inverse"`.
    Inverse,
    /// Serialized as `"option"` (singular on the wire).
    Options,
}
124
/// Account flavour recorded on the trader in its `account_type` field.
///
/// NOTE(review): the visible code only ever initialises that field to `None`;
/// how the variants influence behaviour is decided elsewhere — confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BybitAccountType {
    /// Presumably Bybit's unified trading account — TODO confirm.
    Unified,
    /// Presumably the classic contract (derivatives) account — TODO confirm.
    Contract,
}
133
134impl BybitCategory {
135 const fn as_str(&self) -> &'static str {
136 match self {
137 Self::Spot => "spot",
138 Self::Linear => "linear",
139 Self::Inverse => "inverse",
140 Self::Options => "option",
141 }
142 }
143
144 #[must_use]
146 pub fn parse_category(s: &str) -> Option<Self> {
147 match s {
148 "spot" => Some(Self::Spot),
149 "linear" => Some(Self::Linear),
150 "inverse" => Some(Self::Inverse),
151 "option" => Some(Self::Options),
152 _ => None,
153 }
154 }
155}
156
/// Per-symbol metadata stored in the trader's `instrument_cache`.
///
/// NOTE(review): nothing in the visible code populates or reads these entries,
/// so the field descriptions below are inferred from the names — confirm.
#[derive(Debug, Clone)]
struct InstrumentInfo {
    /// Symbol the entry describes (presumably the exchange-native symbol).
    symbol: SmartString,
    /// Category the instrument trades under.
    category: BybitCategory,
    /// Presumably the base currency of the pair — TODO confirm.
    base_coin: SmartString,
    /// Presumably the quote currency of the pair — TODO confirm.
    quote_coin: SmartString,
}
169
/// Snapshot of connection liveness counters, returned by
/// `BybitWebSocketTrader::get_connection_health`.
///
/// The two `last_*` fields hold values of `quanta::Clock::raw()`; they are raw
/// clock readings, not wall-clock timestamps.
#[derive(Debug, Clone, Default)]
pub struct ConnectionHealth {
    /// Set to `true` once `connect` has established the socket.
    pub is_healthy: bool,
    /// Raw clock reading taken after the ping task last sent a ping.
    pub last_ping_sent: u64,
    /// Raw clock reading taken on connect and refreshed on every received
    /// frame (any frame, not only pongs).
    pub last_pong_received: u64,
    /// NOTE(review): not written by the visible code; the trader keeps a
    /// separate atomic counter — confirm which one is authoritative.
    pub reconnection_attempts: u8,
    /// Number of messages sent through `send_raw_message` (pings sent by the
    /// ping task bypass that method and are not counted).
    pub messages_sent: u64,
    /// Number of frames read by the message task.
    pub messages_received: u64,
}
186
/// Parameters for a single order, as consumed by `create_order` and
/// `create_batch_orders`.
///
/// Field comments describe how the visible payload builders serialize each
/// value into the Bybit request JSON.
#[derive(Debug, Clone)]
pub struct BybitOrderRequest {
    /// Sent as `symbol`; also used to build the tracked `InstrumentId`.
    pub symbol: SmartString,
    /// Sent as `side` (`"Buy"` / `"Sell"`).
    pub side: OrderSide,
    /// Sent as `orderType`; anything other than market is sent as `"Limit"`.
    pub order_type: OrderType,
    /// Sent as `qty`, formatted as a decimal string.
    pub quantity: Decimal,
    /// Sent as `price`, formatted as a string; `"0"` is sent when `None`.
    pub price: Option<Decimal>,
    /// Sent as `timeInForce` (`GTC`, `IOC`, `FOK`, or `PostOnly` for GTX).
    pub time_in_force: TimeInForce,
    /// Sent as `category` by `create_order`; batch requests use the batch-level category.
    pub category: BybitCategory,
    /// Sent as `orderLinkId`; an empty string is sent when `None`.
    pub order_link_id: Option<SmartString>,
    /// Sent as `reduceOnly`; defaults to `false`.
    pub reduce_only: Option<bool>,
    /// Sent as `closeOnTrigger`; defaults to `false`.
    pub close_on_trigger: Option<bool>,
    /// Sent as `positionIdx`; defaults to `0`.
    pub position_idx: Option<u32>,
    /// Sent as `takeProfit` (string) only when present.
    pub take_profit: Option<Decimal>,
    /// Sent as `stopLoss` (string) only when present.
    pub stop_loss: Option<Decimal>,
    /// Sent as `tpTriggerBy` only when present.
    pub tp_trigger_by: Option<SmartString>,
    /// Sent as `slTriggerBy` only when present.
    pub sl_trigger_by: Option<SmartString>,
    /// Sent as `tpLimitPrice` (string) only when present.
    pub tp_limit_price: Option<Decimal>,
    /// Sent as `slLimitPrice` (string) only when present.
    pub sl_limit_price: Option<Decimal>,
}
225
/// A group of orders submitted together through `create_batch_orders`.
#[derive(Debug, Clone)]
pub struct BybitBatchOrderRequest {
    /// Category applied to every order in the batch; it overrides the
    /// per-order `category` when the payload is built.
    pub category: BybitCategory,
    /// Orders to submit. The first order's symbol is used when the batch is
    /// registered as a pending request.
    pub orders: Vec<BybitOrderRequest>,
}
234
/// Parameters for amending an existing order.
///
/// NOTE(review): the code that serializes this struct is outside the visible
/// part of the file; the field notes below are inferred from the names and
/// from the parallel fields on `BybitOrderRequest` — confirm against the sender.
#[derive(Debug, Clone)]
pub struct BybitAmendOrderRequest {
    /// Category of the order being amended.
    pub category: BybitCategory,
    /// Symbol of the order being amended.
    pub symbol: SmartString,
    /// Exchange order id; presumably either this or `order_link_id` identifies the order.
    pub order_id: Option<SmartString>,
    /// Client order id; presumably an alternative to `order_id`.
    pub order_link_id: Option<SmartString>,
    /// New quantity, if it should change.
    pub qty: Option<Decimal>,
    /// New price, if it should change.
    pub price: Option<Decimal>,
    /// New take-profit price, if it should change.
    pub take_profit: Option<Decimal>,
    /// New stop-loss price, if it should change.
    pub stop_loss: Option<Decimal>,
    /// New take-profit trigger source, if it should change.
    pub tp_trigger_by: Option<SmartString>,
    /// New stop-loss trigger source, if it should change.
    pub sl_trigger_by: Option<SmartString>,
    /// New trigger price, if it should change.
    pub trigger_price: Option<Decimal>,
    /// New take-profit limit price, if it should change.
    pub tp_limit_price: Option<Decimal>,
    /// New stop-loss limit price, if it should change.
    pub sl_limit_price: Option<Decimal>,
}
265
/// Parameters for a cancel-all request.
///
/// NOTE(review): the code that consumes this struct is outside the visible
/// part of the file; the optional fields presumably narrow which orders are
/// cancelled — confirm against the sender.
#[derive(Debug, Clone)]
pub struct BybitCancelAllRequest {
    /// Category whose orders should be cancelled.
    pub category: BybitCategory,
    /// Optional symbol filter.
    pub symbol: Option<SmartString>,
    /// Optional base-coin filter.
    pub base_coin: Option<SmartString>,
    /// Optional settle-coin filter.
    pub settle_coin: Option<SmartString>,
}
278
/// Bookkeeping for a request that has been sent but not yet acknowledged.
///
/// Entries are stored in `pending_requests` keyed by request id, looked up
/// when the matching response arrives, and pruned by `cleanup_old_requests`.
#[derive(Debug, Clone)]
struct PendingRequest {
    /// Instrument the request refers to; used to fill in execution reports
    /// when the response itself carries no symbol.
    pub instrument_id: InstrumentId,
    /// Category the request was sent under.
    pub category: BybitCategory,
    /// Kind of operation that was requested.
    pub request_type: RequestType,
    /// `quanta::Clock::raw()` reading taken when the request was registered
    /// (a raw clock value, not a wall-clock timestamp).
    pub timestamp: u64,
}
291
/// Kind of trading operation recorded on a `PendingRequest`.
///
/// Only `CreateOrder` and `BatchCreate` are constructed in the visible part
/// of the file; the other variants are presumably used by cancel/amend
/// senders further down.
#[derive(Debug, Clone, PartialEq)]
enum RequestType {
    /// Single order creation (`order.create`).
    CreateOrder,
    /// Single order cancellation.
    CancelOrder,
    /// Single order amendment.
    AmendOrder,
    /// Batch order creation.
    BatchCreate,
    /// Batch order cancellation.
    BatchCancel,
    /// Batch order amendment.
    BatchAmend,
}
308
/// WebSocket trading client for Bybit's v5 trade endpoint.
///
/// Owns the split socket (sink + stream), the background tasks that service
/// it, and the shared state those tasks update. Most fields are wrapped in
/// `Arc` so that spawned tasks can hold their own handles.
pub struct BybitWebSocketTrader {
    /// Credentials used to build the auth message and per-request headers.
    auth: Arc<BybitAuth>,
    /// Selects the testnet URL instead of the mainnet URL.
    testnet: bool,
    /// Liveness counters, exposed via `get_connection_health`.
    connection_health: Arc<RwLock<ConnectionHealth>>,
    /// Write half of the socket; `None` until `connect` installs it.
    ws_sink: Arc<AsyncRwLock<Option<WebSocketSink>>>,
    /// Read half of the socket; `None` until `connect` installs it.
    ws_stream: Arc<AsyncRwLock<Option<WebSocketStream>>>,
    /// Set once the socket is established; background loops exit when it is cleared.
    is_connected: Arc<AtomicBool>,
    /// Set once the exchange has acknowledged authentication.
    is_authenticated: Arc<AtomicBool>,
    /// Reconnect attempt counter; reset to zero on a successful connect.
    reconnection_attempts: Arc<AtomicU8>,
    /// Clock used for raw readings stored in health counters and pending requests.
    clock: Clock,
    /// Handle of the periodic ping task.
    ping_task: Arc<AsyncRwLock<Option<JoinHandle<()>>>>,
    /// Handle of the task that reads and dispatches incoming frames.
    message_task: Arc<AsyncRwLock<Option<JoinHandle<()>>>>,
    /// Handle of the cleanup task (started by `start_cleanup_task`, defined
    /// outside the visible part of the file).
    cleanup_task: Arc<AsyncRwLock<Option<JoinHandle<()>>>>,
    /// Monotonic counter fed into generated request ids.
    request_counter: Arc<AtomicU64>,
    /// One-shot sender through which the auth response handler reports the
    /// outcome back to `authenticate`.
    auth_completion_tx: Arc<AsyncRwLock<Option<oneshot::Sender<Result<()>>>>>,
    /// Requests awaiting a response, keyed by request id.
    pending_requests: Arc<AsyncRwLock<FxHashMap<SmartString, PendingRequest>>>,
    /// Latest known update per position, keyed by the stable position id.
    position_state: Arc<RwLock<FxHashMap<PositionId, PositionUpdate>>>,

    /// Account type, `None` until determined (not set in the visible code).
    account_type: RwLock<Option<BybitAccountType>>,

    /// Cached instrument metadata keyed by symbol (not populated in the visible code).
    instrument_cache: Arc<RwLock<FxHashMap<SmartString, InstrumentInfo>>>,

    /// Registry used to normalize exchange symbols and cache order-to-instrument mappings.
    instrument_registry: Arc<dyn InstrumentRegistry>,
}
340
341impl BybitWebSocketTrader {
342 pub fn new(
344 auth: Arc<BybitAuth>,
345 testnet: bool,
346 instrument_registry: Arc<dyn InstrumentRegistry>,
347 ) -> Self {
348 Self {
349 auth,
350 testnet,
351 connection_health: Arc::new(RwLock::new(ConnectionHealth::default())),
352 ws_sink: Arc::new(AsyncRwLock::new(None)),
353 ws_stream: Arc::new(AsyncRwLock::new(None)),
354 is_connected: Arc::new(AtomicBool::new(false)),
355 is_authenticated: Arc::new(AtomicBool::new(false)),
356 reconnection_attempts: Arc::new(AtomicU8::new(0)),
357 clock: Clock::new(),
358 ping_task: Arc::new(AsyncRwLock::new(None)),
359 message_task: Arc::new(AsyncRwLock::new(None)),
360 cleanup_task: Arc::new(AsyncRwLock::new(None)),
361 request_counter: Arc::new(AtomicU64::new(0)),
362 auth_completion_tx: Arc::new(AsyncRwLock::new(None)),
363 pending_requests: Arc::new(AsyncRwLock::new(FxHashMap::default())),
364 position_state: Arc::new(RwLock::new(FxHashMap::default())),
365 account_type: RwLock::new(None),
366 instrument_cache: Arc::new(RwLock::new(FxHashMap::default())),
367 instrument_registry,
368 }
369 }
370
371 const fn get_ws_url(&self) -> &'static str {
373 if self.testnet {
374 WS_TESTNET_URL
375 } else {
376 WS_MAINNET_URL
377 }
378 }
379
380 pub fn get_connection_health(&self) -> ConnectionHealth {
382 self.connection_health.read().clone()
383 }
384
385 pub fn is_ready(&self) -> bool {
387 self.is_connected.load(Ordering::Relaxed) && self.is_authenticated.load(Ordering::Relaxed)
388 }
389
390 pub async fn connect(&self, report_tx: Sender<ExecutionReport>) -> Result<()> {
392 let url = self.get_ws_url();
393 info!("Connecting to Bybit WebSocket: {url}");
394
395 let config = WebSocketConfig::builder(Exchange::Bybit, url.to_string())
397 .connect_timeout(Duration::from_secs(30))
398 .timeout(Duration::from_secs(10))
399 .heartbeat(30000, 60000, 3)
400 .build();
401
402 let stats = new_shared_stats();
403 let connection_state = Arc::new(RwLock::new(WsConnectionState::Disconnected));
404 let mut connector =
405 WebSocketConnector::new(config, stats.clone(), connection_state.clone());
406 let (sink, stream) = connector.connect_with_retry(url).await?;
407
408 *self.ws_sink.write().await = Some(sink);
410 *self.ws_stream.write().await = Some(stream);
411
412 self.is_connected.store(true, Ordering::Relaxed);
413 self.reconnection_attempts.store(0, Ordering::Relaxed);
414
415 {
417 let mut health = self.connection_health.write();
418 health.is_healthy = true;
419 health.last_pong_received = self.clock.raw();
420 }
421
422 self.authenticate().await?;
424
425 self.start_ping_task().await;
427
428 self.start_message_task(report_tx).await;
430
431 self.start_cleanup_task().await;
433
434 info!("Successfully connected and authenticated to Bybit WebSocket");
435 Ok(())
436 }
437
438 async fn authenticate(&self) -> Result<()> {
440 let req_id = format!("auth_{}", Uuid::new_v4());
441 let auth_message = self
442 .auth
443 .create_ws_auth_message(Some(req_id.clone().into()))
444 .map_err(|e| anyhow::anyhow!("Failed to create WebSocket auth message: {}", e))?;
445
446 let (auth_tx, auth_rx) = oneshot::channel();
448
449 {
451 let mut completion_tx = self.auth_completion_tx.write().await;
452 *completion_tx = Some(auth_tx);
453 }
454
455 let json_str = simd_json::to_string(&auth_message)?;
456 self.send_raw_message(json_str).await?;
457
458 debug!("Sent authentication message with req_id: {req_id}");
459
460 match tokio::time::timeout(Duration::from_secs(10), auth_rx).await {
462 Ok(Ok(Ok(()))) => {
463 self.is_authenticated.store(true, Ordering::Relaxed);
464 info!("WebSocket authentication successful");
465 Ok(())
466 }
467 Ok(Ok(Err(e))) => {
468 error!("WebSocket authentication failed: {e}");
469 Err(e)
470 }
471 Ok(Err(_)) => {
472 error!("Authentication channel was closed unexpectedly");
473 bail!("Authentication channel closed")
474 }
475 Err(_) => {
476 error!("Authentication timed out after 10 seconds");
477 bail!("Authentication timeout")
478 }
479 }
480 }
481
482 async fn send_raw_message(&self, message: String) -> Result<()> {
484 if let Some(sink) = self.ws_sink.write().await.as_mut() {
485 sink.send(Message::Text(message.into()).to_frame_view())
486 .await?;
487
488 {
490 let mut health = self.connection_health.write();
491 health.messages_sent += 1;
492 }
493
494 Ok(())
495 } else {
496 bail!("WebSocket sink not available")
497 }
498 }
499
500 async fn start_ping_task(&self) {
502 let sink = self.ws_sink.clone();
503 let health = self.connection_health.clone();
504 let clock = self.clock.clone();
505 let is_connected = self.is_connected.clone();
506
507 let task = tokio::spawn(async move {
508 let mut interval = interval(Duration::from_secs(PING_INTERVAL_SECONDS));
509
510 loop {
511 interval.tick().await;
512
513 if !is_connected.load(Ordering::Relaxed) {
514 break;
515 }
516
517 if let Some(sink) = sink.write().await.as_mut() {
519 let ping_message = simd_json::json!({
520 "req_id": format!("ping_{}", Uuid::new_v4()),
521 "op": "ping"
522 });
523
524 let json_str = simd_json::to_string(&ping_message).unwrap_or_default();
525 if let Err(e) = sink
526 .send(Message::Text(json_str.into()).to_frame_view())
527 .await
528 {
529 error!("Failed to send ping: {e}");
530 break;
531 }
532
533 {
535 let mut health_guard = health.write();
536 health_guard.last_ping_sent = clock.raw();
537 }
538 }
539 }
540 });
541
542 *self.ping_task.write().await = Some(task);
543 }
544
545 async fn start_message_task(&self, report_tx: Sender<ExecutionReport>) {
547 let stream = self.ws_stream.clone();
548 let health = self.connection_health.clone();
549 let clock = self.clock.clone();
550 let is_connected = self.is_connected.clone();
551 let auth_completion_tx = self.auth_completion_tx.clone();
552 let pending_requests = self.pending_requests.clone();
553 let position_state = self.position_state.clone();
554 let instrument_registry = self.instrument_registry.clone();
555
556 let task = tokio::spawn(async move {
557 loop {
558 if !is_connected.load(Ordering::Relaxed) {
559 break;
560 }
561
562 if let Some(stream) = stream.write().await.as_mut() {
563 if let Some(frame) = stream.next().await {
564 let message = Message::from_frame_view(frame);
565
566 {
568 let mut health_guard = health.write();
569 health_guard.messages_received += 1;
570 health_guard.last_pong_received = clock.raw();
571 }
572
573 if let Err(e) = Self::process_message(
575 message,
576 &report_tx,
577 &auth_completion_tx,
578 &pending_requests,
579 &position_state,
580 &clock,
581 &instrument_registry,
582 )
583 .await
584 {
585 error!("Failed to process message: {e}");
586 }
587 } else {
588 debug!("WebSocket stream ended");
589 break;
590 }
591 }
592 }
593 });
594
595 *self.message_task.write().await = Some(task);
596 }
597
598 async fn process_message(
600 message: Message,
601 report_tx: &Sender<ExecutionReport>,
602 auth_completion_tx: &Arc<AsyncRwLock<Option<oneshot::Sender<Result<()>>>>>,
603 pending_requests: &Arc<AsyncRwLock<FxHashMap<SmartString, PendingRequest>>>,
604 position_state: &Arc<RwLock<FxHashMap<PositionId, PositionUpdate>>>,
605 clock: &Clock,
606 instrument_registry: &Arc<dyn InstrumentRegistry>,
607 ) -> Result<()> {
608 match message {
609 Message::Text(text) => {
610 debug!("Received message: {text}");
611
612 let mut json_str = text.clone();
614 let json: JsonValue = unsafe { simd_json::from_str(&mut json_str)? };
616
617 if let Some(op) = json.get("op").and_then(|v| v.as_str()) {
619 match op {
620 "auth" => {
621 Self::handle_auth_response(&json, auth_completion_tx).await?;
622 }
623 "pong" => {
624 debug!("Received pong response");
625 }
626 "order.create" | "order.cancel" | "order.amend" => {
627 Self::handle_order_response(
628 &json,
629 report_tx,
630 pending_requests,
631 clock,
632 instrument_registry,
633 )
634 .await?;
635 }
636 "order.create-batch" | "order.cancel-batch" | "order.amend-batch" => {
637 Self::handle_batch_response(
638 &json,
639 report_tx,
640 pending_requests,
641 clock,
642 instrument_registry,
643 )
644 .await?;
645 }
646 _ => {
647 debug!("Unknown operation: {op}");
648 }
649 }
650 } else if let Some(topic) = json.get("topic").and_then(|v| v.as_str()) {
651 match topic {
652 "order" => {
653 Self::handle_order_update(&json, report_tx, clock, instrument_registry)
654 .await?;
655 }
656 "position" => {
657 Self::handle_position_update(
658 &json,
659 position_state,
660 Some(report_tx),
661 clock,
662 )
663 .await?;
664 }
665 _ => {
666 debug!("Unknown topic: {topic}");
667 }
668 }
669 }
670 }
671 Message::Binary(data) => {
672 debug!("Received binary message: {} bytes", data.len());
673 }
674 Message::Ping(data) => {
675 debug!("Received ping: {} bytes", data.len());
676 }
677 Message::Pong(data) => {
678 debug!("Received pong: {} bytes", data.len());
679 }
680 Message::Close(frame) => {
681 warn!("Received close frame: {frame:?}");
682 }
683 Message::Frame(_) => {
684 debug!("Received raw frame message");
685 }
686 }
687
688 Ok(())
689 }
690
691 async fn handle_auth_response(
693 json: &JsonValue,
694 auth_completion_tx: &Arc<AsyncRwLock<Option<oneshot::Sender<Result<()>>>>>,
695 ) -> Result<()> {
696 let result = if let Some(success) = json
697 .get("success")
698 .and_then(simd_json::prelude::ValueAsScalar::as_bool)
699 {
700 if success {
701 info!("WebSocket authentication successful");
702 Ok(())
703 } else {
704 let msg = json
705 .get("ret_msg")
706 .and_then(|v| v.as_str())
707 .unwrap_or("Unknown error");
708 error!("WebSocket authentication failed: {msg}");
709 Err(anyhow::anyhow!("Authentication failed: {}", msg))
710 }
711 } else {
712 Err(anyhow::anyhow!("Invalid authentication response format"))
713 };
714
715 if let Some(tx) = auth_completion_tx.write().await.take() {
717 let signal_result = match &result {
718 Ok(()) => Ok(()),
719 Err(e) => Err(anyhow::anyhow!("{}", e)),
720 };
721 let _ = tx.send(signal_result);
722 }
723
724 result
725 }
726
727 async fn handle_order_response(
729 json: &JsonValue,
730 report_tx: &Sender<ExecutionReport>,
731 pending_requests: &Arc<AsyncRwLock<FxHashMap<SmartString, PendingRequest>>>,
732 clock: &Clock,
733 instrument_registry: &Arc<dyn InstrumentRegistry>,
734 ) -> Result<()> {
735 debug!("Processing order response: {json:?}");
736
737 let req_id: SmartString = json
739 .get("req_id")
740 .and_then(|v| v.as_str())
741 .map_or_else(|| "unknown".into(), std::convert::Into::into);
742
743 let pending_request = {
745 let pending = pending_requests.read().await;
746 pending.get(&req_id).cloned()
747 };
748
749 if let Some(ret_code) = json
751 .get("retCode")
752 .and_then(simd_json::prelude::ValueAsScalar::as_i64)
753 {
754 let success = ret_code == 0;
755 let status = if success {
756 OrderStatus::New
757 } else {
758 OrderStatus::Rejected
759 };
760
761 let instrument_id = if let Some(ref request) = pending_request {
763 request.instrument_id.clone()
764 } else {
765 let symbol = json
767 .get("result")
768 .and_then(|r| r.get("symbol"))
769 .and_then(|v| v.as_str())
770 .map_or_else(
771 || "BTC/USDT".into(),
772 |s| instrument_registry.normalize_symbol(s, Venue::Bybit),
773 ); InstrumentId::new(&symbol, Venue::Bybit)
775 };
776
777 let order_id: SmartString = json
778 .get("result")
779 .and_then(|r| r.get("orderId"))
780 .and_then(|v| v.as_str())
781 .unwrap_or("unknown")
782 .into();
783
784 if success
786 && order_id != "unknown"
787 && let Some(ref request) = pending_request
788 {
789 instrument_registry.cache_order_mapping(
790 &order_id,
791 request.instrument_id.clone(),
792 None, );
794 }
795
796 let report = ExecutionReport {
797 id: id_generation::generate_report_id_with_uuid("bybit"),
798 order_id: order_id.clone(),
799 exchange_timestamp: json
800 .get("time")
801 .and_then(simd_json::prelude::ValueAsScalar::as_u64)
802 .unwrap_or(0)
803 * 1_000_000,
804 system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
805 instrument_id,
806 status,
807 filled_quantity: Decimal::ZERO,
808 remaining_quantity: Decimal::ZERO,
809 execution_price: None,
810 reject_reason: if success {
811 None
812 } else {
813 Some(
814 json.get("retMsg")
815 .and_then(|v| v.as_str())
816 .unwrap_or("Unknown error")
817 .into(),
818 )
819 },
820 exchange_execution_id: None,
821 is_final: !success,
822 };
823
824 if let Err(e) = report_tx.try_send(report) {
825 error!("Failed to send execution report: {e}");
826 }
827
828 if success || status == OrderStatus::Rejected {
830 pending_requests.write().await.remove(&req_id);
831 }
832 }
833
834 Ok(())
835 }
836
837 async fn handle_batch_response(
839 json: &JsonValue,
840 report_tx: &Sender<ExecutionReport>,
841 pending_requests: &Arc<AsyncRwLock<FxHashMap<SmartString, PendingRequest>>>,
842 clock: &Clock,
843 instrument_registry: &Arc<dyn InstrumentRegistry>,
844 ) -> Result<()> {
845 debug!("Processing batch response: {json:?}");
846
847 let req_id: SmartString = json
849 .get("req_id")
850 .and_then(|v| v.as_str())
851 .map_or_else(|| "unknown".into(), std::convert::Into::into);
852
853 let pending_request = {
855 let pending = pending_requests.read().await;
856 pending.get(&req_id).cloned()
857 };
858
859 if let Some(result) = json.get("result") {
861 if let Some(list) = result.get("list").and_then(|v| v.as_array()) {
862 for (i, order) in list.iter().enumerate() {
863 let symbol = order
865 .get("symbol")
866 .and_then(|v| v.as_str())
867 .map(|s| instrument_registry.normalize_symbol(s, Venue::Bybit))
868 .unwrap_or_else(|| {
869 if let Some(ref request) = pending_request {
870 request.instrument_id.symbol.clone()
871 } else {
872 instrument_registry.normalize_symbol("BTCUSDT", Venue::Bybit)
874 }
875 });
876
877 let report = ExecutionReport {
878 id: id_generation::generate_batch_id("bybit", i),
879 order_id: order
880 .get("orderId")
881 .and_then(|v| v.as_str())
882 .unwrap_or("unknown")
883 .into(),
884 exchange_timestamp: order
885 .get("createAt")
886 .and_then(simd_json::prelude::ValueAsScalar::as_u64)
887 .unwrap_or(0)
888 * 1_000_000,
889 system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
890 instrument_id: InstrumentId::new(&symbol, Venue::Bybit),
891 status: OrderStatus::New,
892 filled_quantity: Decimal::ZERO,
893 remaining_quantity: Decimal::ZERO,
894 execution_price: None,
895 reject_reason: None,
896 exchange_execution_id: order
897 .get("orderId")
898 .and_then(|v| v.as_str())
899 .map(std::convert::Into::into),
900 is_final: false,
901 };
902
903 if let Err(e) = report_tx.try_send(report) {
904 error!("Failed to send batch execution report: {e}");
905 }
906 }
907 }
908
909 pending_requests.write().await.remove(&req_id);
911 }
912
913 Ok(())
914 }
915
916 async fn handle_order_update(
918 json: &JsonValue,
919 report_tx: &Sender<ExecutionReport>,
920 clock: &Clock,
921 instrument_registry: &Arc<dyn InstrumentRegistry>,
922 ) -> Result<()> {
923 debug!("Processing order update: {json:?}");
924
925 if let Some(data) = json.get("data").and_then(|v| v.as_array()) {
926 for order in data {
927 let status = Self::map_order_status(
928 order
929 .get("orderStatus")
930 .and_then(|v| v.as_str())
931 .unwrap_or("Unknown"),
932 );
933
934 let report = ExecutionReport {
935 id: id_generation::generate_report_id_with_uuid("bybit_update"),
936 order_id: order
937 .get("orderId")
938 .and_then(|v| v.as_str())
939 .unwrap_or("unknown")
940 .into(),
941 exchange_timestamp: order
942 .get("updatedTime")
943 .and_then(simd_json::prelude::ValueAsScalar::as_u64)
944 .unwrap_or(0)
945 * 1_000_000,
946 system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
947 instrument_id: InstrumentId::new(
948 order.get("symbol").and_then(|v| v.as_str()).map_or_else(
949 || instrument_registry.normalize_symbol("BTCUSDT", Venue::Bybit),
950 |s| instrument_registry.normalize_symbol(s, Venue::Bybit),
951 ),
952 Venue::Bybit,
953 ),
954 status,
955 filled_quantity: order
956 .get("cumExecQty")
957 .and_then(|v| v.as_str())
958 .and_then(|s| s.parse().ok())
959 .unwrap_or(Decimal::ZERO),
960 remaining_quantity: order
961 .get("leavesQty")
962 .and_then(|v| v.as_str())
963 .and_then(|s| s.parse().ok())
964 .unwrap_or(Decimal::ZERO),
965 execution_price: order
966 .get("avgPrice")
967 .and_then(|v| v.as_str())
968 .and_then(|s| s.parse().ok()),
969 reject_reason: None,
970 exchange_execution_id: order
971 .get("orderId")
972 .and_then(|v| v.as_str())
973 .map(std::convert::Into::into),
974 is_final: matches!(
975 status,
976 OrderStatus::Filled | OrderStatus::Cancelled | OrderStatus::Rejected
977 ),
978 };
979
980 if let Err(e) = report_tx.try_send(report) {
981 error!("Failed to send order update report: {e}");
982 }
983 }
984 }
985
986 Ok(())
987 }
988
989 async fn handle_position_update(
991 json: &JsonValue,
992 position_state: &Arc<RwLock<FxHashMap<PositionId, PositionUpdate>>>,
993 report_tx: Option<&Sender<ExecutionReport>>,
994 clock: &Clock,
995 ) -> Result<()> {
996 debug!("Processing position update: {json:?}");
997
998 if let Some(data) = json.get("data").and_then(|v| v.as_array()) {
1000 let timestamp_ns = clock.raw();
1001
1002 for position_json in data {
1003 if let Some(position_update) =
1004 Self::parse_position_update(position_json, timestamp_ns)
1005 {
1006 {
1008 let mut state = position_state.write();
1009 state.insert(position_update.position_id, position_update.clone());
1010 }
1011
1012 info!(
1014 "Position update: {} {} {} @ {}",
1015 position_update.symbol,
1016 position_update.side,
1017 position_update.amount,
1018 position_update.entry_price
1019 );
1020
1021 if let Some(tx) = report_tx {
1023 let report = ExecutionReport {
1024 id: id_generation::generate_report_id_with_uuid("bybit_position"),
1025 order_id: SmartString::default(), exchange_timestamp: timestamp_ns,
1027 system_timestamp: timestamp_ns,
1028 instrument_id: InstrumentId::new(
1029 position_update.symbol.clone(),
1030 Venue::Bybit,
1031 ),
1032 status: OrderStatus::New, filled_quantity: position_update.amount,
1034 remaining_quantity: Decimal::ZERO, execution_price: Some(position_update.entry_price),
1036 reject_reason: None,
1037 exchange_execution_id: Some(
1038 format!("position_{}", position_update.position_id).into(),
1039 ),
1040 is_final: false,
1041 };
1042
1043 if let Err(e) = tx.try_send(report) {
1044 error!("Failed to send position update report: {e}");
1045 }
1046 }
1047 }
1048 }
1049 }
1050
1051 Ok(())
1052 }
1053
1054 fn parse_position_update(
1056 position_json: &JsonValue,
1057 timestamp_ns: u64,
1058 ) -> Option<PositionUpdate> {
1059 let symbol = position_json.get("symbol").and_then(|v| v.as_str())?;
1061 let side_str = position_json.get("side").and_then(|v| v.as_str())?;
1062 let size_str = position_json.get("size").and_then(|v| v.as_str())?;
1063 let position_value_str = position_json
1064 .get("positionValue")
1065 .and_then(|v| v.as_str())?;
1066 let entry_price_str = position_json.get("entryPrice").and_then(|v| v.as_str())?;
1067 let unrealized_pnl_str = position_json
1068 .get("unrealisedPnl")
1069 .and_then(|v| v.as_str())?;
1070 let realized_pnl_str = position_json
1071 .get("cumRealisedPnl")
1072 .and_then(|v| v.as_str())?;
1073 let margin_mode_str = position_json.get("tradeMode").and_then(|v| v.as_str())?;
1074
1075 let amount = Decimal::from_str(size_str).ok()?;
1077 let position_value = Decimal::from_str(position_value_str).ok()?;
1078 let entry_price = Decimal::from_str(entry_price_str).ok()?;
1079 let unrealized_pnl = Decimal::from_str(unrealized_pnl_str).ok()?;
1080 let realized_pnl = Decimal::from_str(realized_pnl_str).ok()?;
1081
1082 let position_side = match side_str {
1084 "Buy" => PositionSide::Long,
1085 "Sell" => PositionSide::Short,
1086 _ => {
1087 debug!("Unknown position side: {side_str}");
1088 return None;
1089 }
1090 };
1091
1092 let margin_type = match margin_mode_str {
1094 "cross_margin" | "0" => MarginType::Cross,
1095 "isolated_margin" | "1" => MarginType::Isolated,
1096 _ => MarginType::Cross, };
1098
1099 if amount.is_zero() {
1101 debug!("Skipping closed position for {symbol}");
1102 return None;
1103 }
1104
1105 let position_id = Self::generate_stable_position_id(symbol, position_side, Venue::Bybit);
1107
1108 Some(PositionUpdate {
1109 position_id,
1110 venue: Venue::Bybit,
1111 symbol: SmartString::from(symbol),
1112 side: position_side,
1113 amount,
1114 entry_price,
1115 breakeven_price: entry_price, unrealized_pnl,
1117 realized_pnl,
1118 margin_type,
1119 isolated_wallet: position_value, timestamp_ns,
1121 })
1122 }
1123
1124 fn generate_stable_position_id(symbol: &str, side: PositionSide, venue: Venue) -> PositionId {
1126 let position_key = format!("{symbol}-{side}-{venue:?}");
1129
1130 use sha2::{Digest, Sha256};
1134
1135 let mut hasher = Sha256::new();
1137 hasher.update(position_key.as_bytes());
1138 let primary_hash = hasher.finalize();
1139
1140 let mut secondary_hasher = Sha256::new();
1142 secondary_hasher.update((symbol.len() as u64).to_le_bytes());
1143 secondary_hasher.update((side as u8).to_le_bytes());
1144 secondary_hasher.update((venue as u8).to_le_bytes());
1145 secondary_hasher.update(b"position_id_v1"); let secondary_hash = secondary_hasher.finalize();
1147
1148 let mut uuid_bytes = [0u8; 16];
1151
1152 uuid_bytes[0..8].copy_from_slice(&primary_hash[0..8]);
1154
1155 uuid_bytes[8..16].copy_from_slice(&secondary_hash[0..8]);
1157
1158 let stable_uuid = Uuid::from_bytes(uuid_bytes);
1160 PositionId::from_uuid(stable_uuid)
1161 }
1162
1163 fn map_order_status(status: &str) -> OrderStatus {
1165 match status {
1166 "Created" | "New" => OrderStatus::New,
1167 "PartiallyFilled" => OrderStatus::PartiallyFilled,
1168 "Filled" => OrderStatus::Filled,
1169 "Cancelled" => OrderStatus::Cancelled,
1170 "Rejected" => OrderStatus::Rejected,
1171 "PendingCancel" => OrderStatus::Pending,
1172 "Deactivated" => OrderStatus::Cancelled,
1173 "Active" => OrderStatus::Open,
1174 "Untriggered" => OrderStatus::Pending,
1175 "Triggered" => OrderStatus::Open,
1176 _ => OrderStatus::Unknown,
1177 }
1178 }
1179
1180 const fn map_order_side(side: OrderSide) -> &'static str {
1182 match side {
1183 OrderSide::Buy => "Buy",
1184 OrderSide::Sell => "Sell",
1185 }
1186 }
1187
1188 const fn map_order_type(order_type: OrderType) -> &'static str {
1190 match order_type {
1191 OrderType::Market => "Market",
1192 OrderType::Limit => "Limit",
1193 _ => "Limit",
1194 }
1195 }
1196
1197 const fn map_time_in_force(tif: TimeInForce) -> &'static str {
1199 match tif {
1200 TimeInForce::GTC => "GTC",
1201 TimeInForce::IOC => "IOC",
1202 TimeInForce::FOK => "FOK",
1203 TimeInForce::GTX => "PostOnly",
1204 _ => "GTC",
1205 }
1206 }
1207
1208 async fn track_request(
1210 &self,
1211 req_id: SmartString,
1212 instrument_id: InstrumentId,
1213 category: BybitCategory,
1214 request_type: RequestType,
1215 ) {
1216 let pending_request = PendingRequest {
1217 instrument_id,
1218 category,
1219 request_type,
1220 timestamp: self.clock.raw(),
1221 };
1222
1223 self.pending_requests
1224 .write()
1225 .await
1226 .insert(req_id, pending_request);
1227 }
1228
1229 async fn find_order_category(&self, order_id: &str) -> Option<BybitCategory> {
1231 let pending = self.pending_requests.read().await;
1232
1233 for (_, request) in pending.iter() {
1236 if matches!(
1237 request.request_type,
1238 RequestType::CreateOrder | RequestType::BatchCreate
1239 ) {
1240 return Some(request.category.clone());
1243 }
1244 }
1245
1246 None
1247 }
1248
1249 async fn cleanup_old_requests(&self) {
1251 let now = self.clock.raw();
1252 let max_age_ns = 5 * 60 * 1_000_000_000; let mut pending = self.pending_requests.write().await;
1255 pending.retain(|_, request| now.saturating_sub(request.timestamp) < max_age_ns);
1256 }
1257
1258 pub async fn create_order(&self, order: &BybitOrderRequest) -> Result<()> {
1260 let req_id = id_generation::generate_exchange_request_id(
1261 "bybit",
1262 "create",
1263 self.request_counter.fetch_add(1, Ordering::Relaxed),
1264 );
1265 let timestamp = BybitAuth::get_timestamp();
1266 let header = self.auth.create_ws_trading_header(timestamp);
1267
1268 let instrument_id = InstrumentId::new(&order.symbol, Venue::Bybit);
1270
1271 self.track_request(
1273 req_id.clone(),
1274 instrument_id,
1275 order.category.clone(),
1276 RequestType::CreateOrder,
1277 )
1278 .await;
1279
1280 let mut order_data = simd_json::json!({
1281 "category": order.category.as_str(),
1282 "symbol": order.symbol,
1283 "side": Self::map_order_side(order.side),
1284 "orderType": Self::map_order_type(order.order_type),
1285 "qty": order.quantity.to_string(),
1286 "price": order.price.map_or("0".to_string(), |p| p.to_string()),
1287 "timeInForce": Self::map_time_in_force(order.time_in_force),
1288 "orderLinkId": order.order_link_id.as_ref().map_or(String::new(), std::string::ToString::to_string),
1289 "reduceOnly": order.reduce_only.unwrap_or(false),
1290 "closeOnTrigger": order.close_on_trigger.unwrap_or(false),
1291 "positionIdx": order.position_idx.unwrap_or(0)
1292 });
1293
1294 if let Some(tp) = order.take_profit {
1296 order_data["takeProfit"] = simd_json::json!(tp.to_string());
1297 }
1298 if let Some(sl) = order.stop_loss {
1299 order_data["stopLoss"] = simd_json::json!(sl.to_string());
1300 }
1301 if let Some(tp_trigger_by) = &order.tp_trigger_by {
1302 order_data["tpTriggerBy"] = simd_json::json!(tp_trigger_by);
1303 }
1304 if let Some(sl_trigger_by) = &order.sl_trigger_by {
1305 order_data["slTriggerBy"] = simd_json::json!(sl_trigger_by);
1306 }
1307 if let Some(tp_limit_price) = order.tp_limit_price {
1308 order_data["tpLimitPrice"] = simd_json::json!(tp_limit_price.to_string());
1309 }
1310 if let Some(sl_limit_price) = order.sl_limit_price {
1311 order_data["slLimitPrice"] = simd_json::json!(sl_limit_price.to_string());
1312 }
1313
1314 let message = BybitWsTradingMessage {
1315 req_id,
1316 header,
1317 op: "order.create".into(),
1318 args: vec![order_data],
1319 };
1320
1321 let json_str = simd_json::to_string(&message)?;
1322 self.send_raw_message(json_str).await?;
1323
1324 Ok(())
1325 }
1326
1327 pub async fn create_batch_orders(&self, batch: &BybitBatchOrderRequest) -> Result<()> {
1329 let req_id = id_generation::generate_exchange_request_id(
1330 "bybit",
1331 "batch_create",
1332 self.request_counter.fetch_add(1, Ordering::Relaxed),
1333 );
1334 let timestamp = BybitAuth::get_timestamp();
1335 let header = self.auth.create_ws_trading_header(timestamp);
1336
1337 if let Some(first_order) = batch.orders.first() {
1339 let instrument_id = InstrumentId::new(&first_order.symbol, Venue::Bybit);
1340 self.track_request(
1341 req_id.clone(),
1342 instrument_id,
1343 batch.category.clone(),
1344 RequestType::BatchCreate,
1345 )
1346 .await;
1347 }
1348
1349 let mut batch_data = Vec::new();
1350 for order in &batch.orders {
1351 let mut order_data = simd_json::json!({
1352 "category": batch.category.as_str(),
1353 "symbol": order.symbol,
1354 "side": Self::map_order_side(order.side),
1355 "orderType": Self::map_order_type(order.order_type),
1356 "qty": order.quantity.to_string(),
1357 "price": order.price.map_or("0".to_string(), |p| p.to_string()),
1358 "timeInForce": Self::map_time_in_force(order.time_in_force),
1359 "orderLinkId": order.order_link_id.as_ref().map_or(String::new(), std::string::ToString::to_string),
1360 "reduceOnly": order.reduce_only.unwrap_or(false),
1361 "closeOnTrigger": order.close_on_trigger.unwrap_or(false),
1362 "positionIdx": order.position_idx.unwrap_or(0)
1363 });
1364
1365 if let Some(tp) = order.take_profit {
1367 order_data["takeProfit"] = simd_json::json!(tp.to_string());
1368 }
1369 if let Some(sl) = order.stop_loss {
1370 order_data["stopLoss"] = simd_json::json!(sl.to_string());
1371 }
1372 if let Some(tp_trigger_by) = &order.tp_trigger_by {
1373 order_data["tpTriggerBy"] = simd_json::json!(tp_trigger_by);
1374 }
1375 if let Some(sl_trigger_by) = &order.sl_trigger_by {
1376 order_data["slTriggerBy"] = simd_json::json!(sl_trigger_by);
1377 }
1378 if let Some(tp_limit_price) = order.tp_limit_price {
1379 order_data["tpLimitPrice"] = simd_json::json!(tp_limit_price.to_string());
1380 }
1381 if let Some(sl_limit_price) = order.sl_limit_price {
1382 order_data["slLimitPrice"] = simd_json::json!(sl_limit_price.to_string());
1383 }
1384
1385 batch_data.push(order_data);
1386 }
1387
1388 let message = BybitWsTradingMessage {
1389 req_id,
1390 header,
1391 op: "order.create-batch".into(),
1392 args: batch_data,
1393 };
1394
1395 let json_str = simd_json::to_string(&message)?;
1396 self.send_raw_message(json_str).await?;
1397
1398 Ok(())
1399 }
1400
1401 pub async fn cancel_order(&self, category: BybitCategory, order_id: &str) -> Result<()> {
1403 let req_id = id_generation::generate_exchange_request_id(
1404 "bybit",
1405 "cancel",
1406 self.request_counter.fetch_add(1, Ordering::Relaxed),
1407 );
1408 let timestamp = BybitAuth::get_timestamp();
1409 let header = self.auth.create_ws_trading_header(timestamp);
1410
1411 let instrument_id = self
1413 .instrument_registry
1414 .lookup_by_order_id(order_id)
1415 .unwrap_or_else(|| {
1416 let normalized_symbol = self
1418 .instrument_registry
1419 .normalize_symbol(order_id, Venue::Bybit);
1420 InstrumentId::new(normalized_symbol, Venue::Bybit)
1421 });
1422 self.track_request(
1423 req_id.clone(),
1424 instrument_id,
1425 category.clone(),
1426 RequestType::CancelOrder,
1427 )
1428 .await;
1429
1430 let cancel_data = simd_json::json!({
1431 "category": category.as_str(),
1432 "orderId": order_id
1433 });
1434
1435 let message = BybitWsTradingMessage {
1436 req_id,
1437 header,
1438 op: "order.cancel".into(),
1439 args: vec![cancel_data],
1440 };
1441
1442 let json_str = simd_json::to_string(&message)?;
1443 self.send_raw_message(json_str).await?;
1444
1445 Ok(())
1446 }
1447
1448 pub async fn cancel_batch_orders(
1450 &self,
1451 category: BybitCategory,
1452 order_ids: &[&str],
1453 ) -> Result<()> {
1454 let req_id = id_generation::generate_exchange_request_id(
1455 "bybit",
1456 "batch_cancel",
1457 self.request_counter.fetch_add(1, Ordering::Relaxed),
1458 );
1459 let timestamp = BybitAuth::get_timestamp();
1460 let header = self.auth.create_ws_trading_header(timestamp);
1461
1462 let instrument_id = if let Some(first_order_id) = order_ids.first() {
1464 self.instrument_registry
1465 .lookup_by_order_id(first_order_id)
1466 .unwrap_or_else(|| {
1467 let normalized_symbol = self
1468 .instrument_registry
1469 .normalize_symbol("BTCUSDT", Venue::Bybit);
1470 InstrumentId::new(&normalized_symbol, Venue::Bybit)
1471 })
1472 } else {
1473 let normalized_symbol = self
1475 .instrument_registry
1476 .normalize_symbol("BTCUSDT", Venue::Bybit);
1477 InstrumentId::new(&normalized_symbol, Venue::Bybit)
1478 };
1479 self.track_request(
1480 req_id.clone(),
1481 instrument_id,
1482 category.clone(),
1483 RequestType::BatchCancel,
1484 )
1485 .await;
1486
1487 let mut batch_data = Vec::new();
1488 for order_id in order_ids {
1489 let cancel_data = simd_json::json!({
1490 "category": category.as_str(),
1491 "orderId": order_id
1492 });
1493 batch_data.push(cancel_data);
1494 }
1495
1496 let message = BybitWsTradingMessage {
1497 req_id,
1498 header,
1499 op: "order.cancel-batch".into(),
1500 args: batch_data,
1501 };
1502
1503 let json_str = simd_json::to_string(&message)?;
1504 self.send_raw_message(json_str).await?;
1505
1506 Ok(())
1507 }
1508
1509 pub async fn amend_order(&self, request: &BybitAmendOrderRequest) -> Result<()> {
1511 let req_id = id_generation::generate_exchange_request_id(
1512 "bybit",
1513 "amend",
1514 self.request_counter.fetch_add(1, Ordering::Relaxed),
1515 );
1516 let timestamp = BybitAuth::get_timestamp();
1517 let header = self.auth.create_ws_trading_header(timestamp);
1518
1519 let mut amend_data = simd_json::json!({
1521 "category": request.category.as_str(),
1522 "symbol": request.symbol,
1523 });
1524
1525 if let Some(order_id) = &request.order_id {
1527 amend_data["orderId"] = simd_json::json!(order_id);
1528 } else if let Some(order_link_id) = &request.order_link_id {
1529 amend_data["orderLinkId"] = simd_json::json!(order_link_id);
1530 } else {
1531 bail!("Either orderId or orderLinkId must be provided");
1532 }
1533
1534 if let Some(qty) = request.qty {
1536 amend_data["qty"] = simd_json::json!(qty.to_string());
1537 }
1538 if let Some(price) = request.price {
1539 amend_data["price"] = simd_json::json!(price.to_string());
1540 }
1541 if let Some(tp) = request.take_profit {
1542 amend_data["takeProfit"] = simd_json::json!(tp.to_string());
1543 }
1544 if let Some(sl) = request.stop_loss {
1545 amend_data["stopLoss"] = simd_json::json!(sl.to_string());
1546 }
1547 if let Some(tp_trigger_by) = &request.tp_trigger_by {
1548 amend_data["tpTriggerBy"] = simd_json::json!(tp_trigger_by);
1549 }
1550 if let Some(sl_trigger_by) = &request.sl_trigger_by {
1551 amend_data["slTriggerBy"] = simd_json::json!(sl_trigger_by);
1552 }
1553 if let Some(trigger_price) = request.trigger_price {
1554 amend_data["triggerPrice"] = simd_json::json!(trigger_price.to_string());
1555 }
1556 if let Some(tp_limit_price) = request.tp_limit_price {
1557 amend_data["tpLimitPrice"] = simd_json::json!(tp_limit_price.to_string());
1558 }
1559 if let Some(sl_limit_price) = request.sl_limit_price {
1560 amend_data["slLimitPrice"] = simd_json::json!(sl_limit_price.to_string());
1561 }
1562
1563 let message = BybitWsTradingMessage {
1564 req_id,
1565 header,
1566 op: "order.amend".into(),
1567 args: vec![amend_data],
1568 };
1569
1570 let json_str = simd_json::to_string(&message)?;
1571 self.send_raw_message(json_str).await?;
1572
1573 Ok(())
1574 }
1575
1576 pub async fn cancel_all_orders_internal(&self, request: &BybitCancelAllRequest) -> Result<()> {
1578 let req_id = id_generation::generate_exchange_request_id(
1579 "bybit",
1580 "cancel_all",
1581 self.request_counter.fetch_add(1, Ordering::Relaxed),
1582 );
1583 let timestamp = BybitAuth::get_timestamp();
1584 let header = self.auth.create_ws_trading_header(timestamp);
1585
1586 let mut cancel_data = simd_json::json!({
1588 "category": request.category.as_str(),
1589 });
1590
1591 if let Some(symbol) = &request.symbol {
1593 cancel_data["symbol"] = simd_json::json!(symbol);
1594 }
1595 if let Some(base_coin) = &request.base_coin {
1596 cancel_data["baseCoin"] = simd_json::json!(base_coin);
1597 }
1598 if let Some(settle_coin) = &request.settle_coin {
1599 cancel_data["settleCoin"] = simd_json::json!(settle_coin);
1600 }
1601
1602 let message = BybitWsTradingMessage {
1603 req_id,
1604 header,
1605 op: "order.cancel-all".into(),
1606 args: vec![cancel_data],
1607 };
1608
1609 let json_str = simd_json::to_string(&message)?;
1610 self.send_raw_message(json_str).await?;
1611
1612 Ok(())
1613 }
1614
1615 async fn find_order_info(&self, order_id: &str) -> Option<(BybitCategory, SmartString)> {
1617 let pending = self.pending_requests.read().await;
1618 for (_, request) in pending.iter() {
1619 if request.instrument_id.symbol.as_str() == order_id {
1621 return Some((
1622 request.category.clone(),
1623 request.instrument_id.symbol.clone(),
1624 ));
1625 }
1626 }
1627 None
1628 }
1629
1630 pub async fn send_ping(&self) -> Result<()> {
1632 let ping_message = simd_json::json!({
1633 "req_id": format!("ping_{}", Uuid::new_v4()),
1634 "op": "ping"
1635 });
1636
1637 let json_str = simd_json::to_string(&ping_message)?;
1638 self.send_raw_message(json_str).await?;
1639
1640 Ok(())
1641 }
1642
1643 pub async fn disconnect(&self) -> Result<()> {
1645 self.is_connected.store(false, Ordering::Relaxed);
1646 self.is_authenticated.store(false, Ordering::Relaxed);
1647
1648 if let Some(task) = self.ping_task.write().await.take() {
1650 task.abort();
1651 }
1652 if let Some(task) = self.message_task.write().await.take() {
1653 task.abort();
1654 }
1655 if let Some(task) = self.cleanup_task.write().await.take() {
1656 task.abort();
1657 }
1658
1659 if let Some(sink) = self.ws_sink.write().await.take() {
1661 drop(sink);
1662 }
1663 if let Some(stream) = self.ws_stream.write().await.take() {
1664 drop(stream);
1665 }
1666
1667 {
1669 let mut health = self.connection_health.write();
1670 health.is_healthy = false;
1671 }
1672
1673 info!("Disconnected from Bybit WebSocket");
1674 Ok(())
1675 }
1676}
1677
1678#[async_trait]
1679impl crate::execution_engine::Exchange for BybitWebSocketTrader {
1680 fn venue(&self) -> Venue {
1681 Venue::Bybit
1682 }
1683
1684 async fn place_order(
1685 &self,
1686 order: Order,
1687 report_sender: Sender<crate::execution_engine::ExecutionReport>,
1688 ) -> Result<()> {
1689 if !self.is_connected().await {
1691 bail!("WebSocket not connected to Bybit");
1692 }
1693
1694 let order_id = order.id;
1696 let symbol = order.symbol.clone();
1697 let venue = order.venue;
1698 let quantity = order.quantity;
1699
1700 let bybit_order = self.convert_order(order)?;
1702
1703 let ack_report = ExecutionReport {
1705 id: Uuid::new_v4().to_string().into(),
1706 order_id: order_id.to_string().into(),
1707 exchange_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
1708 system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
1709 instrument_id: InstrumentId::new(symbol, venue),
1710 status: OrderStatus::New,
1711 filled_quantity: Decimal::ZERO,
1712 remaining_quantity: quantity,
1713 execution_price: None,
1714 reject_reason: None,
1715 exchange_execution_id: None,
1716 is_final: false,
1717 };
1718
1719 if let Err(e) = report_sender.send_async(ack_report).await {
1720 warn!("Failed to send order placement acknowledgment: {e}");
1721 }
1722
1723 self.create_order(&bybit_order).await
1725 }
1726
1727 async fn cancel_order(
1728 &self,
1729 order_id: SmartString,
1730 report_sender: Sender<crate::execution_engine::ExecutionReport>,
1731 ) -> Result<()> {
1732 if !self.is_connected().await {
1734 bail!("WebSocket not connected to Bybit");
1735 }
1736
1737 let cancel_ack_report = ExecutionReport {
1739 id: Uuid::new_v4().to_string().into(),
1740 order_id: order_id.clone(),
1741 exchange_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
1742 system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
1743 instrument_id: self
1744 .instrument_registry
1745 .lookup_by_order_id(&order_id)
1746 .unwrap_or_else(|| {
1747 let normalized_symbol = self
1749 .instrument_registry
1750 .normalize_symbol(&order_id, Venue::Bybit);
1751 InstrumentId::new(normalized_symbol, Venue::Bybit)
1752 }),
1753 status: OrderStatus::Pending,
1754 filled_quantity: Decimal::ZERO,
1755 remaining_quantity: Decimal::ZERO,
1756 execution_price: None,
1757 reject_reason: None,
1758 exchange_execution_id: None,
1759 is_final: false,
1760 };
1761
1762 if let Err(e) = report_sender.send_async(cancel_ack_report).await {
1763 warn!("Failed to send order cancellation acknowledgment: {e}");
1764 }
1765
1766 let category = self
1769 .find_order_category(&order_id)
1770 .await
1771 .unwrap_or(BybitCategory::Spot);
1772
1773 self.cancel_order(category, &order_id).await
1774 }
1775
1776 async fn modify_order(
1777 &self,
1778 order_id: SmartString,
1779 new_price: Option<Decimal>,
1780 new_quantity: Option<Decimal>,
1781 report_sender: Sender<crate::execution_engine::ExecutionReport>,
1782 ) -> Result<()> {
1783 if !self.is_connected().await {
1785 bail!("WebSocket not connected to Bybit");
1786 }
1787
1788 let modify_ack_report = ExecutionReport {
1790 id: Uuid::new_v4().to_string().into(),
1791 order_id: order_id.clone(),
1792 exchange_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
1793 system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
1794 instrument_id: self
1795 .instrument_registry
1796 .lookup_by_order_id(&order_id)
1797 .unwrap_or_else(|| {
1798 let normalized_symbol = self
1800 .instrument_registry
1801 .normalize_symbol(&order_id, Venue::Bybit);
1802 InstrumentId::new(normalized_symbol, Venue::Bybit)
1803 }),
1804 status: OrderStatus::Pending,
1805 filled_quantity: Decimal::ZERO,
1806 remaining_quantity: new_quantity.unwrap_or(Decimal::ZERO),
1807 execution_price: new_price,
1808 reject_reason: None,
1809 exchange_execution_id: None,
1810 is_final: false,
1811 };
1812
1813 if let Err(e) = report_sender.send_async(modify_ack_report).await {
1814 warn!("Failed to send order modification acknowledgment: {e}");
1815 }
1816
1817 let (category, symbol) = self
1819 .find_order_info(&order_id)
1820 .await
1821 .ok_or_else(|| anyhow::anyhow!("Order not found in tracking"))?;
1822
1823 let amend_request = BybitAmendOrderRequest {
1825 category,
1826 symbol,
1827 order_id: Some(order_id.clone()),
1828 order_link_id: None,
1829 qty: new_quantity,
1830 price: new_price,
1831 take_profit: None,
1832 stop_loss: None,
1833 tp_trigger_by: None,
1834 sl_trigger_by: None,
1835 trigger_price: None,
1836 tp_limit_price: None,
1837 sl_limit_price: None,
1838 };
1839
1840 self.amend_order(&amend_request).await
1841 }
1842
1843 async fn cancel_all_orders(
1844 &self,
1845 instrument_id: Option<InstrumentId>,
1846 report_sender: Sender<crate::execution_engine::ExecutionReport>,
1847 ) -> Result<()> {
1848 if !self.is_connected().await {
1850 bail!("WebSocket not connected to Bybit");
1851 }
1852
1853 let instrument_id_for_report = instrument_id.clone();
1855
1856 let cancel_all_ack_report = ExecutionReport {
1859 id: Uuid::new_v4().to_string().into(),
1860 order_id: "CANCEL_ALL".into(),
1861 exchange_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
1862 system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
1863 instrument_id: instrument_id_for_report
1864 .unwrap_or_else(|| InstrumentId::new("ALL", rusty_model::venues::Venue::Bybit)),
1865 status: OrderStatus::New, filled_quantity: Decimal::ZERO,
1867 remaining_quantity: Decimal::ZERO,
1868 execution_price: None,
1869 reject_reason: None,
1870 exchange_execution_id: None,
1871 is_final: true, };
1873
1874 if let Err(e) = report_sender.send_async(cancel_all_ack_report).await {
1875 warn!("Failed to send cancel-all acknowledgment: {e}");
1876 }
1877
1878 let cancel_request = BybitCancelAllRequest {
1880 category: BybitCategory::Spot, symbol: instrument_id.map(|id| id.symbol),
1882 base_coin: None,
1883 settle_coin: None,
1884 };
1885
1886 self.cancel_all_orders_internal(&cancel_request).await
1887 }
1888
1889 async fn get_order_status(&self, order_id: &str) -> Result<OrderStatus> {
1890 bail!("Order status query not supported via WebSocket - use REST API")
1892 }
1893
1894 async fn connect(
1895 &self,
1896 report_sender: Sender<crate::execution_engine::ExecutionReport>,
1897 ) -> Result<()> {
1898 Self::connect(self, report_sender).await
1901 }
1902
1903 async fn disconnect(&self) -> Result<()> {
1904 self.disconnect().await
1905 }
1906
1907 async fn is_connected(&self) -> bool {
1908 self.is_connected.load(Ordering::Relaxed)
1909 }
1910
1911 async fn get_instruments(&self) -> Result<SmallVec<[InstrumentId; 32]>> {
1912 bail!("Instrument discovery not supported via WebSocket - use REST API")
1914 }
1915
1916 async fn send_fix_message(&self, _message: Vec<u8>) -> Result<()> {
1917 anyhow::bail!("FIX protocol not supported on Bybit WebSocket")
1918 }
1919
1920 async fn receive_fix_message(&self) -> Result<Vec<u8>> {
1921 anyhow::bail!("FIX protocol not supported on Bybit WebSocket")
1922 }
1923}
1924
1925impl BybitWebSocketTrader {
1927 async fn start_cleanup_task(&self) {
1929 let pending_requests = self.pending_requests.clone();
1930 let clock = self.clock.clone();
1931
1932 let task = tokio::spawn(async move {
1933 let mut interval = tokio::time::interval(Duration::from_secs(60)); loop {
1936 interval.tick().await;
1937
1938 let now = clock.raw();
1939 let max_age_ns = 5 * 60 * 1_000_000_000; let mut pending = pending_requests.write().await;
1942 let count_before = pending.len();
1943 pending.retain(|_, request| now.saturating_sub(request.timestamp) < max_age_ns);
1944 let count_after = pending.len();
1945
1946 if count_before > count_after {
1947 debug!(
1948 "Cleaned up {} old pending requests",
1949 count_before - count_after
1950 );
1951 }
1952 }
1953 });
1954
1955 *self.cleanup_task.write().await = Some(task);
1956 }
1957
1958 fn convert_order(&self, order: Order) -> Result<BybitOrderRequest> {
1960 Ok(BybitOrderRequest {
1961 symbol: order.symbol.clone(),
1962 side: order.side,
1963 order_type: order.order_type,
1964 quantity: order.quantity,
1965 price: order.price,
1966 time_in_force: TimeInForce::GTC, category: BybitCategory::Spot, order_link_id: Some(order.id.to_string().into()),
1969 reduce_only: None,
1970 close_on_trigger: None,
1971 position_idx: None,
1972 take_profit: None,
1974 stop_loss: None,
1975 tp_trigger_by: None,
1976 sl_trigger_by: None,
1977 tp_limit_price: None,
1978 sl_limit_price: None,
1979 })
1980 }
1981}
1982
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution_engine::Exchange;
    use crate::instrument_registry::create_shared_registry;
    use rust_decimal::Decimal;
    use rusty_model::enums::OrderType;
    use rusty_model::trading_order::Order;
    use rusty_model::types::ClientId;
    use std::str::FromStr;
    use std::sync::Arc;

    /// A freshly built trader reports the Bybit venue, is disconnected, and
    /// refuses to place orders.
    #[tokio::test]
    async fn test_exchange_trait_implementation() {
        let auth = Arc::new(BybitAuth::new(
            "test_api_key".into(),
            "test_api_secret".into(),
        ));
        let trader = BybitWebSocketTrader::new(auth, false, create_shared_registry());

        assert_eq!(trader.venue(), Venue::Bybit);
        assert!(!trader.is_connected().await);

        let limit_buy = Order::new(
            Venue::Bybit,
            "BTCUSDT",
            OrderSide::Buy,
            OrderType::Limit,
            Decimal::from_str("0.001").unwrap(),
            Some(Decimal::from_str("50000").unwrap()),
            ClientId::new("test_client"),
        );

        let (report_tx, _report_rx) = flume::bounded(100);
        let outcome = trader.place_order(limit_buy, report_tx).await;
        assert!(outcome.is_err());
        assert!(outcome.unwrap_err().to_string().contains("not connected"));
    }

    /// Bybit status strings map onto the matching `OrderStatus` variants.
    #[test]
    fn test_order_status_mapping() {
        let cases = &[
            ("New", OrderStatus::New),
            ("PartiallyFilled", OrderStatus::PartiallyFilled),
            ("Filled", OrderStatus::Filled),
            ("Cancelled", OrderStatus::Cancelled),
            ("Rejected", OrderStatus::Rejected),
            ("Unknown", OrderStatus::Unknown),
        ];
        for (raw, expected) in cases {
            assert_eq!(&BybitWebSocketTrader::map_order_status(*raw), expected);
        }
    }

    /// Order sides map onto Bybit's `Buy` / `Sell` strings.
    #[test]
    fn test_order_side_mapping() {
        let cases = &[(OrderSide::Buy, "Buy"), (OrderSide::Sell, "Sell")];
        for (side, label) in cases {
            assert_eq!(BybitWebSocketTrader::map_order_side(*side), *label);
        }
    }

    /// Order types map onto Bybit's `Market` / `Limit` strings.
    #[test]
    fn test_order_type_mapping() {
        let cases = &[(OrderType::Market, "Market"), (OrderType::Limit, "Limit")];
        for (order_type, label) in cases {
            assert_eq!(BybitWebSocketTrader::map_order_type(*order_type), *label);
        }
    }

    /// Time-in-force values map onto Bybit's wire strings; `GTX` is `PostOnly`.
    #[test]
    fn test_time_in_force_mapping() {
        let cases = &[
            (TimeInForce::GTC, "GTC"),
            (TimeInForce::IOC, "IOC"),
            (TimeInForce::FOK, "FOK"),
            (TimeInForce::GTX, "PostOnly"),
        ];
        for (tif, label) in cases {
            assert_eq!(BybitWebSocketTrader::map_time_in_force(*tif), *label);
        }
    }

    /// Categories render as the lowercase names used on the wire.
    #[test]
    fn test_category_conversion() {
        let cases = &[
            (BybitCategory::Spot, "spot"),
            (BybitCategory::Linear, "linear"),
            (BybitCategory::Inverse, "inverse"),
            (BybitCategory::Options, "option"),
        ];
        for (category, label) in cases {
            assert_eq!(category.as_str(), *label);
        }
    }

    /// A tracked request is stored under its id with the data it was given,
    /// and the category lookup then yields that request's category.
    #[tokio::test]
    async fn test_request_tracking() {
        let auth = Arc::new(BybitAuth::new(
            "test_api_key".into(),
            "test_api_secret".into(),
        ));
        let trader = BybitWebSocketTrader::new(auth, false, create_shared_registry());

        let req_id: SmartString = "test_request_123".into();
        let instrument_id = InstrumentId::new("BTCUSDT", Venue::Bybit);
        let category = BybitCategory::Spot;

        trader
            .track_request(
                req_id.clone(),
                instrument_id.clone(),
                category.clone(),
                RequestType::CreateOrder,
            )
            .await;

        {
            // Scoped so the read guard is released before the lookup below.
            let pending = trader.pending_requests.read().await;
            assert!(pending.contains_key(&req_id));

            let tracked = pending.get(&req_id).unwrap();
            assert_eq!(tracked.instrument_id, instrument_id);
            assert_eq!(tracked.category, category);
            assert_eq!(tracked.request_type, RequestType::CreateOrder);
        }

        let found_category = trader.find_order_category("some_order_id").await;
        assert_eq!(found_category, Some(BybitCategory::Spot));
    }

    /// A request that was just tracked survives a cleanup pass.
    #[tokio::test]
    async fn test_request_cleanup() {
        let auth = Arc::new(BybitAuth::new(
            "test_api_key".into(),
            "test_api_secret".into(),
        ));
        let trader = BybitWebSocketTrader::new(auth, false, create_shared_registry());

        let req_id: SmartString = "test_request_456".into();
        trader
            .track_request(
                req_id.clone(),
                InstrumentId::new("ETHUSDT", Venue::Bybit),
                BybitCategory::Linear,
                RequestType::CancelOrder,
            )
            .await;

        assert_eq!(trader.pending_requests.read().await.len(), 1);

        trader.cleanup_old_requests().await;

        assert_eq!(trader.pending_requests.read().await.len(), 1);
    }

    /// A batch request keeps the orders and category it was built with.
    #[test]
    fn test_batch_order_request() {
        let limit_buy = BybitOrderRequest {
            symbol: "BTCUSDT".into(),
            side: OrderSide::Buy,
            order_type: OrderType::Limit,
            quantity: Decimal::from(1),
            price: Some(Decimal::from(50000)),
            time_in_force: TimeInForce::GTC,
            category: BybitCategory::Spot,
            order_link_id: Some("test_1".into()),
            reduce_only: None,
            close_on_trigger: None,
            position_idx: None,
            take_profit: None,
            stop_loss: None,
            tp_trigger_by: None,
            sl_trigger_by: None,
            tp_limit_price: None,
            sl_limit_price: None,
        };
        let market_sell = BybitOrderRequest {
            symbol: "ETHUSDT".into(),
            side: OrderSide::Sell,
            order_type: OrderType::Market,
            quantity: Decimal::from(2),
            price: None,
            time_in_force: TimeInForce::IOC,
            category: BybitCategory::Spot,
            order_link_id: Some("test_2".into()),
            reduce_only: Some(false),
            close_on_trigger: Some(false),
            position_idx: Some(0),
            take_profit: None,
            stop_loss: None,
            tp_trigger_by: None,
            sl_trigger_by: None,
            tp_limit_price: None,
            sl_limit_price: None,
        };

        let batch = BybitBatchOrderRequest {
            category: BybitCategory::Spot,
            orders: vec![limit_buy, market_sell],
        };

        assert_eq!(batch.orders.len(), 2);
        assert_eq!(batch.category, BybitCategory::Spot);
    }
}