rusty_bin/monitor/collector/
exchange_client.rs

1//! Exchange client for connecting to individual exchanges
2//!
3//! This module handles the connection and data streaming from individual exchanges
4//! using the rusty-feeder provider infrastructure.
5
6use crate::monitor::collector::{
7    CollectionError, CollectionStatus, DataType, MarketDataEvent, Result,
8};
9use crate::monitor::schema::{OrderBookRecord, PriceLevel, TradeRecord, TradeSide, timestamp};
10use flume::Sender;
11use rusty_common::collections::FxHashMap;
12use rusty_feeder::Provider;
13use rusty_feeder::exchange::binance::futures::data::{
14    orderbook::OrderbookMessage, trade::AggTradeMessage,
15};
16use rusty_feeder::exchange::{
17    binance::futures::provider::{BinanceFuturesMarketType, BinanceFuturesProvider},
18    bithumb::provider::BithumbProvider,
19    bybit::provider::BybitProvider,
20    coinbase::provider::CoinbaseProvider,
21    upbit::provider::UpbitProvider,
22};
23use rusty_feeder::provider::config::ConnectionConfig;
24use rusty_model::{
25    data::{book_snapshot::OrderBookSnapshot, market_trade::MarketTrade},
26    enums::OrderSide,
27};
28use smallvec::SmallVec;
29use smartstring::alias::String as SmartString;
30use std::sync::Arc;
31use tokio::sync::RwLock;
32use tokio::task::JoinHandle;
33
34/// Provider wrapper that abstracts over different exchange providers
35pub enum ExchangeProvider {
36    /// Binance Futures provider for USD-M (USDT-settled) perpetual futures
37    BinanceFutures(Box<BinanceFuturesProvider>),
38    /// Upbit provider for Korean spot market trading
39    Upbit(Box<UpbitProvider>),
40    /// Bybit provider for derivatives and spot trading
41    Bybit(Box<BybitProvider>),
42    /// Coinbase provider for US-regulated spot trading
43    Coinbase(Box<CoinbaseProvider>),
44    /// Bithumb provider for Korean spot market trading
45    Bithumb(Box<BithumbProvider>),
46}
47
48// TODO!: refactoring feeder and make ext trait, and integrate it all in here. not just in `ExchangeClient`
49impl ExchangeProvider {
50    /// Create a new provider based on exchange name
51    pub async fn new(exchange_name: &str) -> Result<Self> {
52        match exchange_name.to_lowercase().as_str() {
53            "binance" => {
54                // Use USD-M Futures by default (USDT-settled perpetual futures)
55                let provider = BinanceFuturesProvider::new(
56                    ConnectionConfig::default(),
57                    BinanceFuturesMarketType::UsdM,
58                );
59                Ok(Self::BinanceFutures(Box::new(provider)))
60            }
61            "upbit" => {
62                let provider = UpbitProvider::new();
63                Ok(Self::Upbit(Box::new(provider)))
64            }
65            "bybit" => {
66                let provider = BybitProvider::new_spot();
67                Ok(Self::Bybit(Box::new(provider)))
68            }
69            "coinbase" => {
70                let provider = CoinbaseProvider::new();
71                Ok(Self::Coinbase(Box::new(provider)))
72            }
73            "bithumb" => {
74                let provider = BithumbProvider::new();
75                Ok(Self::Bithumb(Box::new(provider)))
76            }
77            _ => Err(CollectionError::Configuration(format!(
78                "Unsupported exchange: {exchange_name}"
79            ))),
80        }
81    }
82
83    /// Get the exchange name
84    #[must_use]
85    pub const fn name(&self) -> &'static str {
86        match self {
87            Self::BinanceFutures(_) => "binance",
88            Self::Upbit(_) => "upbit",
89            Self::Bybit(_) => "bybit",
90            Self::Coinbase(_) => "coinbase",
91            Self::Bithumb(_) => "bithumb",
92        }
93    }
94
95    /// Check if provider is connected
96    pub async fn is_connected(&self) -> bool {
97        match self {
98            Self::BinanceFutures(p) => Provider::is_connected(p.as_ref()).await,
99            Self::Upbit(p) => p.as_ref().is_connected().await,
100            Self::Bybit(p) => p.is_connected().await,
101            Self::Coinbase(p) => p.is_connected().await,
102            Self::Bithumb(p) => p.is_connected().await,
103        }
104    }
105
106    // Removed subscribe_trades and subscribe_orderbook methods as they are now handled
107    // directly in spawn_trade_processor and spawn_orderbook_processor
108
109    /// Shutdown the provider
110    pub async fn shutdown(&mut self) -> Result<()> {
111        match self {
112            Self::BinanceFutures(p) => {
113                p.shutdown()
114                    .await
115                    .map_err(|e| CollectionError::ConnectionFailed {
116                        exchange: "binance".to_string(),
117                        reason: e.to_string(),
118                    })
119            }
120            Self::Upbit(p) => p
121                .shutdown()
122                .await
123                .map_err(|e| CollectionError::ConnectionFailed {
124                    exchange: "upbit".to_string(),
125                    reason: e.to_string(),
126                }),
127            Self::Bybit(p) => p
128                .shutdown()
129                .await
130                .map_err(|e| CollectionError::ConnectionFailed {
131                    exchange: "bybit".to_string(),
132                    reason: e.to_string(),
133                }),
134            Self::Coinbase(p) => {
135                p.shutdown()
136                    .await
137                    .map_err(|e| CollectionError::ConnectionFailed {
138                        exchange: "coinbase".to_string(),
139                        reason: e.to_string(),
140                    })
141            }
142            Self::Bithumb(p) => p
143                .shutdown()
144                .await
145                .map_err(|e| CollectionError::ConnectionFailed {
146                    exchange: "bithumb".to_string(),
147                    reason: e.to_string(),
148                }),
149        }
150    }
151}
152
153/// Client for managing connection to a specific exchange
154pub struct ExchangeClient {
155    exchange_name: SmartString,
156    provider: Option<ExchangeProvider>,
157    active_symbols: Arc<RwLock<FxHashMap<SmartString, Vec<DataType>>>>,
158    trade_handles: Arc<RwLock<Vec<JoinHandle<()>>>>,
159    orderbook_handles: Arc<RwLock<Vec<JoinHandle<()>>>>,
160}
161
162impl ExchangeClient {
163    /// Create a new exchange client
164    #[must_use]
165    pub fn new(exchange_name: SmartString) -> Self {
166        Self {
167            exchange_name,
168            provider: None,
169            active_symbols: Arc::new(RwLock::new(FxHashMap::default())),
170            trade_handles: Arc::new(RwLock::new(Vec::new())),
171            orderbook_handles: Arc::new(RwLock::new(Vec::new())),
172        }
173    }
174
175    /// Connect to the exchange
176    pub async fn connect(&mut self) -> Result<()> {
177        log::info!("Connecting to exchange: {}", self.exchange_name);
178
179        let mut provider = ExchangeProvider::new(&self.exchange_name).await?;
180
181        // Initialize the provider
182        match &mut provider {
183            ExchangeProvider::BinanceFutures(p) => {
184                p.init()
185                    .await
186                    .map_err(|e| CollectionError::ConnectionFailed {
187                        exchange: "binance".to_string(),
188                        reason: e.to_string(),
189                    })?
190            }
191            ExchangeProvider::Upbit(p) => {
192                p.init()
193                    .await
194                    .map_err(|e| CollectionError::ConnectionFailed {
195                        exchange: "upbit".to_string(),
196                        reason: e.to_string(),
197                    })?
198            }
199            ExchangeProvider::Bybit(p) => {
200                p.init()
201                    .await
202                    .map_err(|e| CollectionError::ConnectionFailed {
203                        exchange: "bybit".to_string(),
204                        reason: e.to_string(),
205                    })?
206            }
207            ExchangeProvider::Coinbase(p) => {
208                p.init()
209                    .await
210                    .map_err(|e| CollectionError::ConnectionFailed {
211                        exchange: "coinbase".to_string(),
212                        reason: e.to_string(),
213                    })?
214            }
215            ExchangeProvider::Bithumb(p) => {
216                p.init()
217                    .await
218                    .map_err(|e| CollectionError::ConnectionFailed {
219                        exchange: "bithumb".to_string(),
220                        reason: e.to_string(),
221                    })?
222            }
223        }
224
225        self.provider = Some(provider);
226
227        Ok(())
228    }
229
230    /// Disconnect from the exchange
231    pub async fn disconnect(&mut self) -> Result<()> {
232        log::info!("Disconnecting from exchange: {}", self.exchange_name);
233
234        // Stop all active tasks
235        let mut trade_handles = self.trade_handles.write().await;
236        for handle in trade_handles.drain(..) {
237            handle.abort();
238        }
239
240        let mut orderbook_handles = self.orderbook_handles.write().await;
241        for handle in orderbook_handles.drain(..) {
242            handle.abort();
243        }
244
245        // Shutdown provider
246        if let Some(mut provider) = self.provider.take() {
247            provider.shutdown().await?;
248        }
249
250        // Clear active symbols
251        self.active_symbols.write().await.clear();
252
253        Ok(())
254    }
255
256    /// Start collecting data for specific symbols and data types
257    pub async fn start_collection(
258        &mut self,
259        symbols: Vec<SmartString>,
260        data_types: Vec<DataType>,
261        event_sender: Sender<MarketDataEvent>,
262    ) -> Result<()> {
263        let provider = self
264            .provider
265            .as_ref()
266            .ok_or_else(|| CollectionError::ConnectionFailed {
267                exchange: self.exchange_name.to_string(),
268                reason: "Provider not connected".to_string(),
269            })?;
270
271        let exchange_name = self.exchange_name.clone();
272
273        // Update active symbols
274        {
275            let mut active_symbols = self.active_symbols.write().await;
276            for symbol in &symbols {
277                active_symbols.insert(symbol.clone(), data_types.clone());
278            }
279        }
280
281        // Start trade collection if requested
282        if data_types.contains(&DataType::Trades) {
283            let handle = self
284                .spawn_trade_processor(
285                    exchange_name.clone(),
286                    provider,
287                    symbols.clone(),
288                    event_sender.clone(),
289                )
290                .await?;
291
292            self.trade_handles.write().await.push(handle);
293        }
294
295        // Start orderbook collection if requested
296        if data_types.contains(&DataType::OrderBook) {
297            let handle = self
298                .spawn_orderbook_processor(
299                    exchange_name.clone(),
300                    provider,
301                    symbols.clone(),
302                    event_sender.clone(),
303                )
304                .await?;
305
306            self.orderbook_handles.write().await.push(handle);
307        }
308
309        Ok(())
310    }
311
312    /// Get current collection status
313    pub async fn get_status(&self) -> Option<CollectionStatus> {
314        let provider = self.provider.as_ref()?;
315        let active_symbols = self.active_symbols.read().await;
316
317        if active_symbols.is_empty() {
318            return None;
319        }
320
321        // Get first symbol for status (could be improved to aggregate)
322        let (symbol, data_types) = active_symbols.iter().next()?;
323
324        Some(CollectionStatus {
325            exchange: self.exchange_name.clone(),
326            symbol: symbol.clone(),
327            data_types: data_types.clone(),
328            connected: provider.is_connected().await,
329            last_seen: Some(timestamp::now_nanos()),
330            stats: Default::default(),
331            error_count: 0,
332            last_error: None,
333        })
334    }
335
336    /// Spawn a task to process trade messages
337    async fn spawn_trade_processor(
338        &self,
339        exchange_name: SmartString,
340        provider: &ExchangeProvider,
341        symbols: Vec<SmartString>,
342        event_sender: Sender<MarketDataEvent>,
343    ) -> Result<JoinHandle<()>> {
344        // Subscribe and get receiver based on provider type
345        let symbols_small: SmallVec<[SmartString; 8]> = symbols.iter().cloned().collect();
346
347        match provider {
348            ExchangeProvider::BinanceFutures(p) => {
349                let mut receiver = p.subscribe_trades(symbols_small).await.map_err(|e| {
350                    CollectionError::SubscriptionError {
351                        exchange: "binance".to_string(),
352                        reason: e.to_string(),
353                    }
354                })?;
355
356                let exchange_name = exchange_name.clone();
357                let func = async move {
358                    log::info!("Started trade processor for exchange: {exchange_name}");
359
360                    while let Some(msg) = receiver.recv().await {
361                        // Process Binance futures trade message (AggTradeMessage)
362                        match convert_binance_trade(msg, exchange_name.clone()) {
363                            Ok(trade_record) => {
364                                let event = MarketDataEvent::Trade(trade_record);
365
366                                if let Err(e) = event_sender.send(event) {
367                                    log::error!("Failed to send trade event: {e}");
368                                    break;
369                                }
370                            }
371                            Err(e) => {
372                                log::error!("Failed to convert Binance trade: {e}");
373                                // Skip this trade and continue processing
374                            }
375                        }
376                    }
377
378                    log::info!("Trade processor stopped for exchange: {exchange_name}");
379                };
380
381                Ok(tokio::spawn(func))
382            }
383            _ => {
384                // For other exchanges, we need to implement similar patterns
385                // For now, return a placeholder
386                Ok(tokio::spawn(async move {
387                    log::warn!("Trade processor not yet implemented for exchange: {exchange_name}");
388                }))
389            }
390        }
391    }
392
393    /// Spawn a task to process orderbook messages
394    async fn spawn_orderbook_processor(
395        &self,
396        exchange_name: SmartString,
397        provider: &ExchangeProvider,
398        symbols: Vec<SmartString>,
399        event_sender: Sender<MarketDataEvent>,
400    ) -> Result<JoinHandle<()>> {
401        // Subscribe and get receiver based on provider type
402        let symbols_small: SmallVec<[SmartString; 8]> = symbols.iter().cloned().collect();
403
404        match provider {
405            ExchangeProvider::BinanceFutures(p) => {
406                let mut receiver = p.subscribe_orderbook(symbols_small).await.map_err(|e| {
407                    CollectionError::SubscriptionError {
408                        exchange: "binance".to_string(),
409                        reason: e.to_string(),
410                    }
411                })?;
412
413                let exchange_name = exchange_name.clone();
414                Ok(tokio::spawn(async move {
415                    log::info!("Started orderbook processor for exchange: {exchange_name}");
416
417                    while let Some(msg) = receiver.recv().await {
418                        // Process Binance futures orderbook message (OrderbookMessage)
419                        let orderbook_record =
420                            convert_binance_orderbook(msg, exchange_name.clone());
421
422                        let event = MarketDataEvent::OrderBook(orderbook_record);
423
424                        if let Err(e) = event_sender.send(event) {
425                            log::error!("Failed to send orderbook event: {e}");
426                            break;
427                        }
428                    }
429
430                    log::info!("Orderbook processor stopped for exchange: {exchange_name}");
431                }))
432            }
433            _ => {
434                // For other exchanges, we need to implement similar patterns
435                // For now, return a placeholder
436                Ok(tokio::spawn(async move {
437                    log::warn!(
438                        "Orderbook processor not yet implemented for exchange: {exchange_name}"
439                    );
440                }))
441            }
442        }
443    }
444}
445
446/// Convert rusty-model MarketTrade to rusty-bin TradeRecord
447#[must_use]
448pub fn convert_trade(trade: MarketTrade, exchange: SmartString) -> TradeRecord {
449    TradeRecord {
450        timestamp_exchange: trade.exchange_time_ns,
451        timestamp_system: crate::monitor::schema::timestamp::now_nanos(),
452        symbol: trade.instrument_id.symbol.to_string().into(),
453        exchange,
454        price: trade.price,
455        quantity: trade.quantity,
456        side: match trade.direction {
457            OrderSide::Buy => TradeSide::Buy,
458            OrderSide::Sell => TradeSide::Sell,
459        },
460        trade_id: format!("{}", trade.exchange_time_ns).into(), // Use timestamp as trade ID
461        buyer_order_id: None,                                   // Not available in MarketTrade
462        seller_order_id: None,                                  // Not available in MarketTrade
463        sequence: 0,                                            // Not available in MarketTrade
464    }
465}
466
467/// Convert rusty-model OrderBookSnapshot to rusty-bin OrderBookRecord
468#[must_use]
469pub fn convert_orderbook(snapshot: OrderBookSnapshot, exchange: SmartString) -> OrderBookRecord {
470    let bids: Vec<PriceLevel> = snapshot
471        .bids
472        .iter()
473        .map(|level| PriceLevel {
474            price: level.price,
475            quantity: level.quantity,
476            order_count: None, // PriceLevel in rusty-model doesn't have count
477        })
478        .collect();
479
480    let asks: Vec<PriceLevel> = snapshot
481        .asks
482        .iter()
483        .map(|level| PriceLevel {
484            price: level.price,
485            quantity: level.quantity,
486            order_count: None, // PriceLevel in rusty-model doesn't have count
487        })
488        .collect();
489
490    OrderBookRecord {
491        timestamp_exchange: snapshot.timestamp_event,
492        timestamp_system: crate::monitor::schema::timestamp::now_nanos(),
493        symbol: snapshot.instrument_id.symbol.to_string().into(),
494        exchange,
495        bids,
496        asks,
497        sequence: snapshot.sequence_id,
498        checksum: None, // Not available in OrderBookSnapshot
499    }
500}
501
502/// Convert Binance AggTradeMessage to rusty-bin TradeRecord
503pub fn convert_binance_trade(msg: AggTradeMessage, exchange: SmartString) -> Result<TradeRecord> {
504    let price =
505        crate::monitor::schema::decimal_optimized::fast_parse_decimal(&msg.price).map_err(|e| {
506            CollectionError::DataProcessing(format!("Failed to parse price '{}': {}", msg.price, e))
507        })?;
508
509    let quantity = crate::monitor::schema::decimal_optimized::fast_parse_decimal(&msg.quantity)
510        .map_err(|e| {
511            CollectionError::DataProcessing(format!(
512                "Failed to parse quantity '{}': {}",
513                msg.quantity, e
514            ))
515        })?;
516
517    Ok(TradeRecord {
518        timestamp_exchange: msg.trade_time * 1_000_000, // Convert ms to ns
519        timestamp_system: crate::monitor::schema::timestamp::now_nanos(),
520        symbol: msg.symbol,
521        exchange,
522        price,
523        quantity,
524        side: if msg.is_buyer_market_maker {
525            TradeSide::Sell
526        } else {
527            TradeSide::Buy
528        }, // Market maker on buy side means taker was selling
529        trade_id: msg.agg_trade_id.to_string().into(),
530        buyer_order_id: None,
531        seller_order_id: None,
532        sequence: 0,
533    })
534}
535
536/// Convert Binance OrderbookMessage to rusty-bin OrderBookRecord
537#[must_use]
538pub fn convert_binance_orderbook(msg: OrderbookMessage, exchange: SmartString) -> OrderBookRecord {
539    let bids: Vec<PriceLevel> = msg
540        .bids
541        .iter()
542        .filter_map(|bid| {
543            if bid.len() >= 2 {
544                Some(PriceLevel {
545                    price: crate::monitor::schema::decimal_optimized::fast_parse_decimal(&bid[0])
546                        .ok()?,
547                    quantity: crate::monitor::schema::decimal_optimized::fast_parse_decimal(
548                        &bid[1],
549                    )
550                    .ok()?,
551                    order_count: None,
552                })
553            } else {
554                None
555            }
556        })
557        .collect();
558
559    let asks: Vec<PriceLevel> = msg
560        .asks
561        .iter()
562        .filter_map(|ask| {
563            if ask.len() >= 2 {
564                Some(PriceLevel {
565                    price: crate::monitor::schema::decimal_optimized::fast_parse_decimal(&ask[0])
566                        .ok()?,
567                    quantity: crate::monitor::schema::decimal_optimized::fast_parse_decimal(
568                        &ask[1],
569                    )
570                    .ok()?,
571                    order_count: None,
572                })
573            } else {
574                None
575            }
576        })
577        .collect();
578
579    OrderBookRecord {
580        timestamp_exchange: msg.event_time * 1_000_000, // Convert ms to ns
581        timestamp_system: crate::monitor::schema::timestamp::now_nanos(),
582        symbol: msg.symbol,
583        exchange,
584        bids,
585        asks,
586        sequence: msg.final_update_id,
587        checksum: None,
588    }
589}