1use anyhow::{Result, anyhow, bail};
2use async_trait::async_trait;
3use flume::Sender;
4use rust_decimal::Decimal;
5use simd_json::prelude::*;
6use simd_json::{json, value::owned::Value as JsonValue};
7use std::collections::VecDeque;
8use std::sync::Arc;
9use std::sync::atomic::{AtomicBool, Ordering};
10use std::time::Duration;
11
12use futures::{SinkExt, StreamExt};
13use log::{debug, error, info, warn};
14use parking_lot::RwLock;
15use quanta::Clock;
16use rusty_common::collections::FxHashMap;
17use rusty_common::websocket::Message;
18use rusty_common::websocket::connector::{WebSocketSink, WebSocketStream};
19use rusty_common::{SmartString, time};
20use rusty_model::{
21 enums::{OrderSide, OrderStatus, OrderType, TimeInForce},
22 instruments::InstrumentId,
23 trading_order::Order,
24 venues::Venue,
25};
26use smallvec::SmallVec;
27use tokio::sync::RwLock as AsyncRwLock;
28use tokio::task::JoinHandle;
29use tokio::time::interval;
30use uuid::Uuid;
31
32use crate::exchanges::bybit_rest::{BybitAccountInfo, BybitAccountType, BybitRestClient};
34use crate::execution_engine::{Exchange, ExecutionReport};
35use rusty_common::auth::exchanges::bybit::BybitAuth;
36
/// Mainnet WebSocket endpoint used for order entry (`/v5/trade`).
const BYBIT_WS_TRADE_URL: &str = "wss://stream.bybit.com/v5/trade";
/// Testnet counterpart of [`BYBIT_WS_TRADE_URL`].
const BYBIT_WS_TRADE_TESTNET_URL: &str = "wss://stream-testnet.bybit.com/v5/trade";
/// Mainnet WebSocket endpoint for the private stream (`/v5/private`).
const BYBIT_WS_PRIVATE_URL: &str = "wss://stream.bybit.com/v5/private";
/// Testnet counterpart of [`BYBIT_WS_PRIVATE_URL`].
const BYBIT_WS_PRIVATE_TESTNET_URL: &str = "wss://stream-testnet.bybit.com/v5/private";

/// Period, in seconds, of the keep-alive ping loop.
const PING_INTERVAL_SECONDS: u64 = 30;
/// Seconds after which an unanswered ping is treated as a dead connection.
const PONG_TIMEOUT_SECONDS: u64 = 10;
/// 10 MiB. NOTE(review): not referenced in the visible part of this file;
/// presumably an upper bound on inbound message size - confirm.
const MAX_MESSAGE_SIZE: usize = 10 * 1024 * 1024;
/// NOTE(review): not referenced in the visible part of this file; presumably
/// caps the number of in-flight entries in `pending_requests` - confirm.
const MAX_PENDING_REQUESTS: usize = 10000;
/// NOTE(review): not referenced in the visible part of this file; presumably
/// bounds `ConnectionHealth::reconnection_attempts` - confirm.
const MAX_RECONNECTION_ATTEMPTS: u32 = 10;
69
/// Maximum number of requests the rate limiter admits per window.
const MAX_REQUESTS_PER_SECOND: u32 = 3000;
/// Width of the rate-limit window, in milliseconds.
const RATE_LIMIT_WINDOW_MS: u64 = 1_000;

/// Bybit V5 product category an instrument belongs to.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BybitCategory {
    /// `"spot"`
    Spot,
    /// `"linear"`
    Linear,
    /// `"inverse"`
    Inverse,
    /// `"option"`
    Option,
}

impl BybitCategory {
    /// Parses the lowercase wire name of a category.
    ///
    /// Returns `None` for any string that is not exactly one of the names
    /// produced by [`Self::as_str`] (the comparison is case-sensitive).
    #[must_use]
    pub fn parse_category(s: &str) -> Option<Self> {
        // Derive the parser from `as_str` so the two directions cannot drift apart.
        [Self::Spot, Self::Linear, Self::Inverse, Self::Option]
            .into_iter()
            .find(|candidate| candidate.as_str() == s)
    }

    /// Returns the lowercase wire name of the category.
    #[must_use]
    pub const fn as_str(&self) -> &'static str {
        match *self {
            Self::Option => "option",
            Self::Inverse => "inverse",
            Self::Linear => "linear",
            Self::Spot => "spot",
        }
    }

    /// Returns the largest number of orders allowed in one batch request:
    /// 10 for spot, 20 for every other category.
    #[must_use]
    pub const fn max_batch_size(&self) -> usize {
        if matches!(self, Self::Spot) { 10 } else { 20 }
    }
}
122
123#[derive(Debug, Clone)]
125pub struct InstrumentInfo {
126 pub symbol: SmartString,
128 pub category: BybitCategory,
130 pub status: SmartString,
132 pub base_coin: SmartString,
134 pub quote_coin: SmartString,
136 pub contract_type: Option<SmartString>,
138 pub cached_at: u64, }
141
142impl InstrumentInfo {
143 #[must_use]
145 pub fn new(
146 symbol: SmartString,
147 category: BybitCategory,
148 status: SmartString,
149 base_coin: SmartString,
150 quote_coin: SmartString,
151 contract_type: Option<SmartString>,
152 ) -> Self {
153 Self {
154 symbol,
155 category,
156 status,
157 base_coin,
158 quote_coin,
159 contract_type,
160 cached_at: rusty_common::time::get_epoch_timestamp_ns(),
161 }
162 }
163
164 #[must_use]
166 pub fn is_expired(&self) -> bool {
167 let now = rusty_common::time::get_epoch_timestamp_ns();
168 let one_hour_ns = 3600 * 1_000_000_000;
169 now.saturating_sub(self.cached_at) > one_hour_ns
170 }
171}
172
// Per-category batch limits. The values equal those returned by
// `BybitCategory::max_batch_size`.
// NOTE(review): none of these are referenced in the visible part of the file.
/// Maximum orders per batch request for spot.
const MAX_BATCH_SPOT: usize = 10;
/// Maximum orders per batch request for linear contracts.
const MAX_BATCH_LINEAR: usize = 20;
/// Maximum orders per batch request for inverse contracts.
const MAX_BATCH_INVERSE: usize = 20;
/// Maximum orders per batch request for options.
const MAX_BATCH_OPTION: usize = 20;

// Category wire names. The values equal those returned by
// `BybitCategory::as_str`.
/// Wire name of the spot category.
const CATEGORY_SPOT: &str = "spot";
/// Wire name of the linear category.
const CATEGORY_LINEAR: &str = "linear";
/// Wire name of the inverse category.
const CATEGORY_INVERSE: &str = "inverse";
/// Wire name of the option category.
const CATEGORY_OPTION: &str = "option";
184
/// Snapshot of heartbeat and traffic counters for the WebSocket connection.
#[derive(Debug, Clone)]
pub struct ConnectionHealth {
    /// Set to `true` when a pong is processed.
    pub is_healthy: bool,
    /// Clock reading stored when the most recent ping was sent (0 = never).
    pub last_ping_sent: u64,
    /// Clock reading stored when the most recent pong was processed (0 = never).
    pub last_pong_received: u64,
    /// Pings sent that have not yet been matched by a pong.
    pub pending_pings: u32,
    /// Reconnection attempt counter.
    pub reconnection_attempts: u32,
    /// Number of messages written to the socket.
    pub messages_sent: u64,
    /// Number of text messages read from the sockets.
    pub messages_received: u64,
}

