rusty_feeder/exchange/upbit/
feeder.rs

1/*
2 * Upbit exchange-specific feeder implementation
3 * Processes raw market data and normalizes it to the common format
4 */
5
6use rusty_common::collections::FxHashMap;
7use smartstring::alias::String;
8use std::sync::Arc;
9
10use anyhow::{Result, anyhow};
11use async_trait::async_trait;
12use parking_lot::RwLock;
13use quanta::Clock;
14use rust_decimal::Decimal;
15use rusty_model::{
16    data::{
17        bar::{Bar, BarAggregation, BarCache, BarType},
18        book_snapshot::OrderBookSnapshot,
19        market_trade::MarketTrade,
20        orderbook::PriceLevel,
21        simd_orderbook::{SharedSimdOrderBook, SimdOrderBook},
22    },
23    enums::OrderSide,
24    instruments::InstrumentId,
25};
26use smallvec::SmallVec;
27use tokio::sync::{mpsc, watch};
28
29use super::data::{
30    orderbook::{MAX_ORDERBOOK_LEVELS, OrderbookMessage, ParsedOrderbookData},
31    trade::{TradeMessage, Transaction},
32};
33use crate::feeder::{FeedStats, Feeder, FeederOptions};
34
35/// Upbit exchange feeder implementation
36#[derive(Debug)]
37pub struct UpbitFeeder {
38    /// Stop signals for active feeds
39    stop_signals: Arc<RwLock<FxHashMap<String, watch::Sender<bool>>>>,
40
41    /// Shared clock for time synchronization
42    clock: Clock,
43
44    /// Feed statistics
45    stats: Arc<RwLock<FxHashMap<String, FeedStats>>>,
46
47    /// Shared orderbooks for each instrument
48    orderbooks: Arc<RwLock<FxHashMap<String, SharedSimdOrderBook>>>,
49
50    /// Bar caches for each instrument and interval
51    bar_caches: Arc<RwLock<FxHashMap<String, Arc<RwLock<BarCache>>>>>,
52}
53
54impl UpbitFeeder {
55    /// Create a new Upbit feeder
56    #[inline]
57    #[must_use]
58    pub fn new() -> Self {
59        Self {
60            stop_signals: Arc::new(RwLock::new(FxHashMap::default())),
61            clock: Clock::new(),
62            stats: Arc::new(RwLock::new(FxHashMap::default())),
63            orderbooks: Arc::new(RwLock::new(FxHashMap::default())),
64            bar_caches: Arc::new(RwLock::new(FxHashMap::default())),
65        }
66    }
67
68    /// Helper method to create stop signal subscription
69    /// Returns a watch receiver for the stop signal
70    fn create_stop_signal_subscription(&self, key: String) -> Result<watch::Receiver<bool>> {
71        let (stop_tx, _) = watch::channel(false);
72        let stop_rx = {
73            let mut stop_signals = self.stop_signals.write();
74            stop_signals.insert(key.clone(), stop_tx);
75            stop_signals
76                .get(&key)
77                .ok_or_else(|| anyhow!("Failed to get stop signal for key: {}", key))?
78                .subscribe()
79        };
80        Ok(stop_rx)
81    }
82}
83
84impl Default for UpbitFeeder {
85    fn default() -> Self {
86        Self::new()
87    }
88}
89
90#[async_trait]
91impl Feeder for UpbitFeeder {
92    type DepthMessage = OrderbookMessage;
93    type TradeMessage = TradeMessage;
94
95    async fn start_feed_depth(
96        &self,
97        instrument_id: InstrumentId,
98        mut depth_rx: mpsc::Receiver<Self::DepthMessage>,
99        _options: Option<FeederOptions>,
100    ) -> Result<mpsc::Receiver<OrderBookSnapshot>> {
101        // Extract symbol from InstrumentId
102        let symbol = instrument_id.symbol.clone();
103        let (tx, rx) = mpsc::channel(1024);
104
105        // Create a shared orderbook and store it
106        let ob = rusty_model::data::orderbook::OrderBook::<64>::new(
107            symbol.clone(),
108            0,                // exchange_timestamp_ns - will be updated on first message
109            self.clock.raw(), // system_timestamp_ns
110            SmallVec::<[PriceLevel; 64]>::new(), // empty bids
111            SmallVec::<[PriceLevel; 64]>::new(), // empty asks
112        );
113        let shared_orderbook = SharedSimdOrderBook::from_orderbook(&ob);
114        self.orderbooks
115            .write()
116            .insert(symbol.clone(), shared_orderbook.clone());
117
118        // Create stop signal
119        let key = String::from(format!("depth:{symbol}"));
120        let mut stop_rx = self.create_stop_signal_subscription(key)?;
121
122        // Clone clock for use in the task
123        let clock = self.clock.clone();
124
125        // Clone stats for updating feed statistics
126        let stats = self.stats.clone();
127        let stats_key = String::from(format!("depth:{symbol}"));
128
129        // Initialize stats for this feed if not already present
130        stats.write().entry(stats_key.clone()).or_default();
131
132        tokio::spawn(async move {
133            loop {
134                tokio::select! {
135                    // Check for stop signal
136                    _ = stop_rx.changed() => {
137                        break;
138                    }
139
140                    // Process incoming depth updates
141                    Some(update) = depth_rx.recv() => {
142                        // Skip if not for our symbol
143                        if update.code != symbol {
144                            continue;
145                        }
146
147                        // Capture processing start time
148                        let process_start = clock.raw();
149
150                        // Process the orderbook message
151                        let parsed_data = ParsedOrderbookData::from_message(&update);
152
153                        // Convert to OrderBookSnapshot format
154                        // Create PriceLevel objects for bids and asks
155                        let bids_vec: SmallVec<[PriceLevel; 64]> = parsed_data.bids
156                            .iter()
157                            .map(|&(price, size)| PriceLevel::new(price, size))
158                            .collect();
159
160                        let asks_vec: SmallVec<[PriceLevel; 64]> = parsed_data.asks
161                            .iter()
162                            .map(|&(price, size)| PriceLevel::new(price, size))
163                            .collect();
164
165                        // Update the shared orderbook model using fixed-size arrays to avoid heap allocations
166                        let mut bids_arr = [PriceLevel::new(Decimal::ZERO, Decimal::ZERO); MAX_ORDERBOOK_LEVELS];
167                        let mut asks_arr = [PriceLevel::new(Decimal::ZERO, Decimal::ZERO); MAX_ORDERBOOK_LEVELS];
168                        for (i, lvl) in bids_vec.iter().enumerate().take(MAX_ORDERBOOK_LEVELS) {
169                            bids_arr[i] = *lvl;
170                        }
171                        for (i, lvl) in asks_vec.iter().enumerate().take(MAX_ORDERBOOK_LEVELS) {
172                            asks_arr[i] = *lvl;
173                        }
174                        let bids_levels: SmallVec<[PriceLevel; 64]> = parsed_data.bids
175                            .iter()
176                            .map(|&(price, quantity)| PriceLevel::new(price, quantity))
177                            .collect();
178                        let asks_levels: SmallVec<[PriceLevel; 64]> = parsed_data.asks
179                            .iter()
180                            .map(|&(price, quantity)| PriceLevel::new(price, quantity))
181                            .collect();
182
183                        let model_orderbook = rusty_model::data::orderbook::OrderBook::<64>::new(
184                            instrument_id.symbol.as_str(),
185                            parsed_data.timestamp_ns,
186                            clock.raw(),
187                            bids_levels,
188                            asks_levels,
189                        );
190                        shared_orderbook.write(|ob| {
191                            *ob = SimdOrderBook::from_orderbook(&model_orderbook);
192                        });
193
194                        // Create OrderBookSnapshot from the current state
195                        let mut depth = OrderBookSnapshot::new_empty(
196                            instrument_id.clone(),
197                            process_start,
198                            parsed_data.sequence,
199                        );
200                        for &(price, qty) in &parsed_data.bids {
201                            depth.add_bid(price, qty);
202                        }
203                        for &(price, qty) in &parsed_data.asks {
204                            depth.add_ask(price, qty);
205                        }
206
207                        // Calculate processing latency
208                        let process_end = clock.raw();
209                        let latency_ns = process_end.saturating_sub(process_start);
210
211                        // Update statistics
212                        if let Some(feed_stats) = stats.write().get_mut(&stats_key) {
213                            feed_stats.messages_processed += 1;
214                            feed_stats.avg_process_latency_ns =
215                                (feed_stats.avg_process_latency_ns * 9 + latency_ns) / 10; // Moving average
216                            feed_stats.max_process_latency_ns =
217                                feed_stats.max_process_latency_ns.max(latency_ns);
218                            feed_stats.last_update_time = process_end;
219                        }
220
221                        // Send the order book depth
222                        if tx.send(depth).await.is_err() {
223                            // Update dropped message count on error
224                            if let Some(feed_stats) = stats.write().get_mut(&stats_key) {
225                                feed_stats.dropped_messages += 1;
226                            }
227                            break;
228                        }
229                    }
230                }
231            }
232        });
233
234        Ok(rx)
235    }
236
237    async fn stop_feed_depth(&self, instrument_id: &InstrumentId) -> Result<()> {
238        // Extract symbol from InstrumentId
239        let symbol = &instrument_id.symbol;
240        let key = String::from(format!("depth:{symbol}"));
241
242        // Remove the orderbook
243        self.orderbooks.write().remove(symbol);
244
245        // Send stop signal
246        if let Some(tx) = self.stop_signals.write().remove(&key) {
247            let _ = tx.send(true);
248        }
249
250        Ok(())
251    }
252
253    async fn start_feed_trades(
254        &self,
255        instrument_id: InstrumentId,
256        mut trade_rx: mpsc::Receiver<Self::TradeMessage>,
257        _options: Option<FeederOptions>,
258    ) -> Result<mpsc::Receiver<MarketTrade>> {
259        // Extract symbol from InstrumentId
260        let symbol = instrument_id.symbol.clone();
261        let (tx, rx) = mpsc::channel(1024);
262
263        // Create stop signal
264        let key = String::from(format!("trades:{symbol}"));
265        let mut stop_rx = self.create_stop_signal_subscription(key)?;
266
267        // Clone clock for use in the task
268        let clock = self.clock.clone();
269
270        // Clone stats for updating feed statistics
271        let stats = self.stats.clone();
272        let stats_key = String::from(format!("trades:{symbol}"));
273
274        // Initialize stats for this feed if not already present
275        stats.write().entry(stats_key.clone()).or_default();
276
277        tokio::spawn(async move {
278            loop {
279                tokio::select! {
280                    // Check for stop signal
281                    _ = stop_rx.changed() => {
282                        break;
283                    }
284
285                    // Process incoming trade messages
286                    Some(trade_msg) = trade_rx.recv() => {
287                        // Skip if not for our symbol
288                        if trade_msg.code != symbol {
289                            continue;
290                        }
291
292                        // Capture processing start time
293                        let process_start = clock.raw();
294
295                        // Convert to our transaction type first
296                        let transaction = Transaction::from_trade_message(&trade_msg, process_start);
297
298                        // Create standardized Trade model
299                        let trade = MarketTrade {
300                            timestamp: clock.now(),
301                            exchange_time_ns: transaction.trade_timestamp_ns,
302                            price: transaction.price,
303                            quantity: transaction.volume,
304                            direction: transaction.side,
305                            instrument_id: instrument_id.clone(),
306                        };
307
308                        // Calculate processing latency
309                        let process_end = clock.raw();
310                        let latency_ns = process_end.saturating_sub(process_start);
311
312                        // Update statistics
313                        if let Some(feed_stats) = stats.write().get_mut(&stats_key) {
314                            feed_stats.messages_processed += 1;
315                            feed_stats.avg_process_latency_ns =
316                                (feed_stats.avg_process_latency_ns * 9 + latency_ns) / 10; // Moving average
317                            feed_stats.max_process_latency_ns =
318                                feed_stats.max_process_latency_ns.max(latency_ns);
319                            feed_stats.last_update_time = process_end;
320                        }
321
322                        // Send the trade
323                        if tx.send(trade).await.is_err() {
324                            // Update dropped message count on error
325                            if let Some(feed_stats) = stats.write().get_mut(&stats_key) {
326                                feed_stats.dropped_messages += 1;
327                            }
328                            break;
329                        }
330                    }
331                }
332            }
333        });
334
335        Ok(rx)
336    }
337
338    async fn stop_feed_trades(&self, instrument_id: &InstrumentId) -> Result<()> {
339        // Extract symbol from InstrumentId
340        let symbol = &instrument_id.symbol;
341        let key = String::from(format!("trades:{symbol}"));
342        if let Some(tx) = self.stop_signals.write().remove(&key) {
343            let _ = tx.send(true);
344        }
345
346        Ok(())
347    }
348
349    async fn start_feed_bars(
350        &self,
351        instrument_id: InstrumentId,
352        bar_type: BarType,
353        mut trade_rx: mpsc::Receiver<MarketTrade>,
354        _options: Option<FeederOptions>,
355    ) -> Result<mpsc::Receiver<Bar>> {
356        // Extract symbol and interval from parameters
357        let symbol = instrument_id.symbol.clone();
358        let interval_sec = match bar_type.get_spec().aggregation {
359            BarAggregation::Second => bar_type.get_spec().step,
360            BarAggregation::Minute => bar_type.get_spec().step * 60,
361            BarAggregation::Hour => bar_type.get_spec().step * 3600,
362            BarAggregation::Day => bar_type.get_spec().step * 86400,
363            _ => return Err(anyhow!("Only time-based bars are supported")),
364        };
365
366        // Create bar cache key
367        let cache_key = String::from(format!("{symbol}:{bar_type}"));
368
369        // Create a new bar cache or get existing one
370        let bar_cache = {
371            let mut caches = self.bar_caches.write();
372            if !caches.contains_key(&cache_key) {
373                caches.insert(cache_key.clone(), Arc::new(RwLock::new(BarCache::new())));
374            }
375            caches
376                .get(&cache_key)
377                .ok_or_else(|| anyhow!("Cache not found for key: {}", cache_key))?
378                .clone()
379        };
380
381        let (tx, rx) = mpsc::channel(1024);
382
383        // Create stop signal
384        let key = String::from(format!("bars:{symbol}:{interval_sec}"));
385        let mut stop_rx = self.create_stop_signal_subscription(key)?;
386
387        // Clone clock for use in the task
388        let clock = self.clock.clone();
389
390        // Clone stats for updating feed statistics
391        let stats = self.stats.clone();
392        let stats_key = String::from(format!("bars:{symbol}:{interval_sec}"));
393
394        // Initialize stats for this feed if not already present
395        stats.write().entry(stats_key.clone()).or_default();
396
397        // Bar aggregation state
398        let mut open = None;
399        let mut high = None;
400        let mut low = None;
401        let mut close = None;
402        let mut volume = Decimal::ZERO;
403        let mut buy_volume = Decimal::ZERO;
404        let mut sell_volume = Decimal::ZERO;
405        let mut trade_count = 0;
406        let mut last_bar_time = 0u64;
407
408        tokio::spawn(async move {
409            loop {
410                tokio::select! {
411                    // Check for stop signal
412                    _ = stop_rx.changed() => {
413                        break;
414                    }
415
416                    // Process incoming trades
417                    Some(trade) = trade_rx.recv() => {
418                        // Capture processing start time
419                        let process_start = clock.raw();
420
421                        let now = clock.raw();
422                        let current_interval = now / 1_000_000_000 / interval_sec;
423
424                        // Check if we need to start a new bar
425                        if last_bar_time == 0 {
426                            // First trade
427                            open = Some(trade.price);
428                            high = Some(trade.price);
429                            low = Some(trade.price);
430                            close = Some(trade.price);
431                            volume = trade.quantity;
432
433                            // Update buy/sell volumes
434                            match trade.direction {
435                                OrderSide::Buy => buy_volume = trade.quantity,
436                                OrderSide::Sell => sell_volume = trade.quantity,
437                            }
438
439                            trade_count = 1;
440                            last_bar_time = current_interval;
441                        } else if current_interval > last_bar_time {
442                            // New bar interval - send the current bar and start a new one
443                            if let (Some(o), Some(h), Some(l), Some(c)) = (open, high, low, close) {
444                                let bar = Bar {
445                                    bar_type: bar_type.clone(),
446                                    open: o,
447                                    high: h,
448                                    low: l,
449                                    close: c,
450                                    volume,
451                                    timestamp_ns: clock.raw(),
452                                };
453
454                                // Add to the bar cache
455                                bar_cache.write().add_bar(bar.clone());
456
457                                // Send the bar
458                                if tx.send(bar).await.is_err() {
459                                    // Update dropped message count on error
460                                    if let Some(feed_stats) = stats.write().get_mut(&stats_key) {
461                                        feed_stats.dropped_messages += 1;
462                                    }
463                                    break;
464                                }
465                            }
466
467                            // Start a new bar
468                            open = Some(trade.price);
469                            high = Some(trade.price);
470                            low = Some(trade.price);
471                            close = Some(trade.price);
472                            volume = trade.quantity;
473
474                            // Reset buy/sell volumes
475                            buy_volume = Decimal::ZERO;
476                            sell_volume = Decimal::ZERO;
477
478                            // Update buy/sell volumes
479                            match trade.direction {
480                                OrderSide::Buy => buy_volume = trade.quantity,
481                                OrderSide::Sell => sell_volume = trade.quantity,
482                            }
483
484                            trade_count = 1;
485                            last_bar_time = current_interval;
486                        } else {
487                            // Update the current bar
488                            high = Some(std::cmp::max(high.unwrap_or(trade.price), trade.price));
489                            low = Some(std::cmp::min(low.unwrap_or(trade.price), trade.price));
490                            close = Some(trade.price);
491                            volume += trade.quantity;
492
493                            // Update buy/sell volumes
494                            match trade.direction {
495                                OrderSide::Buy => buy_volume += trade.quantity,
496                                OrderSide::Sell => sell_volume += trade.quantity,
497                            }
498
499                            trade_count += 1;
500                        }
501
502                        // Calculate processing latency
503                        let process_end = clock.raw();
504                        let latency_ns = process_end.saturating_sub(process_start);
505
506                        // Update statistics
507                        if let Some(feed_stats) = stats.write().get_mut(&stats_key) {
508                            feed_stats.messages_processed += 1;
509                            feed_stats.avg_process_latency_ns =
510                                (feed_stats.avg_process_latency_ns * 9 + latency_ns) / 10; // Moving average
511                            feed_stats.max_process_latency_ns =
512                                feed_stats.max_process_latency_ns.max(latency_ns);
513                            feed_stats.last_update_time = process_end;
514                        }
515                    }
516                }
517            }
518        });
519
520        Ok(rx)
521    }
522
523    async fn stop_feed_bars(&self, instrument_id: &InstrumentId, bar_type: &BarType) -> Result<()> {
524        // Extract symbol and interval from parameters
525        let symbol = &instrument_id.symbol;
526        let interval_sec = match bar_type.get_spec().aggregation {
527            BarAggregation::Second => bar_type.get_spec().step,
528            BarAggregation::Minute => bar_type.get_spec().step * 60,
529            BarAggregation::Hour => bar_type.get_spec().step * 3600,
530            BarAggregation::Day => bar_type.get_spec().step * 86400,
531            _ => return Err(anyhow!("Only time-based bars are supported")),
532        };
533
534        let key = String::from(format!("bars:{symbol}:{interval_sec}"));
535        if let Some(tx) = self.stop_signals.write().remove(&key) {
536            let _ = tx.send(true);
537        }
538
539        Ok(())
540    }
541
542    async fn get_shared_orderbook(
543        &self,
544        instrument_id: &InstrumentId,
545    ) -> Result<SharedSimdOrderBook> {
546        // Extract symbol from InstrumentId
547        let symbol = &instrument_id.symbol;
548
549        // Get existing orderbook or create a new one
550        let orderbooks = self.orderbooks.read();
551        if let Some(orderbook) = orderbooks.get(symbol) {
552            Ok(orderbook.clone())
553        } else {
554            // Create a new orderbook
555            let ob =
556                rusty_model::data::orderbook::OrderBook::<64>::new_empty(instrument_id.clone());
557            let shared_orderbook = SharedSimdOrderBook::from_orderbook(&ob);
558            drop(orderbooks); // Release the read lock before acquiring write lock
559
560            // Store the orderbook
561            self.orderbooks
562                .write()
563                .insert(symbol.clone(), shared_orderbook.clone());
564
565            Ok(shared_orderbook)
566        }
567    }
568
569    async fn get_bar_cache(
570        &self,
571        instrument_id: &InstrumentId,
572        bar_type: &BarType,
573        max_bars: usize,
574    ) -> Result<Arc<RwLock<BarCache>>> {
575        // Create a unique key for this bar cache
576        let key = String::from(format!("{}:{}", instrument_id.symbol, bar_type));
577
578        // Check if we already have this bar cache
579        let caches = self.bar_caches.read();
580        if let Some(cache) = caches.get(&key) {
581            return Ok(cache.clone());
582        }
583        drop(caches); // Release read lock
584
585        // Create a new bar cache
586        let cache = Arc::new(RwLock::new(BarCache::new()));
587        self.bar_caches.write().insert(key, cache.clone());
588
589        Ok(cache)
590    }
591
592    async fn get_stats(&self, instrument_id: &InstrumentId) -> Result<FeedStats> {
593        // Check for depth stats
594        let key = String::from(format!("depth:{}", instrument_id.symbol));
595        if let Some(stats) = self.stats.read().get(&key) {
596            return Ok(stats.clone());
597        }
598
599        // Check for trade stats
600        let key = String::from(format!("trades:{}", instrument_id.symbol));
601        if let Some(stats) = self.stats.read().get(&key) {
602            return Ok(stats.clone());
603        }
604
605        // Return default stats if not found
606        Ok(FeedStats::default())
607    }
608
609    async fn reset_stats(&self) -> Result<()> {
610        self.stats.write().clear();
611        Ok(())
612    }
613}