rusty_feeder/exchange/bithumb/
feeder.rs

1/*
2 * Bithumb market data feeder implementation
3 * Normalizes exchange-specific data structures to common model formats
4 */
5
6use rusty_common::collections::FxHashMap;
7use smartstring::alias::String;
8use std::sync::Arc;
9
10use anyhow::Result;
11use async_trait::async_trait;
12use parking_lot::RwLock;
13use quanta::Clock;
14use smallvec::SmallVec;
15use tokio::sync::{mpsc, watch};
16
17use crate::feeder::{FeedStats, Feeder, FeederOptions};
18use rusty_model::{
19    PriceLevel,
20    data::{
21        bar::{Bar, BarAggregation, BarCache, BarType},
22        book_snapshot::OrderBookSnapshot,
23        market_trade::MarketTrade,
24        simd_orderbook::{SharedSimdOrderBook, SimdOrderBook},
25    },
26    enums::OrderSide,
27    instruments::InstrumentId,
28};
29
30use super::data::{
31    orderbook::{Orderbook, OrderbookMessage},
32    trade::TradeMessage,
33};
34
35/// Bithumb market data feeder
36#[derive(Debug)]
37pub struct BithumbFeeder {
38    /// Stop signals for active feeds
39    stop_signals: Arc<RwLock<FxHashMap<String, watch::Sender<bool>>>>,
40
41    /// Shared orderbooks for each instrument
42    orderbooks: Arc<RwLock<FxHashMap<String, SharedSimdOrderBook>>>,
43
44    /// Bar caches for each instrument and interval
45    bar_caches: Arc<RwLock<FxHashMap<String, Arc<RwLock<BarCache>>>>>,
46
47    /// High-precision clock for time synchronization
48    clock: Clock,
49
50    /// Feed statistics
51    stats: Arc<RwLock<FxHashMap<String, FeedStats>>>,
52}
53
54impl Default for BithumbFeeder {
55    fn default() -> Self {
56        Self::new()
57    }
58}
59
60impl BithumbFeeder {
61    /// Create a new Bithumb feeder
62    #[must_use]
63    pub fn new() -> Self {
64        Self {
65            stop_signals: Arc::new(RwLock::new(FxHashMap::default())),
66            orderbooks: Arc::new(RwLock::new(FxHashMap::default())),
67            bar_caches: Arc::new(RwLock::new(FxHashMap::default())),
68            clock: Clock::new(),
69            stats: Arc::new(RwLock::new(FxHashMap::default())),
70        }
71    }
72}
73
74#[async_trait]
75impl Feeder for BithumbFeeder {
76    type DepthMessage = OrderbookMessage;
77    type TradeMessage = TradeMessage;
78
79    async fn start_feed_depth(
80        &self,
81        instrument_id: InstrumentId,
82        mut depth_rx: mpsc::Receiver<Self::DepthMessage>,
83        _options: Option<FeederOptions>,
84    ) -> Result<mpsc::Receiver<OrderBookSnapshot>> {
85        // Extract symbol from InstrumentId
86        let symbol = instrument_id.symbol.clone();
87        let (tx, rx) = mpsc::channel(1024);
88
89        // Create a shared orderbook and store it
90        let ob = rusty_model::data::orderbook::OrderBook::<64>::new_empty(instrument_id.clone());
91        let shared_orderbook = SharedSimdOrderBook::from_orderbook(&ob);
92        self.orderbooks
93            .write()
94            .insert(symbol.clone(), shared_orderbook.clone());
95
96        // Create stop signal
97        let (stop_tx, _) = watch::channel(false);
98        let key = String::from(format!("depth:{symbol}"));
99        self.stop_signals.write().insert(key.clone(), stop_tx);
100        let mut stop_rx = self.stop_signals.read().get(&key).unwrap().subscribe();
101
102        // Clone clock for use in the task
103        let clock = self.clock.clone();
104
105        // Clone stats for updating feed statistics
106        let stats = self.stats.clone();
107        let stats_key = String::from(format!("depth:{symbol}"));
108
109        // Initialize stats for this feed if not already present
110        stats.write().entry(stats_key.clone()).or_default();
111
112        tokio::spawn(async move {
113            loop {
114                tokio::select! {
115                    // Check for stop signal
116                    _ = stop_rx.changed() => {
117                        break;
118                    }
119
120                    // Process incoming depth updates
121                    Some(update) = depth_rx.recv() => {
122                        // Skip if not for our symbol
123                        if update.code != symbol {
124                            continue;
125                        }
126
127                        // Capture processing start time
128                        let process_start = clock.raw();
129
130                        // Process the orderbook message
131                        let orderbook = Orderbook::from_orderbook_message(&update, process_start);
132
133                        // Update the shared orderbook model
134                        let bids_vec: SmallVec<[PriceLevel; 64]> = orderbook.bids.iter().cloned().collect();
135                        let asks_vec: SmallVec<[PriceLevel; 64]> = orderbook.asks.iter().cloned().collect();
136
137                        let model_orderbook = rusty_model::data::orderbook::OrderBook::new(
138                            &symbol,
139                            orderbook.timestamp_ns, // Use exchange timestamp
140                            process_start,          // Use local timestamp
141                            bids_vec,
142                            asks_vec,
143                        );
144                        shared_orderbook.write(|ob| {
145                            *ob = SimdOrderBook::from_orderbook(&model_orderbook);
146                        });
147
148                        // Create OrderBookSnapshot from the current state
149                        // The orderbook already contains PriceLevel objects, so we can clone them
150                        let bids_vec: SmallVec<[PriceLevel; 64]> = orderbook.bids.iter().cloned().collect();
151                        let asks_vec: SmallVec<[PriceLevel; 64]> = orderbook.asks.iter().cloned().collect();
152
153                        let depth = OrderBookSnapshot::new(
154                            instrument_id.clone(),
155                            bids_vec,
156                            asks_vec,
157                            0, // No sequence number for Bithumb
158                            orderbook.timestamp_ns, // Exchange time in ns
159                            process_start, // Local time in ns
160                        );
161
162                        // Calculate processing latency
163                        let process_end = clock.raw();
164                        let latency_ns = process_end.saturating_sub(process_start);
165
166                        // Update statistics
167                        if let Some(feed_stats) = stats.write().get_mut(&stats_key) {
168                            feed_stats.messages_processed += 1;
169                            feed_stats.avg_process_latency_ns =
170                                (feed_stats.avg_process_latency_ns * 9 + latency_ns) / 10; // Moving average
171                            feed_stats.max_process_latency_ns =
172                                feed_stats.max_process_latency_ns.max(latency_ns);
173                            feed_stats.last_update_time = process_end;
174                        }
175
176                        // Send the order book depth
177                        if tx.send(depth).await.is_err() {
178                            // Update dropped message count on error
179                            if let Some(feed_stats) = stats.write().get_mut(&stats_key) {
180                                feed_stats.increment_dropped();
181                            }
182                            break;
183                        }
184                    }
185                }
186            }
187        });
188
189        Ok(rx)
190    }
191
192    async fn stop_feed_depth(&self, instrument_id: &InstrumentId) -> Result<()> {
193        // Extract symbol from InstrumentId
194        let symbol = &instrument_id.symbol;
195        let key = String::from(format!("depth:{symbol}"));
196
197        // Remove the orderbook
198        self.orderbooks.write().remove(symbol);
199
200        // Send stop signal
201        if let Some(tx) = self.stop_signals.write().remove(&key) {
202            let _ = tx.send(true);
203        }
204
205        Ok(())
206    }
207
208    async fn start_feed_trades(
209        &self,
210        instrument_id: InstrumentId,
211        mut trade_rx: mpsc::Receiver<Self::TradeMessage>,
212        _options: Option<FeederOptions>,
213    ) -> Result<mpsc::Receiver<MarketTrade>> {
214        // Extract symbol from InstrumentId
215        let symbol = instrument_id.symbol.clone();
216        let (tx, rx) = mpsc::channel(1024);
217
218        // Create stop signal
219        let (stop_tx, _) = watch::channel(false);
220        let key = String::from(format!("trades:{symbol}"));
221        self.stop_signals.write().insert(key.clone(), stop_tx);
222        let mut stop_rx = self.stop_signals.read().get(&key).unwrap().subscribe();
223
224        // Clone clock for use in the task
225        let clock = self.clock.clone();
226
227        // Clone stats for updating feed statistics
228        let stats = self.stats.clone();
229        let stats_key = String::from(format!("trades:{symbol}"));
230
231        // Initialize stats for this feed if not already present
232        stats.write().entry(stats_key.clone()).or_default();
233
234        tokio::spawn(async move {
235            loop {
236                tokio::select! {
237                    // Check for stop signal
238                    _ = stop_rx.changed() => {
239                        break;
240                    }
241
242                    // Process incoming trade messages
243                    Some(trade_msg) = trade_rx.recv() => {
244                        // Skip if not for our symbol
245                        if trade_msg.code != symbol {
246                            continue;
247                        }
248
249                        // Capture processing start time
250                        let process_start = clock.raw();
251
252                        // Convert to our transaction type
253                        let side = match trade_msg.ask_bid.as_str() {
254                            "BID" => OrderSide::Buy,
255                            _ => OrderSide::Sell,
256                        };
257
258                        // Create Trade
259                        let trade = MarketTrade {
260                            timestamp: clock.now(),
261                            exchange_time_ns: trade_msg.trade_timestamp * 1_000_000, // Convert ms to ns
262                            price: trade_msg.trade_price,
263                            quantity: trade_msg.trade_volume,
264                            direction: side,
265                            instrument_id: instrument_id.clone(),
266                        };
267
268                        // Calculate processing latency
269                        let process_end = clock.raw();
270                        let latency_ns = process_end.saturating_sub(process_start);
271
272                        // Update statistics
273                        if let Some(feed_stats) = stats.write().get_mut(&stats_key) {
274                            feed_stats.messages_processed += 1;
275                            feed_stats.avg_process_latency_ns =
276                                (feed_stats.avg_process_latency_ns * 9 + latency_ns) / 10; // Moving average
277                            feed_stats.max_process_latency_ns =
278                                feed_stats.max_process_latency_ns.max(latency_ns);
279                            feed_stats.last_update_time = process_end;
280                        }
281
282                        // Send the trade
283                        if tx.send(trade).await.is_err() {
284                            // Update dropped message count on error
285                            if let Some(feed_stats) = stats.write().get_mut(&stats_key) {
286                                feed_stats.increment_dropped();
287                            }
288                            break;
289                        }
290                    }
291                }
292            }
293        });
294
295        Ok(rx)
296    }
297
298    async fn stop_feed_trades(&self, instrument_id: &InstrumentId) -> Result<()> {
299        // Extract symbol from InstrumentId
300        let symbol = &instrument_id.symbol;
301        let key = String::from(format!("trades:{symbol}"));
302        if let Some(tx) = self.stop_signals.write().remove(&key) {
303            let _ = tx.send(true);
304        }
305
306        Ok(())
307    }
308
309    async fn start_feed_bars(
310        &self,
311        instrument_id: InstrumentId,
312        bar_type: BarType,
313        mut trade_rx: mpsc::Receiver<MarketTrade>,
314        _options: Option<FeederOptions>,
315    ) -> Result<mpsc::Receiver<Bar>> {
316        let symbol = instrument_id.symbol.clone();
317        let bar_type_str = bar_type.to_string(); // BarType now implements Display
318        let cache_key = String::from(format!("{symbol}:{bar_type_str}"));
319
320        // Determine bar cache capacity
321        let max_bars = 1000;
322
323        // Clone Arcs for the spawned task
324        let bar_caches_arc = self.bar_caches.clone();
325        let clock_arc = self.clock.clone(); // Ensure clock is also shareable if needed inside, or pass initial time
326        let instrument_id_clone = instrument_id.clone(); // instrument_id is already Clone
327
328        // Get or create the specific BarCache for this task ONCE.
329        // The `entry` API consumes `cache_key`.
330        let task_bar_cache_arc = bar_caches_arc
331            .write()
332            .entry(cache_key) // cache_key (String) is moved here
333            .or_insert_with(|| Arc::new(RwLock::new(BarCache::new())))
334            .clone(); // Clone the Arc<RwLock<BarCache>> for use in the task
335
336        let (tx, rx) = mpsc::channel(1024);
337
338        // Get interval from bar_type for stats key
339        let interval_sec = match bar_type.get_spec().aggregation {
340            BarAggregation::Second => bar_type.get_spec().step,
341            BarAggregation::Minute => bar_type.get_spec().step * 60,
342            BarAggregation::Hour => bar_type.get_spec().step * 3600,
343            BarAggregation::Day => bar_type.get_spec().step * 86400,
344            _ => 60, // Default to 1 minute if not a time-based bar
345        };
346
347        // Clone stats for updating feed statistics
348        let stats = self.stats.clone();
349        let stats_key = String::from(format!("bars:{symbol}:{interval_sec}"));
350
351        // Initialize stats for this feed if not already present
352        stats.write().entry(stats_key.clone()).or_default();
353
354        // Stop signal handling (if needed for bars, not shown in original snippet for this error)
355        // let (stop_tx, _) = watch::channel(false);
356        // let stop_key = format!("bar:{}:{}", symbol, bar_type_str);
357        // self.stop_signals.write().insert(stop_key.clone(), stop_tx);
358        // let mut stop_rx = self.stop_signals.read().get(&stop_key).unwrap().subscribe();
359
360        tokio::spawn(async move {
361            let mut current_bar: Option<Bar> = None;
362            let bar_interval_ns = rusty_model::data::bar::get_bar_interval_ns(&bar_type);
363
364            while let Some(trade) = trade_rx.recv().await {
365                // Capture processing start time
366                let process_start = clock_arc.raw();
367
368                if trade.instrument_id != instrument_id_clone {
369                    continue;
370                }
371
372                let trade_timestamp_ns = trade.exchange_time_ns; // exchange_time_ns is u64
373
374                match current_bar.as_mut() {
375                    Some(bar) => {
376                        let bar_start_time_ns = bar.timestamp_ns; // Convert Instant to raw nanosecond count
377                        if trade_timestamp_ns >= bar_start_time_ns + bar_interval_ns {
378                            // Finalize current bar
379                            // Bar is already finalized with current values
380                            if tx.send(bar.clone()).await.is_err() {
381                                // Update dropped message count on error
382                                if let Some(feed_stats) = stats.write().get_mut(&stats_key) {
383                                    feed_stats.increment_dropped();
384                                }
385                                break; // Channel closed
386                            }
387                            // Start new bar
388                            current_bar = Some(Bar {
389                                bar_type: bar_type.clone(),
390                                open: trade.price,
391                                high: trade.price,
392                                low: trade.price,
393                                close: trade.price,
394                                volume: trade.quantity,
395                                timestamp_ns: trade_timestamp_ns,
396                            });
397                        } else {
398                            // Update existing bar
399                            bar.high = bar.high.max(trade.price);
400                            bar.low = bar.low.min(trade.price);
401                            bar.close = trade.price;
402                            bar.volume += trade.quantity;
403                        }
404                    }
405                    None => {
406                        // Start first bar, aligning to interval if necessary
407                        // This alignment logic can be complex. For now, just using trade time.
408                        current_bar = Some(Bar {
409                            bar_type: bar_type.clone(),
410                            open: trade.price,
411                            high: trade.price,
412                            low: trade.price,
413                            close: trade.price,
414                            volume: trade.quantity,
415                            timestamp_ns: trade_timestamp_ns,
416                        });
417                    }
418                }
419                // After processing a trade, if the bar is considered "complete" by some logic
420                // (e.g. end of its interval), then it should be sent and current_bar reset.
421                // The example above tries to handle time-based bar completion.
422
423                // Add to the task-specific cache (perhaps only when bar is finalized)
424                if let Some(ref bar_to_cache) = current_bar {
425                    // Bar is complete, add to cache
426                    // Example: only cache closed bars
427                    task_bar_cache_arc.write().add_bar(bar_to_cache.clone());
428                }
429
430                // Calculate processing latency
431                let process_end = clock_arc.raw();
432                let latency_ns = process_end.saturating_sub(process_start);
433
434                // Update statistics
435                if let Some(feed_stats) = stats.write().get_mut(&stats_key) {
436                    feed_stats.messages_processed += 1;
437                    feed_stats.avg_process_latency_ns =
438                        (feed_stats.avg_process_latency_ns * 9 + latency_ns) / 10; // Moving average
439                    feed_stats.max_process_latency_ns =
440                        feed_stats.max_process_latency_ns.max(latency_ns);
441                    feed_stats.last_update_time = process_end;
442                }
443            }
444        });
445
446        Ok(rx)
447    }
448
449    async fn stop_feed_bars(&self, instrument_id: &InstrumentId, bar_type: &BarType) -> Result<()> {
450        // Extract symbol and interval from parameters
451        let symbol = &instrument_id.symbol;
452        let interval_sec = match bar_type.get_spec().aggregation {
453            BarAggregation::Second => bar_type.get_spec().step,
454            BarAggregation::Minute => bar_type.get_spec().step * 60,
455            BarAggregation::Hour => bar_type.get_spec().step * 3600,
456            BarAggregation::Day => bar_type.get_spec().step * 86400,
457            _ => return Err(anyhow::anyhow!("Only time-based bars are supported")),
458        };
459
460        let key = String::from(format!("bars:{symbol}:{interval_sec}"));
461        if let Some(tx) = self.stop_signals.write().remove(&key) {
462            let _ = tx.send(true);
463        }
464
465        Ok(())
466    }
467
468    async fn get_shared_orderbook(
469        &self,
470        instrument_id: &InstrumentId,
471    ) -> Result<SharedSimdOrderBook> {
472        // Extract symbol from InstrumentId
473        let symbol = &instrument_id.symbol;
474
475        // Get existing orderbook or create a new one
476        let orderbooks = self.orderbooks.read();
477        if let Some(orderbook) = orderbooks.get(symbol) {
478            Ok(orderbook.clone())
479        } else {
480            // Create a new orderbook
481            let instrument_id =
482                InstrumentId::new(symbol.clone(), rusty_model::venues::Venue::Bithumb);
483            let ob = rusty_model::data::orderbook::OrderBook::<64>::new_empty(instrument_id);
484            let orderbook = SharedSimdOrderBook::from_orderbook(&ob);
485            drop(orderbooks); // Release the read lock before acquiring write lock
486
487            // Store the orderbook
488            self.orderbooks
489                .write()
490                .insert(symbol.clone(), orderbook.clone());
491
492            Ok(orderbook)
493        }
494    }
495
496    async fn get_bar_cache(
497        &self,
498        instrument_id: &InstrumentId,
499        bar_type: &BarType,
500        max_bars: usize,
501    ) -> Result<Arc<RwLock<BarCache>>> {
502        // Create a unique key for this bar cache
503        let key = String::from(format!("{}:{}", instrument_id.symbol, bar_type));
504
505        // Check if we already have this bar cache
506        let caches = self.bar_caches.read();
507        if let Some(cache) = caches.get(&key) {
508            return Ok(cache.clone());
509        }
510        drop(caches); // Release read lock
511
512        // Create a new bar cache
513        let cache = Arc::new(RwLock::new(BarCache::new()));
514        self.bar_caches.write().insert(key, cache.clone());
515
516        Ok(cache)
517    }
518
519    async fn get_stats(&self, instrument_id: &InstrumentId) -> Result<FeedStats> {
520        // Check for depth stats
521        let key = String::from(format!("depth:{}", instrument_id.symbol));
522        if let Some(stats) = self.stats.read().get(&key) {
523            return Ok(stats.clone());
524        }
525
526        // Check for trade stats
527        let key = String::from(format!("trades:{}", instrument_id.symbol));
528        if let Some(stats) = self.stats.read().get(&key) {
529            return Ok(stats.clone());
530        }
531
532        // Return default stats if not found
533        Ok(FeedStats::default())
534    }
535
536    async fn reset_stats(&self) -> Result<()> {
537        self.stats.write().clear();
538        Ok(())
539    }
540}