impl ConnectionHealth {
    /// State of a connection that has never been established: unhealthy,
    /// with every counter and timestamp at zero.
    const INITIAL: Self = Self {
        is_healthy: false,
        last_ping_sent: 0,
        last_pong_received: 0,
        pending_pings: 0,
        reconnection_attempts: 0,
        messages_sent: 0,
        messages_received: 0,
    };

    /// Returns the all-zero, not-healthy starting state.
    const fn new() -> Self {
        Self::INITIAL
    }
}
217
218struct RateLimiter {
220 request_times: VecDeque<u64>,
221 clock: Clock,
222}
223
224impl RateLimiter {
225 const fn new(clock: Clock) -> Self {
226 Self {
227 request_times: VecDeque::new(),
228 clock,
229 }
230 }
231
232 fn cleanup_expired(&mut self) {
233 let now = self.clock.raw() / 1_000_000; let window_start = now.saturating_sub(RATE_LIMIT_WINDOW_MS);
235
236 while let Some(&front) = self.request_times.front() {
237 if front < window_start {
238 self.request_times.pop_front();
239 } else {
240 break;
241 }
242 }
243 }
244
245 fn can_send_request(&mut self) -> bool {
246 self.cleanup_expired();
247 self.request_times.len() < MAX_REQUESTS_PER_SECOND as usize
248 }
249
250 fn record_request(&mut self) {
251 let now = self.clock.raw() / 1_000_000;
252 self.request_times.push_back(now);
253 }
254}
255
/// Bookkeeping for a request that has been sent and is awaiting its response.
///
/// Entries are stored in `BybitWebsocketTrading::pending_requests` keyed by
/// request id and removed by the response handlers.
struct PendingRequest {
    /// NOTE(review): presumably the operation name (e.g. `order.create`);
    /// the code that fills it is not visible here - confirm.
    op_type: SmartString,
    /// NOTE(review): presumably the time the request was sent; unit and
    /// source are not visible here - confirm.
    timestamp: u64,
    /// Channel on which the response handlers deliver the resulting report.
    report_sender: Sender<ExecutionReport>,
}
262
/// Bybit V5 WebSocket trading client.
///
/// Owns two connections - the trade (order entry) socket and the private
/// socket subscribed to the `order` topic - plus the background tasks that
/// keep them alive and dispatch their messages.
pub struct BybitWebsocketTrading {
    /// Credentials used to sign the WebSocket `auth` message.
    auth: Arc<BybitAuth>,

    /// Optional REST client; when absent, account-info lookups fail and
    /// account validation is skipped.
    rest_client: Option<Arc<BybitRestClient>>,

    /// Write half of the trade connection (`None` until connected).
    trade_sink: Arc<AsyncRwLock<Option<WebSocketSink>>>,
    /// Read half of the trade connection.
    trade_stream: Arc<AsyncRwLock<Option<WebSocketStream>>>,
    /// Write half of the private connection.
    private_sink: Arc<AsyncRwLock<Option<WebSocketSink>>>,
    /// Read half of the private connection.
    private_stream: Arc<AsyncRwLock<Option<WebSocketStream>>>,

    /// Account type string supplied at construction.
    account_type: SmartString,
    /// Selects the testnet endpoints when `true`.
    testnet: bool,

    /// Account info fetched over REST, cached after the first successful call.
    account_info: Arc<RwLock<Option<BybitAccountInfo>>>,

    /// Set once both sockets are open; cleared by the background tasks when
    /// the connection is considered dead.
    is_connected: Arc<AtomicBool>,
    /// Set when the trade socket acknowledges authentication.
    is_authenticated: Arc<AtomicBool>,

    /// Time source for heartbeat timestamps.
    clock: Clock,
    /// Heartbeat and traffic counters.
    health: Arc<RwLock<ConnectionHealth>>,
    /// Limiter consulted before every order request.
    rate_limiter: Arc<RwLock<RateLimiter>>,

    /// In-flight requests keyed by request id.
    pending_requests: Arc<RwLock<FxHashMap<SmartString, PendingRequest>>>,

    /// Maps order identifiers (client order id and, once known, exchange
    /// order id) to a symbol.
    order_symbol_map: Arc<RwLock<FxHashMap<SmartString, SmartString>>>,

    /// Cache of instrument descriptions.
    /// NOTE(review): presumably keyed by symbol - confirm.
    instrument_cache: Arc<RwLock<FxHashMap<SmartString, InstrumentInfo>>>,

    /// Handle of the spawned ping task.
    ping_handle: Arc<RwLock<Option<JoinHandle<()>>>>,
    /// Handle of the spawned message-processing task.
    message_handle: Arc<RwLock<Option<JoinHandle<()>>>>,

