rusty_feeder/exchange/bithumb/
feeder.rs1use rusty_common::collections::FxHashMap;
7use smartstring::alias::String;
8use std::sync::Arc;
9
10use anyhow::Result;
11use async_trait::async_trait;
12use parking_lot::RwLock;
13use quanta::Clock;
14use smallvec::SmallVec;
15use tokio::sync::{mpsc, watch};
16
17use crate::feeder::{FeedStats, Feeder, FeederOptions};
18use rusty_model::{
19 PriceLevel,
20 data::{
21 bar::{Bar, BarAggregation, BarCache, BarType},
22 book_snapshot::OrderBookSnapshot,
23 market_trade::MarketTrade,
24 simd_orderbook::{SharedSimdOrderBook, SimdOrderBook},
25 },
26 enums::OrderSide,
27 instruments::InstrumentId,
28};
29
30use super::data::{
31 orderbook::{Orderbook, OrderbookMessage},
32 trade::TradeMessage,
33};
34
35#[derive(Debug)]
37pub struct BithumbFeeder {
38 stop_signals: Arc<RwLock<FxHashMap<String, watch::Sender<bool>>>>,
40
41 orderbooks: Arc<RwLock<FxHashMap<String, SharedSimdOrderBook>>>,
43
44 bar_caches: Arc<RwLock<FxHashMap<String, Arc<RwLock<BarCache>>>>>,
46
47 clock: Clock,
49
50 stats: Arc<RwLock<FxHashMap<String, FeedStats>>>,
52}
53
54impl Default for BithumbFeeder {
55 fn default() -> Self {
56 Self::new()
57 }
58}
59
60impl BithumbFeeder {
61 #[must_use]
63 pub fn new() -> Self {
64 Self {
65 stop_signals: Arc::new(RwLock::new(FxHashMap::default())),
66 orderbooks: Arc::new(RwLock::new(FxHashMap::default())),
67 bar_caches: Arc::new(RwLock::new(FxHashMap::default())),
68 clock: Clock::new(),
69 stats: Arc::new(RwLock::new(FxHashMap::default())),
70 }
71 }
72}
73
74#[async_trait]
75impl Feeder for BithumbFeeder {
76 type DepthMessage = OrderbookMessage;
77 type TradeMessage = TradeMessage;
78
79 async fn start_feed_depth(
80 &self,
81 instrument_id: InstrumentId,
82 mut depth_rx: mpsc::Receiver<Self::DepthMessage>,
83 _options: Option<FeederOptions>,
84 ) -> Result<mpsc::Receiver<OrderBookSnapshot>> {
85 let symbol = instrument_id.symbol.clone();
87 let (tx, rx) = mpsc::channel(1024);
88
89 let ob = rusty_model::data::orderbook::OrderBook::<64>::new_empty(instrument_id.clone());
91 let shared_orderbook = SharedSimdOrderBook::from_orderbook(&ob);
92 self.orderbooks
93 .write()
94 .insert(symbol.clone(), shared_orderbook.clone());
95
96 let (stop_tx, _) = watch::channel(false);
98 let key = String::from(format!("depth:{symbol}"));
99 self.stop_signals.write().insert(key.clone(), stop_tx);
100 let mut stop_rx = self.stop_signals.read().get(&key).unwrap().subscribe();
101
102 let clock = self.clock.clone();
104
105 let stats = self.stats.clone();
107 let stats_key = String::from(format!("depth:{symbol}"));
108
109 stats.write().entry(stats_key.clone()).or_default();
111
112 tokio::spawn(async move {
113 loop {
114 tokio::select! {
115 _ = stop_rx.changed() => {
117 break;
118 }
119
120 Some(update) = depth_rx.recv() => {
122 if update.code != symbol {
124 continue;
125 }
126
127 let process_start = clock.raw();
129
130 let orderbook = Orderbook::from_orderbook_message(&update, process_start);
132
133 let bids_vec: SmallVec<[PriceLevel; 64]> = orderbook.bids.iter().cloned().collect();
135 let asks_vec: SmallVec<[PriceLevel; 64]> = orderbook.asks.iter().cloned().collect();
136
137 let model_orderbook = rusty_model::data::orderbook::OrderBook::new(
138 &symbol,
139 orderbook.timestamp_ns, process_start, bids_vec,
142 asks_vec,
143 );
144 shared_orderbook.write(|ob| {
145 *ob = SimdOrderBook::from_orderbook(&model_orderbook);
146 });
147
148 let bids_vec: SmallVec<[PriceLevel; 64]> = orderbook.bids.iter().cloned().collect();
151 let asks_vec: SmallVec<[PriceLevel; 64]> = orderbook.asks.iter().cloned().collect();
152
153 let depth = OrderBookSnapshot::new(
154 instrument_id.clone(),
155 bids_vec,
156 asks_vec,
157 0, orderbook.timestamp_ns, process_start, );
161
162 let process_end = clock.raw();
164 let latency_ns = process_end.saturating_sub(process_start);
165
166 if let Some(feed_stats) = stats.write().get_mut(&stats_key) {
168 feed_stats.messages_processed += 1;
169 feed_stats.avg_process_latency_ns =
170 (feed_stats.avg_process_latency_ns * 9 + latency_ns) / 10; feed_stats.max_process_latency_ns =
172 feed_stats.max_process_latency_ns.max(latency_ns);
173 feed_stats.last_update_time = process_end;
174 }
175
176 if tx.send(depth).await.is_err() {
178 if let Some(feed_stats) = stats.write().get_mut(&stats_key) {
180 feed_stats.increment_dropped();
181 }
182 break;
183 }
184 }
185 }
186 }
187 });
188
189 Ok(rx)
190 }
191
192 async fn stop_feed_depth(&self, instrument_id: &InstrumentId) -> Result<()> {
193 let symbol = &instrument_id.symbol;
195 let key = String::from(format!("depth:{symbol}"));
196
197 self.orderbooks.write().remove(symbol);
199
200 if let Some(tx) = self.stop_signals.write().remove(&key) {
202 let _ = tx.send(true);
203 }
204
205 Ok(())
206 }
207
208 async fn start_feed_trades(
209 &self,
210 instrument_id: InstrumentId,
211 mut trade_rx: mpsc::Receiver<Self::TradeMessage>,
212 _options: Option<FeederOptions>,
213 ) -> Result<mpsc::Receiver<MarketTrade>> {
214 let symbol = instrument_id.symbol.clone();
216 let (tx, rx) = mpsc::channel(1024);
217
218 let (stop_tx, _) = watch::channel(false);
220 let key = String::from(format!("trades:{symbol}"));
221 self.stop_signals.write().insert(key.clone(), stop_tx);
222 let mut stop_rx = self.stop_signals.read().get(&key).unwrap().subscribe();
223
224 let clock = self.clock.clone();
226
227 let stats = self.stats.clone();
229 let stats_key = String::from(format!("trades:{symbol}"));
230
231 stats.write().entry(stats_key.clone()).or_default();
233
234 tokio::spawn(async move {
235 loop {
236 tokio::select! {
237 _ = stop_rx.changed() => {
239 break;
240 }
241
242 Some(trade_msg) = trade_rx.recv() => {
244 if trade_msg.code != symbol {
246 continue;
247 }
248
249 let process_start = clock.raw();
251
252 let side = match trade_msg.ask_bid.as_str() {
254 "BID" => OrderSide::Buy,
255 _ => OrderSide::Sell,
256 };
257
258 let trade = MarketTrade {
260 timestamp: clock.now(),
261 exchange_time_ns: trade_msg.trade_timestamp * 1_000_000, price: trade_msg.trade_price,
263 quantity: trade_msg.trade_volume,
264 direction: side,
265 instrument_id: instrument_id.clone(),
266 };
267
268 let process_end = clock.raw();
270 let latency_ns = process_end.saturating_sub(process_start);
271
272 if let Some(feed_stats) = stats.write().get_mut(&stats_key) {
274 feed_stats.messages_processed += 1;
275 feed_stats.avg_process_latency_ns =
276 (feed_stats.avg_process_latency_ns * 9 + latency_ns) / 10; feed_stats.max_process_latency_ns =
278 feed_stats.max_process_latency_ns.max(latency_ns);
279 feed_stats.last_update_time = process_end;
280 }
281
282 if tx.send(trade).await.is_err() {
284 if let Some(feed_stats) = stats.write().get_mut(&stats_key) {
286 feed_stats.increment_dropped();
287 }
288 break;
289 }
290 }
291 }
292 }
293 });
294
295 Ok(rx)
296 }
297
298 async fn stop_feed_trades(&self, instrument_id: &InstrumentId) -> Result<()> {
299 let symbol = &instrument_id.symbol;
301 let key = String::from(format!("trades:{symbol}"));
302 if let Some(tx) = self.stop_signals.write().remove(&key) {
303 let _ = tx.send(true);
304 }
305
306 Ok(())
307 }
308
309 async fn start_feed_bars(
310 &self,
311 instrument_id: InstrumentId,
312 bar_type: BarType,
313 mut trade_rx: mpsc::Receiver<MarketTrade>,
314 _options: Option<FeederOptions>,
315 ) -> Result<mpsc::Receiver<Bar>> {
316 let symbol = instrument_id.symbol.clone();
317 let bar_type_str = bar_type.to_string(); let cache_key = String::from(format!("{symbol}:{bar_type_str}"));
319
320 let max_bars = 1000;
322
323 let bar_caches_arc = self.bar_caches.clone();
325 let clock_arc = self.clock.clone(); let instrument_id_clone = instrument_id.clone(); let task_bar_cache_arc = bar_caches_arc
331 .write()
332 .entry(cache_key) .or_insert_with(|| Arc::new(RwLock::new(BarCache::new())))
334 .clone(); let (tx, rx) = mpsc::channel(1024);
337
338 let interval_sec = match bar_type.get_spec().aggregation {
340 BarAggregation::Second => bar_type.get_spec().step,
341 BarAggregation::Minute => bar_type.get_spec().step * 60,
342 BarAggregation::Hour => bar_type.get_spec().step * 3600,
343 BarAggregation::Day => bar_type.get_spec().step * 86400,
344 _ => 60, };
346
347 let stats = self.stats.clone();
349 let stats_key = String::from(format!("bars:{symbol}:{interval_sec}"));
350
351 stats.write().entry(stats_key.clone()).or_default();
353
354 tokio::spawn(async move {
361 let mut current_bar: Option<Bar> = None;
362 let bar_interval_ns = rusty_model::data::bar::get_bar_interval_ns(&bar_type);
363
364 while let Some(trade) = trade_rx.recv().await {
365 let process_start = clock_arc.raw();
367
368 if trade.instrument_id != instrument_id_clone {
369 continue;
370 }
371
372 let trade_timestamp_ns = trade.exchange_time_ns; match current_bar.as_mut() {
375 Some(bar) => {
376 let bar_start_time_ns = bar.timestamp_ns; if trade_timestamp_ns >= bar_start_time_ns + bar_interval_ns {
378 if tx.send(bar.clone()).await.is_err() {
381 if let Some(feed_stats) = stats.write().get_mut(&stats_key) {
383 feed_stats.increment_dropped();
384 }
385 break; }
387 current_bar = Some(Bar {
389 bar_type: bar_type.clone(),
390 open: trade.price,
391 high: trade.price,
392 low: trade.price,
393 close: trade.price,
394 volume: trade.quantity,
395 timestamp_ns: trade_timestamp_ns,
396 });
397 } else {
398 bar.high = bar.high.max(trade.price);
400 bar.low = bar.low.min(trade.price);
401 bar.close = trade.price;
402 bar.volume += trade.quantity;
403 }
404 }
405 None => {
406 current_bar = Some(Bar {
409 bar_type: bar_type.clone(),
410 open: trade.price,
411 high: trade.price,
412 low: trade.price,
413 close: trade.price,
414 volume: trade.quantity,
415 timestamp_ns: trade_timestamp_ns,
416 });
417 }
418 }
419 if let Some(ref bar_to_cache) = current_bar {
425 task_bar_cache_arc.write().add_bar(bar_to_cache.clone());
428 }
429
430 let process_end = clock_arc.raw();
432 let latency_ns = process_end.saturating_sub(process_start);
433
434 if let Some(feed_stats) = stats.write().get_mut(&stats_key) {
436 feed_stats.messages_processed += 1;
437 feed_stats.avg_process_latency_ns =
438 (feed_stats.avg_process_latency_ns * 9 + latency_ns) / 10; feed_stats.max_process_latency_ns =
440 feed_stats.max_process_latency_ns.max(latency_ns);
441 feed_stats.last_update_time = process_end;
442 }
443 }
444 });
445
446 Ok(rx)
447 }
448
449 async fn stop_feed_bars(&self, instrument_id: &InstrumentId, bar_type: &BarType) -> Result<()> {
450 let symbol = &instrument_id.symbol;
452 let interval_sec = match bar_type.get_spec().aggregation {
453 BarAggregation::Second => bar_type.get_spec().step,
454 BarAggregation::Minute => bar_type.get_spec().step * 60,
455 BarAggregation::Hour => bar_type.get_spec().step * 3600,
456 BarAggregation::Day => bar_type.get_spec().step * 86400,
457 _ => return Err(anyhow::anyhow!("Only time-based bars are supported")),
458 };
459
460 let key = String::from(format!("bars:{symbol}:{interval_sec}"));
461 if let Some(tx) = self.stop_signals.write().remove(&key) {
462 let _ = tx.send(true);
463 }
464
465 Ok(())
466 }
467
468 async fn get_shared_orderbook(
469 &self,
470 instrument_id: &InstrumentId,
471 ) -> Result<SharedSimdOrderBook> {
472 let symbol = &instrument_id.symbol;
474
475 let orderbooks = self.orderbooks.read();
477 if let Some(orderbook) = orderbooks.get(symbol) {
478 Ok(orderbook.clone())
479 } else {
480 let instrument_id =
482 InstrumentId::new(symbol.clone(), rusty_model::venues::Venue::Bithumb);
483 let ob = rusty_model::data::orderbook::OrderBook::<64>::new_empty(instrument_id);
484 let orderbook = SharedSimdOrderBook::from_orderbook(&ob);
485 drop(orderbooks); self.orderbooks
489 .write()
490 .insert(symbol.clone(), orderbook.clone());
491
492 Ok(orderbook)
493 }
494 }
495
496 async fn get_bar_cache(
497 &self,
498 instrument_id: &InstrumentId,
499 bar_type: &BarType,
500 max_bars: usize,
501 ) -> Result<Arc<RwLock<BarCache>>> {
502 let key = String::from(format!("{}:{}", instrument_id.symbol, bar_type));
504
505 let caches = self.bar_caches.read();
507 if let Some(cache) = caches.get(&key) {
508 return Ok(cache.clone());
509 }
510 drop(caches); let cache = Arc::new(RwLock::new(BarCache::new()));
514 self.bar_caches.write().insert(key, cache.clone());
515
516 Ok(cache)
517 }
518
519 async fn get_stats(&self, instrument_id: &InstrumentId) -> Result<FeedStats> {
520 let key = String::from(format!("depth:{}", instrument_id.symbol));
522 if let Some(stats) = self.stats.read().get(&key) {
523 return Ok(stats.clone());
524 }
525
526 let key = String::from(format!("trades:{}", instrument_id.symbol));
528 if let Some(stats) = self.stats.read().get(&key) {
529 return Ok(stats.clone());
530 }
531
532 Ok(FeedStats::default())
534 }
535
536 async fn reset_stats(&self) -> Result<()> {
537 self.stats.write().clear();
538 Ok(())
539 }
540}