1use crate::monitor::collector::{
7 CollectionError, CollectionStatus, DataType, MarketDataEvent, Result,
8};
9use crate::monitor::schema::{OrderBookRecord, PriceLevel, TradeRecord, TradeSide, timestamp};
10use flume::Sender;
11use rusty_common::collections::FxHashMap;
12use rusty_feeder::Provider;
13use rusty_feeder::exchange::binance::futures::data::{
14 orderbook::OrderbookMessage, trade::AggTradeMessage,
15};
16use rusty_feeder::exchange::{
17 binance::futures::provider::{BinanceFuturesMarketType, BinanceFuturesProvider},
18 bithumb::provider::BithumbProvider,
19 bybit::provider::BybitProvider,
20 coinbase::provider::CoinbaseProvider,
21 upbit::provider::UpbitProvider,
22};
23use rusty_feeder::provider::config::ConnectionConfig;
24use rusty_model::{
25 data::{book_snapshot::OrderBookSnapshot, market_trade::MarketTrade},
26 enums::OrderSide,
27};
28use smallvec::SmallVec;
29use smartstring::alias::String as SmartString;
30use std::sync::Arc;
31use tokio::sync::RwLock;
32use tokio::task::JoinHandle;
33
34pub enum ExchangeProvider {
36 BinanceFutures(Box<BinanceFuturesProvider>),
38 Upbit(Box<UpbitProvider>),
40 Bybit(Box<BybitProvider>),
42 Coinbase(Box<CoinbaseProvider>),
44 Bithumb(Box<BithumbProvider>),
46}
47
48impl ExchangeProvider {
50 pub async fn new(exchange_name: &str) -> Result<Self> {
52 match exchange_name.to_lowercase().as_str() {
53 "binance" => {
54 let provider = BinanceFuturesProvider::new(
56 ConnectionConfig::default(),
57 BinanceFuturesMarketType::UsdM,
58 );
59 Ok(Self::BinanceFutures(Box::new(provider)))
60 }
61 "upbit" => {
62 let provider = UpbitProvider::new();
63 Ok(Self::Upbit(Box::new(provider)))
64 }
65 "bybit" => {
66 let provider = BybitProvider::new_spot();
67 Ok(Self::Bybit(Box::new(provider)))
68 }
69 "coinbase" => {
70 let provider = CoinbaseProvider::new();
71 Ok(Self::Coinbase(Box::new(provider)))
72 }
73 "bithumb" => {
74 let provider = BithumbProvider::new();
75 Ok(Self::Bithumb(Box::new(provider)))
76 }
77 _ => Err(CollectionError::Configuration(format!(
78 "Unsupported exchange: {exchange_name}"
79 ))),
80 }
81 }
82
83 #[must_use]
85 pub const fn name(&self) -> &'static str {
86 match self {
87 Self::BinanceFutures(_) => "binance",
88 Self::Upbit(_) => "upbit",
89 Self::Bybit(_) => "bybit",
90 Self::Coinbase(_) => "coinbase",
91 Self::Bithumb(_) => "bithumb",
92 }
93 }
94
95 pub async fn is_connected(&self) -> bool {
97 match self {
98 Self::BinanceFutures(p) => Provider::is_connected(p.as_ref()).await,
99 Self::Upbit(p) => p.as_ref().is_connected().await,
100 Self::Bybit(p) => p.is_connected().await,
101 Self::Coinbase(p) => p.is_connected().await,
102 Self::Bithumb(p) => p.is_connected().await,
103 }
104 }
105
106 pub async fn shutdown(&mut self) -> Result<()> {
111 match self {
112 Self::BinanceFutures(p) => {
113 p.shutdown()
114 .await
115 .map_err(|e| CollectionError::ConnectionFailed {
116 exchange: "binance".to_string(),
117 reason: e.to_string(),
118 })
119 }
120 Self::Upbit(p) => p
121 .shutdown()
122 .await
123 .map_err(|e| CollectionError::ConnectionFailed {
124 exchange: "upbit".to_string(),
125 reason: e.to_string(),
126 }),
127 Self::Bybit(p) => p
128 .shutdown()
129 .await
130 .map_err(|e| CollectionError::ConnectionFailed {
131 exchange: "bybit".to_string(),
132 reason: e.to_string(),
133 }),
134 Self::Coinbase(p) => {
135 p.shutdown()
136 .await
137 .map_err(|e| CollectionError::ConnectionFailed {
138 exchange: "coinbase".to_string(),
139 reason: e.to_string(),
140 })
141 }
142 Self::Bithumb(p) => p
143 .shutdown()
144 .await
145 .map_err(|e| CollectionError::ConnectionFailed {
146 exchange: "bithumb".to_string(),
147 reason: e.to_string(),
148 }),
149 }
150 }
151}
152
153pub struct ExchangeClient {
155 exchange_name: SmartString,
156 provider: Option<ExchangeProvider>,
157 active_symbols: Arc<RwLock<FxHashMap<SmartString, Vec<DataType>>>>,
158 trade_handles: Arc<RwLock<Vec<JoinHandle<()>>>>,
159 orderbook_handles: Arc<RwLock<Vec<JoinHandle<()>>>>,
160}
161
162impl ExchangeClient {
163 #[must_use]
165 pub fn new(exchange_name: SmartString) -> Self {
166 Self {
167 exchange_name,
168 provider: None,
169 active_symbols: Arc::new(RwLock::new(FxHashMap::default())),
170 trade_handles: Arc::new(RwLock::new(Vec::new())),
171 orderbook_handles: Arc::new(RwLock::new(Vec::new())),
172 }
173 }
174
175 pub async fn connect(&mut self) -> Result<()> {
177 log::info!("Connecting to exchange: {}", self.exchange_name);
178
179 let mut provider = ExchangeProvider::new(&self.exchange_name).await?;
180
181 match &mut provider {
183 ExchangeProvider::BinanceFutures(p) => {
184 p.init()
185 .await
186 .map_err(|e| CollectionError::ConnectionFailed {
187 exchange: "binance".to_string(),
188 reason: e.to_string(),
189 })?
190 }
191 ExchangeProvider::Upbit(p) => {
192 p.init()
193 .await
194 .map_err(|e| CollectionError::ConnectionFailed {
195 exchange: "upbit".to_string(),
196 reason: e.to_string(),
197 })?
198 }
199 ExchangeProvider::Bybit(p) => {
200 p.init()
201 .await
202 .map_err(|e| CollectionError::ConnectionFailed {
203 exchange: "bybit".to_string(),
204 reason: e.to_string(),
205 })?
206 }
207 ExchangeProvider::Coinbase(p) => {
208 p.init()
209 .await
210 .map_err(|e| CollectionError::ConnectionFailed {
211 exchange: "coinbase".to_string(),
212 reason: e.to_string(),
213 })?
214 }
215 ExchangeProvider::Bithumb(p) => {
216 p.init()
217 .await
218 .map_err(|e| CollectionError::ConnectionFailed {
219 exchange: "bithumb".to_string(),
220 reason: e.to_string(),
221 })?
222 }
223 }
224
225 self.provider = Some(provider);
226
227 Ok(())
228 }
229
230 pub async fn disconnect(&mut self) -> Result<()> {
232 log::info!("Disconnecting from exchange: {}", self.exchange_name);
233
234 let mut trade_handles = self.trade_handles.write().await;
236 for handle in trade_handles.drain(..) {
237 handle.abort();
238 }
239
240 let mut orderbook_handles = self.orderbook_handles.write().await;
241 for handle in orderbook_handles.drain(..) {
242 handle.abort();
243 }
244
245 if let Some(mut provider) = self.provider.take() {
247 provider.shutdown().await?;
248 }
249
250 self.active_symbols.write().await.clear();
252
253 Ok(())
254 }
255
256 pub async fn start_collection(
258 &mut self,
259 symbols: Vec<SmartString>,
260 data_types: Vec<DataType>,
261 event_sender: Sender<MarketDataEvent>,
262 ) -> Result<()> {
263 let provider = self
264 .provider
265 .as_ref()
266 .ok_or_else(|| CollectionError::ConnectionFailed {
267 exchange: self.exchange_name.to_string(),
268 reason: "Provider not connected".to_string(),
269 })?;
270
271 let exchange_name = self.exchange_name.clone();
272
273 {
275 let mut active_symbols = self.active_symbols.write().await;
276 for symbol in &symbols {
277 active_symbols.insert(symbol.clone(), data_types.clone());
278 }
279 }
280
281 if data_types.contains(&DataType::Trades) {
283 let handle = self
284 .spawn_trade_processor(
285 exchange_name.clone(),
286 provider,
287 symbols.clone(),
288 event_sender.clone(),
289 )
290 .await?;
291
292 self.trade_handles.write().await.push(handle);
293 }
294
295 if data_types.contains(&DataType::OrderBook) {
297 let handle = self
298 .spawn_orderbook_processor(
299 exchange_name.clone(),
300 provider,
301 symbols.clone(),
302 event_sender.clone(),
303 )
304 .await?;
305
306 self.orderbook_handles.write().await.push(handle);
307 }
308
309 Ok(())
310 }
311
312 pub async fn get_status(&self) -> Option<CollectionStatus> {
314 let provider = self.provider.as_ref()?;
315 let active_symbols = self.active_symbols.read().await;
316
317 if active_symbols.is_empty() {
318 return None;
319 }
320
321 let (symbol, data_types) = active_symbols.iter().next()?;
323
324 Some(CollectionStatus {
325 exchange: self.exchange_name.clone(),
326 symbol: symbol.clone(),
327 data_types: data_types.clone(),
328 connected: provider.is_connected().await,
329 last_seen: Some(timestamp::now_nanos()),
330 stats: Default::default(),
331 error_count: 0,
332 last_error: None,
333 })
334 }
335
336 async fn spawn_trade_processor(
338 &self,
339 exchange_name: SmartString,
340 provider: &ExchangeProvider,
341 symbols: Vec<SmartString>,
342 event_sender: Sender<MarketDataEvent>,
343 ) -> Result<JoinHandle<()>> {
344 let symbols_small: SmallVec<[SmartString; 8]> = symbols.iter().cloned().collect();
346
347 match provider {
348 ExchangeProvider::BinanceFutures(p) => {
349 let mut receiver = p.subscribe_trades(symbols_small).await.map_err(|e| {
350 CollectionError::SubscriptionError {
351 exchange: "binance".to_string(),
352 reason: e.to_string(),
353 }
354 })?;
355
356 let exchange_name = exchange_name.clone();
357 let func = async move {
358 log::info!("Started trade processor for exchange: {exchange_name}");
359
360 while let Some(msg) = receiver.recv().await {
361 match convert_binance_trade(msg, exchange_name.clone()) {
363 Ok(trade_record) => {
364 let event = MarketDataEvent::Trade(trade_record);
365
366 if let Err(e) = event_sender.send(event) {
367 log::error!("Failed to send trade event: {e}");
368 break;
369 }
370 }
371 Err(e) => {
372 log::error!("Failed to convert Binance trade: {e}");
373 }
375 }
376 }
377
378 log::info!("Trade processor stopped for exchange: {exchange_name}");
379 };
380
381 Ok(tokio::spawn(func))
382 }
383 _ => {
384 Ok(tokio::spawn(async move {
387 log::warn!("Trade processor not yet implemented for exchange: {exchange_name}");
388 }))
389 }
390 }
391 }
392
393 async fn spawn_orderbook_processor(
395 &self,
396 exchange_name: SmartString,
397 provider: &ExchangeProvider,
398 symbols: Vec<SmartString>,
399 event_sender: Sender<MarketDataEvent>,
400 ) -> Result<JoinHandle<()>> {
401 let symbols_small: SmallVec<[SmartString; 8]> = symbols.iter().cloned().collect();
403
404 match provider {
405 ExchangeProvider::BinanceFutures(p) => {
406 let mut receiver = p.subscribe_orderbook(symbols_small).await.map_err(|e| {
407 CollectionError::SubscriptionError {
408 exchange: "binance".to_string(),
409 reason: e.to_string(),
410 }
411 })?;
412
413 let exchange_name = exchange_name.clone();
414 Ok(tokio::spawn(async move {
415 log::info!("Started orderbook processor for exchange: {exchange_name}");
416
417 while let Some(msg) = receiver.recv().await {
418 let orderbook_record =
420 convert_binance_orderbook(msg, exchange_name.clone());
421
422 let event = MarketDataEvent::OrderBook(orderbook_record);
423
424 if let Err(e) = event_sender.send(event) {
425 log::error!("Failed to send orderbook event: {e}");
426 break;
427 }
428 }
429
430 log::info!("Orderbook processor stopped for exchange: {exchange_name}");
431 }))
432 }
433 _ => {
434 Ok(tokio::spawn(async move {
437 log::warn!(
438 "Orderbook processor not yet implemented for exchange: {exchange_name}"
439 );
440 }))
441 }
442 }
443 }
444}
445
446#[must_use]
448pub fn convert_trade(trade: MarketTrade, exchange: SmartString) -> TradeRecord {
449 TradeRecord {
450 timestamp_exchange: trade.exchange_time_ns,
451 timestamp_system: crate::monitor::schema::timestamp::now_nanos(),
452 symbol: trade.instrument_id.symbol.to_string().into(),
453 exchange,
454 price: trade.price,
455 quantity: trade.quantity,
456 side: match trade.direction {
457 OrderSide::Buy => TradeSide::Buy,
458 OrderSide::Sell => TradeSide::Sell,
459 },
460 trade_id: format!("{}", trade.exchange_time_ns).into(), buyer_order_id: None, seller_order_id: None, sequence: 0, }
465}
466
467#[must_use]
469pub fn convert_orderbook(snapshot: OrderBookSnapshot, exchange: SmartString) -> OrderBookRecord {
470 let bids: Vec<PriceLevel> = snapshot
471 .bids
472 .iter()
473 .map(|level| PriceLevel {
474 price: level.price,
475 quantity: level.quantity,
476 order_count: None, })
478 .collect();
479
480 let asks: Vec<PriceLevel> = snapshot
481 .asks
482 .iter()
483 .map(|level| PriceLevel {
484 price: level.price,
485 quantity: level.quantity,
486 order_count: None, })
488 .collect();
489
490 OrderBookRecord {
491 timestamp_exchange: snapshot.timestamp_event,
492 timestamp_system: crate::monitor::schema::timestamp::now_nanos(),
493 symbol: snapshot.instrument_id.symbol.to_string().into(),
494 exchange,
495 bids,
496 asks,
497 sequence: snapshot.sequence_id,
498 checksum: None, }
500}
501
502pub fn convert_binance_trade(msg: AggTradeMessage, exchange: SmartString) -> Result<TradeRecord> {
504 let price =
505 crate::monitor::schema::decimal_optimized::fast_parse_decimal(&msg.price).map_err(|e| {
506 CollectionError::DataProcessing(format!("Failed to parse price '{}': {}", msg.price, e))
507 })?;
508
509 let quantity = crate::monitor::schema::decimal_optimized::fast_parse_decimal(&msg.quantity)
510 .map_err(|e| {
511 CollectionError::DataProcessing(format!(
512 "Failed to parse quantity '{}': {}",
513 msg.quantity, e
514 ))
515 })?;
516
517 Ok(TradeRecord {
518 timestamp_exchange: msg.trade_time * 1_000_000, timestamp_system: crate::monitor::schema::timestamp::now_nanos(),
520 symbol: msg.symbol,
521 exchange,
522 price,
523 quantity,
524 side: if msg.is_buyer_market_maker {
525 TradeSide::Sell
526 } else {
527 TradeSide::Buy
528 }, trade_id: msg.agg_trade_id.to_string().into(),
530 buyer_order_id: None,
531 seller_order_id: None,
532 sequence: 0,
533 })
534}
535
536#[must_use]
538pub fn convert_binance_orderbook(msg: OrderbookMessage, exchange: SmartString) -> OrderBookRecord {
539 let bids: Vec<PriceLevel> = msg
540 .bids
541 .iter()
542 .filter_map(|bid| {
543 if bid.len() >= 2 {
544 Some(PriceLevel {
545 price: crate::monitor::schema::decimal_optimized::fast_parse_decimal(&bid[0])
546 .ok()?,
547 quantity: crate::monitor::schema::decimal_optimized::fast_parse_decimal(
548 &bid[1],
549 )
550 .ok()?,
551 order_count: None,
552 })
553 } else {
554 None
555 }
556 })
557 .collect();
558
559 let asks: Vec<PriceLevel> = msg
560 .asks
561 .iter()
562 .filter_map(|ask| {
563 if ask.len() >= 2 {
564 Some(PriceLevel {
565 price: crate::monitor::schema::decimal_optimized::fast_parse_decimal(&ask[0])
566 .ok()?,
567 quantity: crate::monitor::schema::decimal_optimized::fast_parse_decimal(
568 &ask[1],
569 )
570 .ok()?,
571 order_count: None,
572 })
573 } else {
574 None
575 }
576 })
577 .collect();
578
579 OrderBookRecord {
580 timestamp_exchange: msg.event_time * 1_000_000, timestamp_system: crate::monitor::schema::timestamp::now_nanos(),
582 symbol: msg.symbol,
583 exchange,
584 bids,
585 asks,
586 sequence: msg.final_update_id,
587 checksum: None,
588 }
589}