    /// Channel that receives reports derived from private-stream order updates.
    report_sender: Arc<RwLock<Option<Sender<ExecutionReport>>>>,
}
312
313impl BybitWebsocketTrading {
314 pub fn new(auth: Arc<BybitAuth>, account_type: SmartString, testnet: bool) -> Self {
316 let clock = Clock::new();
317
318 Self {
319 auth,
320 rest_client: None,
321 trade_sink: Arc::new(AsyncRwLock::new(None)),
322 trade_stream: Arc::new(AsyncRwLock::new(None)),
323 private_sink: Arc::new(AsyncRwLock::new(None)),
324 private_stream: Arc::new(AsyncRwLock::new(None)),
325 account_type,
326 testnet,
327 account_info: Arc::new(RwLock::new(None)),
328 is_connected: Arc::new(AtomicBool::new(false)),
329 is_authenticated: Arc::new(AtomicBool::new(false)),
330 clock: clock.clone(),
331 health: Arc::new(RwLock::new(ConnectionHealth::new())),
332 rate_limiter: Arc::new(RwLock::new(RateLimiter::new(clock))),
333 pending_requests: Arc::new(RwLock::new(FxHashMap::default())),
334 order_symbol_map: Arc::new(RwLock::new(FxHashMap::default())),
335 instrument_cache: Arc::new(RwLock::new(FxHashMap::default())),
336 ping_handle: Arc::new(RwLock::new(None)),
337 message_handle: Arc::new(RwLock::new(None)),
338 report_sender: Arc::new(RwLock::new(None)),
339 }
340 }
341
342 #[must_use]
344 pub fn get_connection_health(&self) -> ConnectionHealth {
345 self.health.read().clone()
346 }
347
348 pub fn set_rest_client(&mut self, rest_client: Arc<BybitRestClient>) {
350 self.rest_client = Some(rest_client);
351 }
352
353 pub async fn get_account_info(&self) -> Result<BybitAccountInfo> {
355 if let Some(cached_info) = self.account_info.read().as_ref() {
357 return Ok(cached_info.clone());
358 }
359
360 let Some(rest_client) = &self.rest_client else {
362 bail!("REST client not available - cannot fetch account info");
363 };
364
365 let account_info = rest_client
366 .get_account_info()
367 .await
368 .map_err(|e| anyhow!("Failed to get account info: {e}"))?;
369
370 *self.account_info.write() = Some(account_info.clone());
372
373 Ok(account_info)
374 }
375
376 pub async fn supports_unified_features(&self) -> Result<bool> {
378 let account_info = self.get_account_info().await?;
379 Ok(account_info.supports_unified_features())
380 }
381
382 pub async fn supports_uta2_features(&self) -> Result<bool> {
384 let account_info = self.get_account_info().await?;
385 Ok(account_info.supports_uta2_features())
386 }
387
388 async fn validate_order_for_account(&self, order: &Order, category: &str) -> Result<()> {
390 if self.rest_client.is_none() {
392 debug!("REST client not available - skipping V5 account validation");
393 return Ok(());
394 }
395
396 let account_info = match self.get_account_info().await {
397 Ok(info) => info,
398 Err(e) => {
399 warn!("Failed to get account info for validation: {e} - proceeding with order");
400 return Ok(());
401 }
402 };
403
404 let account_type = account_info.get_account_type();
405
406 if let Some(account_type) = account_type {
407 if account_type.supports_uta2_features() && category == "inverse" {
409 debug!("UTA 2.0 account: Using one-way mode for inverse futures");
412 }
413
414 if account_type == BybitAccountType::Classic {
416 if category == "spot" {
418 debug!("Classic account spot trading via WebSocket");
419 }
420 }
421
422 debug!("V5 Account validation passed for account type: {account_type:?}");
423 }
424
425 Ok(())
426 }
427
428 fn map_v5_order_status(status_str: &str) -> OrderStatus {
431 match status_str {
432 "New" => OrderStatus::New,
434 "PartiallyFilled" => OrderStatus::PartiallyFilled,
435 "Filled" => OrderStatus::Filled,
436 "Cancelled" => OrderStatus::Cancelled,
437 "Rejected" => OrderStatus::Rejected,
438
439 "PendingCancel" => OrderStatus::PendingCancel,
441 "Untriggered" => OrderStatus::Pending, "Triggered" => OrderStatus::New, "Deactivated" => OrderStatus::Cancelled, "Active" => OrderStatus::Open, "PENDING" => OrderStatus::Pending, "OPEN" => OrderStatus::Open, "PARTIALLY_FILLED" => OrderStatus::PartiallyFilled, "ORDER_FILLED" => OrderStatus::Filled, "CANCELED" => OrderStatus::Cancelled, "ORDER_FAILED" => OrderStatus::Rejected, "Created" => OrderStatus::New, "Expired" => OrderStatus::Expired, "Unknown" => OrderStatus::Unknown,
460 "" => OrderStatus::Unknown, _ => {
464 warn!("Unknown Bybit V5 order status: '{status_str}' - mapping to Unknown");
465 OrderStatus::Unknown
466 }
467 }
468 }
469
470 const fn map_order_status_to_exec_type(status: OrderStatus) -> &'static str {
472 match status {
473 OrderStatus::New => "New",
474 OrderStatus::Open => "New", OrderStatus::PartiallyFilled => "Trade",
476 OrderStatus::Filled => "Trade",
477 OrderStatus::Cancelled => "Canceled",
478 OrderStatus::Rejected => "Rejected",
479 OrderStatus::Expired => "Expired",
480 OrderStatus::Pending => "PendingNew",
481 OrderStatus::PendingCancel => "PendingCancel",
482 OrderStatus::Unknown => "Unknown",
483 }
484 }
485
486 pub async fn send_ping(&self) -> Result<()> {
488 let req_id = Uuid::new_v4().to_string();
489 let ping_msg = json!({
490 "req_id": req_id,
491 "op": "ping"
492 })
493 .to_string();
494
495 if let Some(sink) = &mut *self.trade_sink.write().await {
496 sink.send(Message::text(ping_msg).to_frame_view()).await?;
497
498 let mut health = self.health.write();
499 health.last_ping_sent = self.clock.raw();
500 health.pending_pings += 1;
501 health.messages_sent += 1;
502 } else {
503 bail!("WebSocket trade connection not established");
504 }
505
506 Ok(())
507 }
508
509 async fn authenticate(&self) -> Result<()> {
511 let expires = time::get_timestamp_ms() + 5000;
512 let signature = self.auth.generate_ws_signature(expires)?;
513
514 let auth_msg = json!({
515 "op": "auth",
516 "args": [
517 self.auth.api_key(),
518 expires,
519 signature
520 ]
521 })
522 .to_string();
523
524 if let Some(sink) = &mut *self.trade_sink.write().await {
526 sink.send(Message::text(auth_msg.clone()).to_frame_view())
527 .await?;
528 debug!("Sent auth message to trade connection");
529 }
530
531 if let Some(sink) = &mut *self.private_sink.write().await {
533 sink.send(Message::text(auth_msg).to_frame_view()).await?;
534 debug!("Sent auth message to private connection");
535
536 let subscribe_msg = json!({
538 "op": "subscribe",
539 "args": ["order"]
540 })
541 .to_string();
542
543 sink.send(Message::text(subscribe_msg).to_frame_view())
544 .await?;
545 debug!("Subscribed to order stream");
546 }
547
548 Ok(())
549 }
550
551 async fn connect_internal(&self, report_sender: Sender<ExecutionReport>) -> Result<()> {
553 *self.report_sender.write() = Some(report_sender.clone());
555
556 let trade_url = if self.testnet {
557 BYBIT_WS_TRADE_TESTNET_URL
558 } else {
559 BYBIT_WS_TRADE_URL
560 };
561
562 let private_url = if self.testnet {
563 BYBIT_WS_PRIVATE_TESTNET_URL
564 } else {
565 BYBIT_WS_PRIVATE_URL
566 };
567
568 use yawc::WebSocket;
570 let trade_parsed_url = trade_url
571 .parse()
572 .map_err(|e| anyhow::anyhow!("Invalid trade URL: {}", e))?;
573 let trade_ws = WebSocket::connect(trade_parsed_url)
574 .await
575 .map_err(|e| anyhow::anyhow!("Trade connection failed: {}", e))?;
576 let (trade_sink, trade_stream) = trade_ws.split();
577 *self.trade_sink.write().await = Some(trade_sink);
578 *self.trade_stream.write().await = Some(trade_stream);
579
580 let private_parsed_url = private_url
582 .parse()
583 .map_err(|e| anyhow::anyhow!("Invalid private URL: {}", e))?;
584 let private_ws = WebSocket::connect(private_parsed_url)
585 .await
586 .map_err(|e| anyhow::anyhow!("Private connection failed: {}", e))?;
587 let (private_sink, private_stream) = private_ws.split();
588 *self.private_sink.write().await = Some(private_sink);
589 *self.private_stream.write().await = Some(private_stream);
590
591 self.is_connected.store(true, Ordering::Relaxed);
592
593 self.authenticate().await?;
595
596 self.start_ping_task();
598
599 self.start_message_processing();
601
602 info!("Bybit WebSocket trading connected and authenticated");
603 Ok(())
604 }
605
606 fn start_ping_task(&self) {
608 let trade_sink = self.trade_sink.clone();
609 let health = self.health.clone();
610 let clock = self.clock.clone();
611 let is_connected = self.is_connected.clone();
612
613 let handle = tokio::spawn(async move {
614 let mut interval = interval(Duration::from_secs(PING_INTERVAL_SECONDS));
615
616 loop {
617 interval.tick().await;
618
619 if !is_connected.load(Ordering::Relaxed) {
620 break;
621 }
622
623 let should_send = {
625 let h = health.read();
626 let now = clock.raw();
627 let time_since_last_ping = (now - h.last_ping_sent) / 1_000_000_000; time_since_last_ping >= PING_INTERVAL_SECONDS && h.pending_pings < 3
630 };
631
632 if should_send {
633 let req_id = Uuid::new_v4().to_string();
634 let ping_msg = json!({
635 "req_id": req_id,
636 "op": "ping"
637 })
638 .to_string();
639
640 if let Some(sink) = &mut *trade_sink.write().await {
641 if let Err(e) = sink.send(Message::text(ping_msg).to_frame_view()).await {
642 error!("Failed to send ping: {e}");
643 is_connected.store(false, Ordering::Relaxed);
644 break;
645 }
646
647 let mut h = health.write();
648 h.last_ping_sent = clock.raw();
649 h.pending_pings += 1;
650 h.messages_sent += 1;
651 }
652 }
653
654 {
656 let h = health.read();
657 let now = clock.raw();
658 let time_since_last_pong = (now - h.last_pong_received) / 1_000_000_000;
659
660 if h.pending_pings > 0 && time_since_last_pong > PONG_TIMEOUT_SECONDS {
661 warn!("Pong timeout detected, connection may be dead");
662 is_connected.store(false, Ordering::Relaxed);
663 break;
664 }
665 }
666 }
667
668 debug!("Ping task terminated");
669 });
670
671 *self.ping_handle.write() = Some(handle);
672 }
673
674 fn start_message_processing(&self) {
676 let trade_stream = self.trade_stream.clone();
677 let private_stream = self.private_stream.clone();
678 let health = self.health.clone();
679 let clock = self.clock.clone();
680 let is_authenticated = self.is_authenticated.clone();
681 let is_connected = self.is_connected.clone();
682 let pending_requests = self.pending_requests.clone();
683 let report_sender = self.report_sender.clone();
684 let order_symbol_map = self.order_symbol_map.clone();
685
686 let handle = tokio::spawn(async move {
687 loop {
688 if !is_connected.load(Ordering::Relaxed) {
689 break;
690 }
691
692 if let Some(stream) = &mut *trade_stream.write().await {
694 if let Some(frame) = stream.next().await {
695 let msg = Message::from_frame_view(frame);
696 match msg {
697 Message::Text(text) => {
698 health.write().messages_received += 1;
699
700 if let Err(e) = Self::process_trade_message(
701 &text,
702 &health,
703 &clock,
704 &is_authenticated,
705 &pending_requests,
706 &report_sender,
707 &order_symbol_map,
708 )
709 .await
710 {
711 error!("Failed to process trade message: {e}");
712 }
713 }
714 Message::Binary(_) => {
715 warn!("Received unexpected binary message");
716 }
717 _ => {}
718 }
719 } else {
720 warn!("Trade stream closed");
721 is_connected.store(false, Ordering::Relaxed);
722 break;
723 }
724 }
725
726 if let Some(stream) = &mut *private_stream.write().await {
728 match stream.next().await {
729 Some(frame) => {
730 let msg = Message::from_frame_view(frame);
731 match msg {
732 Message::Text(text) => {
733 health.write().messages_received += 1;
734
735 if let Err(e) =
736 Self::process_private_message(&text, &report_sender).await
737 {
738 error!("Failed to process private message: {e}");
739 }
740 }
741 Message::Binary(_) => {
742 warn!("Received unexpected binary message on private stream");
743 }
744 _ => {}
745 }
746 }
747 None => {
748 warn!("Private stream closed");
749 }
750 }
751 }
752 }
753
754 debug!("Message processing task terminated");
755 });
756
757 *self.message_handle.write() = Some(handle);
758 }
759
760 async fn process_trade_message(
762 text: &str,
763 health: &Arc<RwLock<ConnectionHealth>>,
764 clock: &Clock,
765 is_authenticated: &Arc<AtomicBool>,
766 pending_requests: &Arc<RwLock<FxHashMap<SmartString, PendingRequest>>>,
767 report_sender: &Arc<RwLock<Option<Sender<ExecutionReport>>>>,
768 order_symbol_map: &Arc<RwLock<FxHashMap<SmartString, SmartString>>>,
769 ) -> Result<()> {
770 let mut text_bytes = text.as_bytes().to_vec();
771 let msg = simd_json::from_slice::<JsonValue>(&mut text_bytes)?;
772
773 if let Some(op) = msg.get("op").and_then(|v| v.as_str()) {
774 match op {
775 "auth" => {
776 let ret_code = msg
777 .get("retCode")
778 .and_then(simd_json::prelude::ValueAsScalar::as_i64)
779 .unwrap_or(-1);
780
781 if ret_code == 0 {
782 is_authenticated.store(true, Ordering::Relaxed);
783 info!("WebSocket authentication successful");
784 } else {
785 let ret_msg = msg
786 .get("retMsg")
787 .and_then(|v| v.as_str())
788 .unwrap_or("Unknown error");
789 error!("WebSocket authentication failed: {ret_msg}");
790 }
791 }
792 "ping" => {
793 if let Some(ret_msg) = msg.get("ret_msg").and_then(|v| v.as_str())
794 && ret_msg == "pong"
795 {
796 let mut h = health.write();
797 h.last_pong_received = clock.raw();
798 h.pending_pings = h.pending_pings.saturating_sub(1);
799 h.is_healthy = true;
800 }
801 }
802 "order.create" | "order.amend" | "order.cancel" => {
803 Self::process_order_response(
804 msg.clone(),
805 op,
806 pending_requests,
807 report_sender,
808 order_symbol_map,
809 )
810 .await?;
811 }
812 "order.create-batch" | "order.amend-batch" | "order.cancel-batch" => {
813 Self::process_batch_order_response(
814 msg.clone(),
815 op,
816 pending_requests,
817 report_sender,
818 order_symbol_map,
819 )
820 .await?;
821 }
822 "order.cancel-all" => {
823 Self::process_cancel_all_response(msg, pending_requests, report_sender).await?;
824 }
825 _ => {
826 debug!("Unhandled op type: {op}");
827 }
828 }
829 }
830
831 Ok(())
832 }
833
834 async fn process_private_message(
836 text: &str,
837 report_sender: &Arc<RwLock<Option<Sender<ExecutionReport>>>>,
838 ) -> Result<()> {
839 let mut text_bytes = text.as_bytes().to_vec();
840 let msg = simd_json::from_slice::<JsonValue>(&mut text_bytes)?;
841
842 if let Some(topic) = msg.get("topic").and_then(|v| v.as_str())
843 && (topic == "order" || topic.starts_with("order."))
844 {
845 Self::process_order_update(msg, report_sender).await?;
846 }
847
848 Ok(())
849 }
850
851 async fn process_order_response(
853 msg: JsonValue,
854 op: &str,
855 pending_requests: &Arc<RwLock<FxHashMap<SmartString, PendingRequest>>>,
856 report_sender: &Arc<RwLock<Option<Sender<ExecutionReport>>>>,
857 order_symbol_map: &Arc<RwLock<FxHashMap<SmartString, SmartString>>>,
858 ) -> Result<()> {
859 let req_id: Option<SmartString> = msg
860 .get("reqId")
861 .and_then(|v| v.as_str())
862 .map(std::convert::Into::into);
863
864 let ret_code = msg
865 .get("retCode")
866 .and_then(simd_json::prelude::ValueAsScalar::as_i64)
867 .unwrap_or(-1);
868
869 if let Some(req_id) = req_id
870 && let Some(pending) = pending_requests.write().remove(&req_id)
871 {
872 if ret_code == 0 {
873 if let Some(data) = msg.get("data") {
875 let order_id = data.get("orderId").and_then(|v| v.as_str()).unwrap_or("");
876 let client_order_id = data
877 .get("orderLinkId")
878 .and_then(|v| v.as_str())
879 .unwrap_or("");
880
881 if !order_id.is_empty()
883 && !client_order_id.is_empty()
884 && let Some(symbol) = order_symbol_map.read().get(client_order_id)
885 {
886 order_symbol_map
887 .write()
888 .insert(order_id.into(), symbol.clone());
889 }
890
891 let report = ExecutionReport {
892 id: Uuid::new_v4().to_string().into(),
893 order_id: order_id.into(),
894 exchange_timestamp: time::get_epoch_timestamp_ns(),
895 system_timestamp: time::get_epoch_timestamp_ns(),
896 instrument_id: InstrumentId::new(
897 "UNKNOWN",
898 rusty_model::venues::Venue::Bybit,
899 ),
900 status: OrderStatus::New,
901 filled_quantity: Decimal::ZERO,
902 remaining_quantity: Decimal::ZERO,
903 execution_price: None,
904 reject_reason: None,
905 exchange_execution_id: None,
906 is_final: false,
907 };
908
909 if let Err(e) = pending.report_sender.send(report) {
910 error!("Failed to send execution report: {e}");
911 }
912 }
913 } else {
914 let ret_msg = msg
916 .get("retMsg")
917 .and_then(|v| v.as_str())
918 .unwrap_or("Unknown error");
919
920 let report = ExecutionReport {
921 id: Uuid::new_v4().to_string().into(),
922 order_id: SmartString::default(),
923 exchange_timestamp: time::get_epoch_timestamp_ns(),
924 system_timestamp: time::get_epoch_timestamp_ns(),
925 instrument_id: InstrumentId::new("UNKNOWN", rusty_model::venues::Venue::Bybit),
926 status: OrderStatus::Rejected,
927 filled_quantity: Decimal::ZERO,
928 remaining_quantity: Decimal::ZERO,
929 execution_price: None,
930 reject_reason: Some(ret_msg.into()),
931 exchange_execution_id: None,
932 is_final: true,
933 };
934
935 if let Err(e) = pending.report_sender.send(report) {
936 error!("Failed to send rejection report: {e}");
937 }
938 }
939 }
940
941 Ok(())
942 }
943
944 async fn process_batch_order_response(
946 msg: JsonValue,
947 op: &str,
948 pending_requests: &Arc<RwLock<FxHashMap<SmartString, PendingRequest>>>,
949 report_sender: &Arc<RwLock<Option<Sender<ExecutionReport>>>>,
950 order_symbol_map: &Arc<RwLock<FxHashMap<SmartString, SmartString>>>,
951 ) -> Result<()> {
952 let req_id: Option<SmartString> = msg
953 .get("reqId")
954 .and_then(|v| v.as_str())
955 .map(std::convert::Into::into);
956
957 let ret_code = msg
958 .get("retCode")
959 .and_then(simd_json::prelude::ValueAsScalar::as_i64)
960 .unwrap_or(-1);
961
962 if let Some(req_id) = req_id
963 && let Some(pending) = pending_requests.write().remove(&req_id)
964 {
965 if ret_code == 0 {
966 if let Some(data) = msg.get("data")
968 && let Some(list) = data.get("list").and_then(|v| v.as_array())
969 {
970 for item in list {
971 let order_id = item.get("orderId").and_then(|v| v.as_str()).unwrap_or("");
972
973 let order_link_id = item
974 .get("orderLinkId")
975 .and_then(|v| v.as_str())
976 .unwrap_or("");
977
978 let report = ExecutionReport {
979 id: Uuid::new_v4().to_string().into(),
980 order_id: order_id.into(),
981 exchange_timestamp: time::get_epoch_timestamp_ns(),
982 system_timestamp: time::get_epoch_timestamp_ns(),
983 instrument_id: InstrumentId::new(
984 "UNKNOWN",
985 rusty_model::venues::Venue::Bybit,
986 ),
987 status: OrderStatus::New,
988 filled_quantity: Decimal::ZERO,
989 remaining_quantity: Decimal::ZERO,
990 execution_price: None,
991 reject_reason: None,
992 exchange_execution_id: Some(order_link_id.into()),
993 is_final: false,
994 };
995
996 if let Err(e) = pending.report_sender.send(report) {
997 error!("Failed to send batch execution report: {e}");
998 }
999 }
1000 }
1001 } else {
1002 let ret_msg = msg
1004 .get("retMsg")
1005 .and_then(|v| v.as_str())
1006 .unwrap_or("Batch order failed");
1007
1008 error!("Batch order failed: {ret_msg}");
1009 }
1010 }
1011
1012 Ok(())
1013 }
1014
1015 async fn process_cancel_all_response(
1017 msg: JsonValue,
1018 pending_requests: &Arc<RwLock<FxHashMap<SmartString, PendingRequest>>>,
1019 report_sender: &Arc<RwLock<Option<Sender<ExecutionReport>>>>,
1020 ) -> Result<()> {
1021 let req_id: Option<SmartString> = msg
1022 .get("reqId")
1023 .and_then(|v| v.as_str())
1024 .map(std::convert::Into::into);
1025
1026 let ret_code = msg
1027 .get("retCode")
1028 .and_then(simd_json::prelude::ValueAsScalar::as_i64)
1029 .unwrap_or(-1);
1030
1031 if let Some(req_id) = req_id
1032 && let Some(pending) = pending_requests.write().remove(&req_id)
1033 {
1034 if ret_code == 0 {
1035 let report = ExecutionReport {
1037 id: Uuid::new_v4().to_string().into(),
1038 order_id: req_id.clone(),
1039 exchange_timestamp: time::get_epoch_timestamp_ns(),
1040 system_timestamp: time::get_epoch_timestamp_ns(),
1041 instrument_id: InstrumentId::new("UNKNOWN", rusty_model::venues::Venue::Bybit),
1042 status: OrderStatus::New,
1043 filled_quantity: Decimal::ZERO,
1044 remaining_quantity: Decimal::ZERO,
1045 execution_price: None,
1046 reject_reason: None,
1047 exchange_execution_id: None,
1048 is_final: false,
1049 };
1050
1051 if let Err(e) = pending.report_sender.send(report) {
1052 error!("Failed to send cancel all report: {e}");
1053 }
1054 } else {
1055 let ret_msg = msg
1056 .get("retMsg")
1057 .and_then(|v| v.as_str())
1058 .unwrap_or("Cancel all failed");
1059
1060 error!("Cancel all failed: {ret_msg}");
1061 }
1062 }
1063
1064 Ok(())
1065 }
1066
1067 async fn process_order_update(
1069 msg: JsonValue,
1070 report_sender: &Arc<RwLock<Option<Sender<ExecutionReport>>>>,
1071 ) -> Result<()> {
1072 if let Some(data) = msg.get("data").and_then(|v| v.as_array()) {
1073 for order_data in data {
1074 let order_id = order_data
1075 .get("orderId")
1076 .and_then(|v| v.as_str())
1077 .unwrap_or("");
1078
1079 let order_link_id = order_data
1080 .get("orderLinkId")
1081 .and_then(|v| v.as_str())
1082 .unwrap_or("");
1083
1084 let symbol = order_data
1085 .get("symbol")
1086 .and_then(|v| v.as_str())
1087 .unwrap_or("");
1088
1089 let side_str = order_data
1090 .get("side")
1091 .and_then(|v| v.as_str())
1092 .unwrap_or("Buy");
1093
1094 let side = match side_str {
1095 "Sell" => OrderSide::Sell,
1096 _ => OrderSide::Buy,
1097 };
1098
1099 let order_status_str = order_data
1100 .get("orderStatus")
1101 .and_then(|v| v.as_str())
1102 .unwrap_or("New");
1103
1104 let order_status = Self::map_v5_order_status(order_status_str);
1105
1106 let qty = order_data
1107 .get("qty")
1108 .and_then(|v| v.as_str())
1109 .and_then(|s| Decimal::from_str_exact(s).ok())
1110 .unwrap_or(Decimal::ZERO);
1111
1112 let price = order_data
1113 .get("price")
1114 .and_then(|v| v.as_str())
1115 .and_then(|s| Decimal::from_str_exact(s).ok())
1116 .unwrap_or(Decimal::ZERO);
1117
1118 let cum_exec_qty = order_data
1119 .get("cumExecQty")
1120 .and_then(|v| v.as_str())
1121 .and_then(|s| Decimal::from_str_exact(s).ok())
1122 .unwrap_or(Decimal::ZERO);
1123
1124 let avg_price = order_data
1125 .get("avgPrice")
1126 .and_then(|v| v.as_str())
1127 .and_then(|s| Decimal::from_str_exact(s).ok())
1128 .unwrap_or(Decimal::ZERO);
1129
1130 let leaves_qty = order_data
1131 .get("leavesQty")
1132 .and_then(|v| v.as_str())
1133 .and_then(|s| Decimal::from_str_exact(s).ok())
1134 .unwrap_or(qty - cum_exec_qty);
1135
1136 let exec_type = Self::map_order_status_to_exec_type(order_status);
1137
1138 let report = ExecutionReport {
1139 id: Uuid::new_v4().to_string().into(),
1140 order_id: order_id.into(),
1141 exchange_timestamp: time::get_epoch_timestamp_ns(),
1142 system_timestamp: time::get_epoch_timestamp_ns(),
1143 instrument_id: InstrumentId::new("UNKNOWN", rusty_model::venues::Venue::Bybit),
1144 status: order_status,
1145 filled_quantity: Decimal::ZERO,
1146 remaining_quantity: Decimal::ZERO,
1147 execution_price: None,
1148 reject_reason: None,
1149 exchange_execution_id: Some(order_link_id.into()),
1150 is_final: false,
1151 };
1152
1153 if let Some(sender) = &*report_sender.read()
1154 && let Err(e) = sender.send(report)
1155 {
1156 error!("Failed to send order update report: {e}");
1157 }
1158 }
1159 }
1160
1161 Ok(())
1162 }
1163
1164 async fn place_order_internal(&self, order: &Order) -> Result<SmartString> {
1166 if !self.rate_limiter.write().can_send_request() {
1168 bail!("Rate limit exceeded");
1169 }
1170
1171 let req_id = Uuid::new_v4().to_string();
1172 let timestamp = time::get_timestamp_ms();
1173
1174 let category = self.get_category_from_symbol(&order.symbol);
1176
1177 let mut order_args = json!({
1179 "symbol": order.symbol.as_str(),
1180 "side": match order.side {
1181 OrderSide::Buy => "Buy",
1182 OrderSide::Sell => "Sell",
1183 },
1184 "orderType": match order.order_type {
1185 OrderType::Market => "Market",
1186 OrderType::Limit => "Limit",
1187 OrderType::PostOnly => "Limit", _ => "Limit",
1189 },
1190 "qty": order.quantity.to_string(),
1191 "category": category,
1192 "timeInForce": match order.time_in_force {
1193 TimeInForce::GTC => "GTC",
1194 TimeInForce::IOC => "IOC",
1195 TimeInForce::FOK => "FOK",
1196 TimeInForce::GTX => "PostOnly", TimeInForce::GTD => {
1198 warn!("GTD (Good Till Date) not supported by Bybit, using GTC instead for order {}", order.id);
1199 "GTC" },
1201 }
1202 });
1203
1204 if category == "linear" || category == "inverse" {
1206 order_args["positionIdx"] = json!(0); }
1208
1209 if order.order_type == OrderType::Limit
1211 && let Some(price) = order.price
1212 {
1213 order_args["price"] = json!(price.to_string());
1214 }
1215
1216 order_args["orderLinkId"] = json!(order.id.to_string());
1219
1220 if let Some(stop_price) = order.stop_price
1222 && stop_price > Decimal::ZERO
1223 {
1224 order_args["stopLoss"] = json!(stop_price.to_string());
1225 order_args["slTriggerBy"] = json!("LastPrice");
1226 }
1227
1228 let request = json!({
1233 "reqId": &req_id,
1234 "header": {
1235 "X-BAPI-TIMESTAMP": timestamp.to_string(),
1236 "X-BAPI-RECV-WINDOW": "5000"
1237 },
1238 "op": "order.create",
1239 "args": [order_args]
1240 })
1241 .to_string();
1242
1243 if let Some(sink) = &mut *self.trade_sink.write().await {
1245 sink.send(Message::text(request).to_frame_view()).await?;
1246 self.rate_limiter.write().record_request();
1247 } else {
1248 bail!("WebSocket trade connection not established");
1249 }
1250
1251 Ok(req_id.into())
1252 }
1253
1254 async fn amend_order_internal(
1256 &self,
1257 order_id: &str,
1258 symbol: &str,
1259 new_price: Option<Decimal>,
1260 new_quantity: Option<Decimal>,
1261 ) -> Result<SmartString> {
1262 if !self.rate_limiter.write().can_send_request() {
1264 bail!("Rate limit exceeded");
1265 }
1266
1267 let req_id = Uuid::new_v4().to_string();
1268 let timestamp = time::get_timestamp_ms();
1269
1270 let category = self.get_category_from_symbol(symbol);
1272
1273 let mut amend_args = json!({
1275 "orderId": order_id,
1276 "symbol": symbol,
1277 "category": category,
1278 });
1279
1280 if let Some(price) = new_price {
1282 amend_args["price"] = json!(price.to_string());
1283 }
1284
1285 if let Some(qty) = new_quantity {
1287 amend_args["qty"] = json!(qty.to_string());
1288 }
1289
1290 let request = json!({
1292 "reqId": &req_id,
1293 "header": {
1294 "X-BAPI-TIMESTAMP": timestamp.to_string(),
1295 "X-BAPI-RECV-WINDOW": "5000"
1296 },
1297 "op": "order.amend",
1298 "args": [amend_args]
1299 })
1300 .to_string();
1301
1302 if let Some(sink) = &mut *self.trade_sink.write().await {
1304 sink.send(Message::text(request).to_frame_view()).await?;
1305 self.rate_limiter.write().record_request();
1306 } else {
1307 bail!("WebSocket trade connection not established");
1308 }
1309
1310 Ok(req_id.into())
1311 }
1312
1313 async fn cancel_order_internal(&self, order_id: &str, symbol: &str) -> Result<SmartString> {
1315 if !self.rate_limiter.write().can_send_request() {
1317 bail!("Rate limit exceeded");
1318 }
1319
1320 let req_id = Uuid::new_v4().to_string();
1321 let timestamp = time::get_timestamp_ms();
1322
1323 let category = self.get_category_from_symbol(symbol);
1325
1326 let cancel_args = json!({
1328 "orderId": order_id,
1329 "symbol": symbol,
1330 "category": category,
1331 });
1332
1333 let request = json!({
1335 "reqId": &req_id,
1336 "header": {
1337 "X-BAPI-TIMESTAMP": timestamp.to_string(),
1338 "X-BAPI-RECV-WINDOW": "5000"
1339 },
1340 "op": "order.cancel",
1341 "args": [cancel_args]
1342 })
1343 .to_string();
1344
1345 if let Some(sink) = &mut *self.trade_sink.write().await {
1347 sink.send(Message::text(request).to_frame_view()).await?;
1348 self.rate_limiter.write().record_request();
1349 } else {
1350 bail!("WebSocket trade connection not established");
1351 }
1352
1353 Ok(req_id.into())
1354 }
1355
1356 async fn cancel_all_orders_internal(
1358 &self,
1359 category: &str,
1360 symbol: Option<&str>,
1361 ) -> Result<SmartString> {
1362 if !self.rate_limiter.write().can_send_request() {
1364 bail!("Rate limit exceeded");
1365 }
1366
1367 let req_id = Uuid::new_v4().to_string();
1368 let timestamp = time::get_timestamp_ms();
1369
1370 let mut cancel_args = json!({
1372 "category": category,
1373 });
1374
1375 if let Some(sym) = symbol {
1377 cancel_args["symbol"] = json!(sym);
1378 }
1379
1380 let request = json!({
1382 "reqId": &req_id,
1383 "header": {
1384 "X-BAPI-TIMESTAMP": timestamp.to_string(),
1385 "X-BAPI-RECV-WINDOW": "5000"
1386 },
1387 "op": "order.cancel-all",
1388 "args": [cancel_args]
1389 })
1390 .to_string();
1391
1392 if let Some(sink) = &mut *self.trade_sink.write().await {
1394 sink.send(Message::text(request).to_frame_view()).await?;
1395 self.rate_limiter.write().record_request();
1396 } else {
1397 bail!("WebSocket trade connection not established");
1398 }
1399
1400 Ok(req_id.into())
1401 }
1402
1403 fn get_category_from_symbol(&self, symbol: &str) -> &'static str {
1405 if let Some(info) = self.instrument_cache.read().get(symbol)
1407 && !info.is_expired()
1408 {
1409 return info.category.as_str();
1410 }
1411
1412 let fallback_category = self.get_category_from_symbol_heuristic(symbol);
1414
1415 let symbol_owned = symbol.to_owned();
1417 let cache = self.instrument_cache.clone();
1418 let auth = self.auth.clone();
1419 let testnet = self.testnet;
1420 tokio::spawn(async move {
1421 if let Err(e) =
1422 Self::refresh_instrument_info_for_symbol(&cache, &auth, &symbol_owned, testnet)
1423 .await
1424 {
1425 debug!("Failed to refresh instrument info for {symbol_owned}: {e}");
1426 }
1427 });
1428
1429 fallback_category
1430 }
1431
1432 fn get_category_from_symbol_heuristic(&self, symbol: &str) -> &'static str {
1434 if symbol.ends_with("USDT") || symbol.ends_with("USDC") {
1435 if symbol.contains('-') {
1436 CATEGORY_OPTION
1437 } else {
1438 CATEGORY_LINEAR
1439 }
1440 } else if symbol.ends_with("USD") {
1441 CATEGORY_INVERSE
1442 } else {
1443 CATEGORY_SPOT
1444 }
1445 }
1446
1447 async fn refresh_instrument_info_for_symbol(
1449 cache: &Arc<RwLock<FxHashMap<SmartString, InstrumentInfo>>>,
1450 auth: &BybitAuth,
1451 symbol: &str,
1452 testnet: bool,
1453 ) -> Result<()> {
1454 let categories = ["spot", "linear", "inverse", "option"];
1456
1457 for category in &categories {
1458 if let Ok(info) = Self::fetch_instrument_info(auth, symbol, category, testnet).await {
1459 cache.write().insert(symbol.into(), info);
1460 debug!("Cached instrument info for symbol: {symbol} in category: {category}");
1461 return Ok(());
1462 }
1463 }
1464
1465 let fallback_category = if symbol.ends_with("USDT") || symbol.ends_with("USDC") {
1467 if symbol.contains('-') {
1468 BybitCategory::Option
1469 } else {
1470 BybitCategory::Linear
1471 }
1472 } else if symbol.ends_with("USD") {
1473 BybitCategory::Inverse
1474 } else {
1475 BybitCategory::Spot
1476 };
1477
1478 let info = InstrumentInfo::new(
1479 symbol.into(),
1480 fallback_category,
1481 "Unknown".into(),
1482 "Unknown".into(),
1483 "Unknown".into(),
1484 None,
1485 );
1486
1487 cache.write().insert(symbol.into(), info);
1488 warn!(
1489 "Symbol {} not found via API, cached with heuristic category: {}",
1490 symbol,
1491 fallback_category.as_str()
1492 );
1493
1494 Ok(())
1495 }
1496
1497 async fn fetch_instrument_info(
1499 auth: &BybitAuth,
1500 symbol: &str,
1501 category: &str,
1502 testnet: bool,
1503 ) -> Result<InstrumentInfo> {
1504 let base_url = if testnet {
1505 "https://api-testnet.bybit.com"
1506 } else {
1507 "https://api.bybit.com"
1508 };
1509
1510 let url =
1511 format!("{base_url}/v5/market/instruments-info?category={category}&symbol={symbol}");
1512
1513 let client = reqwest::Client::builder()
1515 .timeout(std::time::Duration::from_secs(10))
1516 .build()?;
1517
1518 let response = client
1519 .get(&url)
1520 .header("Content-Type", "application/json")
1521 .send()
1522 .await?;
1523
1524 if !response.status().is_success() {
1525 bail!("HTTP request failed: {}", response.status());
1526 }
1527
1528 let text = response.text().await?;
1529 let mut text_bytes = text.as_bytes().to_vec();
1530 let data = simd_json::from_slice::<JsonValue>(&mut text_bytes)?;
1531
1532 let ret_code = data
1534 .get("retCode")
1535 .and_then(simd_json::prelude::ValueAsScalar::as_i64)
1536 .unwrap_or(-1);
1537 if ret_code != 0 {
1538 let ret_msg = data
1539 .get("retMsg")
1540 .and_then(|v| v.as_str())
1541 .unwrap_or("Unknown error");
1542 bail!("API error: {}", ret_msg);
1543 }
1544
1545 let result = data
1546 .get("result")
1547 .ok_or_else(|| anyhow!("Missing result field"))?;
1548 let list = result
1549 .get("list")
1550 .and_then(|v| v.as_array())
1551 .ok_or_else(|| anyhow!("Missing or invalid list field"))?;
1552
1553 if list.is_empty() {
1554 bail!("Symbol not found in category {}", category);
1555 }
1556
1557 let instrument = &list[0];
1558
1559 let symbol_str: SmartString = instrument
1560 .get("symbol")
1561 .and_then(|v| v.as_str())
1562 .unwrap_or(symbol)
1563 .into();
1564
1565 let category_enum = BybitCategory::parse_category(category)
1566 .ok_or_else(|| anyhow!("Invalid category: {}", category))?;
1567
1568 let status: SmartString = instrument
1569 .get("status")
1570 .and_then(|v| v.as_str())
1571 .unwrap_or("Unknown")
1572 .into();
1573
1574 let base_coin: SmartString = instrument
1575 .get("baseCoin")
1576 .and_then(|v| v.as_str())
1577 .unwrap_or("Unknown")
1578 .into();
1579
1580 let quote_coin: SmartString = instrument
1581 .get("quoteCoin")
1582 .and_then(|v| v.as_str())
1583 .unwrap_or("Unknown")
1584 .into();
1585
1586 let contract_type: Option<SmartString> = instrument
1587 .get("contractType")
1588 .and_then(|v| v.as_str())
1589 .map(std::convert::Into::into);
1590
1591 Ok(InstrumentInfo::new(
1592 symbol_str,
1593 category_enum,
1594 status,
1595 base_coin,
1596 quote_coin,
1597 contract_type,
1598 ))
1599 }
1600
1601 pub async fn refresh_instrument_cache(&self) -> Result<()> {
1603 let symbols: Vec<SmartString> = self.order_symbol_map.read().values().cloned().collect();
1604
1605 if symbols.is_empty() {
1606 debug!("No symbols to refresh in instrument cache");
1607 return Ok(());
1608 }
1609
1610 info!("Refreshing instrument cache for {} symbols", symbols.len());
1611
1612 for chunk in symbols.chunks(10) {
1614 let mut tasks = Vec::new();
1615
1616 for symbol in chunk {
1617 let cache = self.instrument_cache.clone();
1618 let auth = self.auth.clone();
1619 let symbol_owned = symbol.clone();
1620 let testnet = self.testnet;
1621
1622 let task = tokio::spawn(async move {
1623 Self::refresh_instrument_info_for_symbol(&cache, &auth, &symbol_owned, testnet)
1624 .await
1625 });
1626
1627 tasks.push(task);
1628 }
1629
1630 for task in tasks {
1632 if let Err(e) = task.await {
1633 error!("Failed to refresh instrument info: {e}");
1634 }
1635 }
1636
1637 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1639 }
1640
1641 info!("Instrument cache refresh completed");
1642 Ok(())
1643 }
1644
1645 #[must_use]
1647 pub fn get_instrument_info(&self, symbol: &str) -> Option<InstrumentInfo> {
1648 self.instrument_cache.read().get(symbol).cloned()
1649 }
1650
1651 pub fn cleanup_instrument_cache(&self) {
1653 let mut cache = self.instrument_cache.write();
1654 let initial_count = cache.len();
1655
1656 cache.retain(|_, info| !info.is_expired());
1657
1658 let removed_count = initial_count - cache.len();
1659 if removed_count > 0 {
1660 debug!("Removed {removed_count} expired entries from instrument cache");
1661 }
1662 }
1663
1664 fn get_max_batch_size(&self, category: &str) -> usize {
1666 match category {
1667 CATEGORY_SPOT => MAX_BATCH_SPOT,
1668 CATEGORY_LINEAR => MAX_BATCH_LINEAR,
1669 CATEGORY_INVERSE => MAX_BATCH_INVERSE,
1670 CATEGORY_OPTION => MAX_BATCH_OPTION,
1671 _ => MAX_BATCH_SPOT,
1672 }
1673 }
1674
    /// Returns the maximum batch size for `category` by delegating to
    /// `BybitCategory::max_batch_size`.
    const fn get_max_batch_size_for_category(&self, category: BybitCategory) -> usize {
        category.max_batch_size()
    }
1679
1680 fn get_category_enum_from_symbol(&self, symbol: &str) -> BybitCategory {
1682 if let Some(info) = self.instrument_cache.read().get(symbol)
1684 && !info.is_expired()
1685 {
1686 return info.category;
1687 }
1688
1689 if symbol.ends_with("USDT") || symbol.ends_with("USDC") {
1691 if symbol.contains('-') {
1692 BybitCategory::Option
1693 } else {
1694 BybitCategory::Linear
1695 }
1696 } else if symbol.ends_with("USD") {
1697 BybitCategory::Inverse
1698 } else {
1699 BybitCategory::Spot
1700 }
1701 }
1702}
1703#[async_trait]
1704impl Exchange for BybitWebsocketTrading {
1705 fn venue(&self) -> Venue {
1706 Venue::Bybit
1707 }
1708
1709 async fn place_order(
1710 &self,
1711 order: Order,
1712 report_sender: Sender<ExecutionReport>,
1713 ) -> Result<()> {
1714 let category = self.get_category_from_symbol(&order.symbol);
1716 if let Err(e) = self.validate_order_for_account(&order, category).await {
1717 warn!("V5 validation failed but continuing with order: {e}");
1718 }
1719
1720 let req_id = self.place_order_internal(&order).await?;
1722
1723 self.order_symbol_map
1726 .write()
1727 .insert(order.id.to_string().into(), order.symbol.clone());
1728
1729 let pending = PendingRequest {
1730 op_type: SmartString::from("order.create"),
1731 timestamp: self.clock.raw(),
1732 report_sender,
1733 };
1734
1735 self.pending_requests.write().insert(req_id, pending);
1736
1737 Ok(())
1738 }
1739
1740 async fn cancel_order(
1741 &self,
1742 order_id: SmartString,
1743 report_sender: Sender<ExecutionReport>,
1744 ) -> Result<()> {
1745 let symbol = self
1747 .order_symbol_map
1748 .read()
1749 .get(&order_id)
1750 .cloned()
1751 .ok_or_else(|| anyhow!("Symbol not found for order_id: {}", order_id))?;
1752
1753 let req_id = self.cancel_order_internal(&order_id, &symbol).await?;
1754
1755 let pending = PendingRequest {
1756 op_type: SmartString::from("order.cancel"),
1757 timestamp: self.clock.raw(),
1758 report_sender,
1759 };
1760
1761 self.pending_requests.write().insert(req_id, pending);
1762
1763 Ok(())
1764 }
1765
1766 async fn modify_order(
1767 &self,
1768 order_id: SmartString,
1769 new_price: Option<Decimal>,
1770 new_quantity: Option<Decimal>,
1771 report_sender: Sender<ExecutionReport>,
1772 ) -> Result<()> {
1773 let symbol = self
1775 .order_symbol_map
1776 .read()
1777 .get(&order_id)
1778 .cloned()
1779 .ok_or_else(|| anyhow!("Symbol not found for order_id: {}", order_id))?;
1780
1781 let req_id = self
1782 .amend_order_internal(&order_id, &symbol, new_price, new_quantity)
1783 .await?;
1784
1785 let pending = PendingRequest {
1786 op_type: SmartString::from("order.amend"),
1787 timestamp: self.clock.raw(),
1788 report_sender,
1789 };
1790
1791 self.pending_requests.write().insert(req_id, pending);
1792
1793 Ok(())
1794 }
1795
1796 async fn cancel_all_orders(
1797 &self,
1798 instrument_id: Option<InstrumentId>,
1799 report_sender: Sender<ExecutionReport>,
1800 ) -> Result<()> {
1801 let (category, symbol_str) = if let Some(id) = &instrument_id {
1802 let category = self.get_category_from_symbol(&id.symbol);
1803 (category, Some(id.symbol.as_str()))
1804 } else {
1805 (CATEGORY_LINEAR, None) };
1808
1809 let req_id = self
1810 .cancel_all_orders_internal(category, symbol_str)
1811 .await?;
1812
1813 let pending = PendingRequest {
1814 op_type: SmartString::from("order.cancel-all"),
1815 timestamp: self.clock.raw(),
1816 report_sender,
1817 };
1818
1819 self.pending_requests.write().insert(req_id, pending);
1820
1821 Ok(())
1822 }
1823
1824 async fn get_order_status(&self, order_id: &str) -> Result<OrderStatus> {
1825 bail!("Order status query not supported via WebSocket - track from order stream")
1828 }
1829
1830 async fn connect(&self, report_sender: Sender<ExecutionReport>) -> Result<()> {
1831 self.connect_internal(report_sender).await
1833 }
1834
1835 async fn disconnect(&self) -> Result<()> {
1836 self.is_connected.store(false, Ordering::Relaxed);
1837 self.is_authenticated.store(false, Ordering::Relaxed);
1838
1839 if let Some(sink) = &mut *self.trade_sink.write().await {
1841 let _ = sink.close().await;
1842 }
1843 *self.trade_sink.write().await = None;
1844 *self.trade_stream.write().await = None;
1845
1846 if let Some(sink) = &mut *self.private_sink.write().await {
1847 let _ = sink.close().await;
1848 }
1849 *self.private_sink.write().await = None;
1850 *self.private_stream.write().await = None;
1851
1852 if let Some(handle) = self.ping_handle.write().take() {
1854 handle.abort();
1855 }
1856
1857 if let Some(handle) = self.message_handle.write().take() {
1858 handle.abort();
1859 }
1860
1861 self.pending_requests.write().clear();
1863
1864 info!("Bybit WebSocket trading disconnected");
1865 Ok(())
1866 }
1867
1868 async fn is_connected(&self) -> bool {
1869 self.is_connected.load(Ordering::Relaxed) && self.is_authenticated.load(Ordering::Relaxed)
1870 }
1871
1872 async fn get_instruments(&self) -> Result<SmallVec<[InstrumentId; 32]>> {
1873 bail!("Instrument query not supported via WebSocket")
1876 }
1877
1878 async fn send_fix_message(&self, message: Vec<u8>) -> Result<()> {
1879 bail!("FIX protocol not supported on Bybit WebSocket")
1881 }
1882
1883 async fn receive_fix_message(&self) -> Result<Vec<u8>> {
1884 bail!("FIX protocol not supported on Bybit WebSocket")
1886 }
1887}