rusty_feeder/exchange/binance/futures/
feeder.rs

1//! Binance Futures exchange-specific feeder implementation
2//!
3//! Processes raw market data and normalizes it to the common format
4
5use rusty_common::collections::FxHashMap;
6use smartstring::alias::String;
7use std::sync::Arc;
8
9use anyhow::{Result, anyhow};
10use async_trait::async_trait;
11use parking_lot::RwLock;
12use quanta::Clock;
13use rusty_model::{
14    PriceLevel,
15    data::{
16        bar::{Bar, BarAggregation, BarCache, BarType},
17        book_snapshot::OrderBookSnapshot,
18        market_trade::MarketTrade,
19        orderbook::OrderBook,
20        simd_orderbook::{SharedSimdOrderBook, SimdOrderBook},
21    },
22    enums::OrderSide,
23    instruments::InstrumentId,
24};
25use smallvec::SmallVec;
26use tokio::sync::{mpsc, watch};
27
28use super::data::{
29    orderbook::{OrderbookMessage, ParsedOrderbookData},
30    trade::{AggTradeMessage, ParsedAggTradeData},
31};
32use crate::exchange::binance::common::bar_aggregator::BarAggregator;
33use crate::feeder::{FeedStats, Feeder, FeederOptions};
34
35/// Binance Futures exchange feeder implementation
36#[derive(Debug)]
37pub struct BinanceFuturesFeeder {
38    /// Stop signals for active feeds
39    stop_signals: Arc<RwLock<FxHashMap<String, watch::Sender<bool>>>>,
40
41    /// Shared clock for time synchronization
42    clock: Clock,
43
44    /// Feed statistics
45    stats: Arc<RwLock<FxHashMap<String, FeedStats>>>,
46
47    /// Shared orderbooks for each instrument
48    orderbooks: Arc<RwLock<FxHashMap<String, SharedSimdOrderBook>>>,
49
50    /// Bar caches for each instrument and interval
51    bar_caches: Arc<RwLock<FxHashMap<String, Arc<RwLock<BarCache>>>>>,
52}
53
54impl BinanceFuturesFeeder {
55    /// Create a new Binance Futures feeder
56    #[inline]
57    #[must_use]
58    pub fn new() -> Self {
59        Self {
60            stop_signals: Arc::new(RwLock::new(FxHashMap::default())),
61            clock: Clock::new(),
62            stats: Arc::new(RwLock::new(FxHashMap::default())),
63            orderbooks: Arc::new(RwLock::new(FxHashMap::default())),
64            bar_caches: Arc::new(RwLock::new(FxHashMap::default())),
65        }
66    }
67}
68
69impl Default for BinanceFuturesFeeder {
70    fn default() -> Self {
71        Self::new()
72    }
73}
74
75#[async_trait]
76impl Feeder for BinanceFuturesFeeder {
77    type DepthMessage = OrderbookMessage;
78    type TradeMessage = AggTradeMessage;
79
80    async fn start_feed_depth(
81        &self,
82        instrument_id: InstrumentId,
83        mut depth_rx: mpsc::Receiver<Self::DepthMessage>,
84        options: Option<FeederOptions>,
85    ) -> Result<mpsc::Receiver<OrderBookSnapshot>> {
86        // Extract symbol from InstrumentId
87        let symbol = instrument_id.symbol.clone();
88        let (tx, rx) = mpsc::channel(
89            options
90                .as_ref()
91                .map(|opt| opt.channel_buffer_size)
92                .unwrap_or(1024),
93        );
94
95        // Create a shared orderbook and store it
96        let ob = OrderBook::<64>::new(
97            symbol.clone(),
98            0,                // exchange_timestamp_ns - will be updated on first message
99            self.clock.raw(), // system_timestamp_ns
100            SmallVec::<[PriceLevel; 64]>::new(), // empty bids
101            SmallVec::<[PriceLevel; 64]>::new(), // empty asks
102        );
103        let shared_orderbook = SharedSimdOrderBook::from_orderbook(&ob);
104        self.orderbooks
105            .write()
106            .insert(symbol.clone(), shared_orderbook.clone());
107
108        // Create stop signal
109        let (stop_tx, _) = watch::channel(false);
110        let key = String::from(format!("depth:{symbol}"));
111        self.stop_signals.write().insert(key.clone(), stop_tx);
112        let mut stop_rx = self.stop_signals.read().get(&key).unwrap().subscribe();
113
114        // Clone clock for use in the task
115        let clock = self.clock.clone();
116
117        // Clone stats for updating feed statistics
118        let stats = self.stats.clone();
119        let stats_key = String::from(format!("depth:{symbol}"));
120
121        // Initialize stats for this feed if not already present
122        stats.write().entry(stats_key.clone()).or_default();
123
124        tokio::spawn(async move {
125            loop {
126                tokio::select! {
127                    // Check for stop signal
128                    _ = stop_rx.changed() => {
129                        break;
130                    }
131
132                    // Process incoming depth updates
133                    Some(update) = depth_rx.recv() => {
134                        // Skip if not for our symbol
135                        if update.symbol != symbol {
136                            continue;
137                        }
138
139                        // Capture processing start time
140                        let process_start = clock.raw();
141
142                        // Process the orderbook message
143                        let parsed_data = ParsedOrderbookData::from(update);
144
145                        // Create PriceLevel objects for bids and asks
146                        let _bids_vec: SmallVec<[PriceLevel; 64]> = parsed_data.bids
147                            .iter()
148                            .map(|&(price, quantity)| PriceLevel::new(price, quantity))
149                            .collect();
150
151                        let _asks_vec: SmallVec<[PriceLevel; 64]> = parsed_data.asks
152                            .iter()
153                            .map(|&(price, quantity)| PriceLevel::new(price, quantity))
154                            .collect();
155
156                        // Update the shared orderbook model
157                        let bids_vec: SmallVec<[PriceLevel; 64]> = parsed_data.bids
158                            .iter()
159                            .map(|&(price, quantity)| PriceLevel::new(price, quantity))
160                            .collect();
161                        let asks_vec: SmallVec<[PriceLevel; 64]> = parsed_data.asks
162                            .iter()
163                            .map(|&(price, quantity)| PriceLevel::new(price, quantity))
164                            .collect();
165
166                        let model_orderbook = OrderBook::<64>::new(
167                            instrument_id.symbol.clone(),
168                            parsed_data.final_update_id * 1_000_000, // Convert to nanoseconds
169                            process_start,
170                            bids_vec,
171                            asks_vec,
172                        );
173                        shared_orderbook.write(|ob| {
174                            *ob = SimdOrderBook::from_orderbook(&model_orderbook);
175                        });
176
177                        // Create OrderBookSnapshot from the current state
178                        let mut depth = OrderBookSnapshot::new_empty(
179                            instrument_id.clone(),
180                            process_start,
181                            parsed_data.final_update_id,
182                        );
183
184                        // Add bids and asks
185                        for (price, quantity) in parsed_data.bids {
186                            depth.add_bid(price, quantity);
187                        }
188
189                        for (price, quantity) in parsed_data.asks {
190                            depth.add_ask(price, quantity);
191                        }
192
193                        // Calculate processing latency
194                        let process_end = clock.raw();
195                        let latency_ns = process_end.saturating_sub(process_start);
196
197                        // Update statistics
198                        if let Some(feed_stats) = stats.write().get_mut(&stats_key) {
199                            feed_stats.messages_processed += 1;
200                            feed_stats.avg_process_latency_ns =
201                                (feed_stats.avg_process_latency_ns * 9 + latency_ns) / 10; // Moving average
202                            feed_stats.max_process_latency_ns =
203                                feed_stats.max_process_latency_ns.max(latency_ns);
204                            feed_stats.last_update_time = process_end;
205                        }
206
207                        // Send the order book depth
208                        if tx.send(depth).await.is_err() {
209                            // Update dropped message count on error
210                            if let Some(feed_stats) = stats.write().get_mut(&stats_key) {
211                                feed_stats.dropped_messages += 1;
212                            }
213                            break;
214                        }
215                    }
216                }
217            }
218        });
219
220        Ok(rx)
221    }
222
223    async fn stop_feed_depth(&self, instrument_id: &InstrumentId) -> Result<()> {
224        // Extract symbol from InstrumentId
225        let symbol = &instrument_id.symbol;
226        let key = String::from(format!("depth:{symbol}"));
227
228        // Remove the orderbook
229        self.orderbooks.write().remove(symbol);
230
231        // Send stop signal
232        if let Some(tx) = self.stop_signals.write().remove(&key) {
233            let _ = tx.send(true);
234        }
235
236        Ok(())
237    }
238
239    async fn start_feed_trades(
240        &self,
241        instrument_id: InstrumentId,
242        mut trade_rx: mpsc::Receiver<Self::TradeMessage>,
243        options: Option<FeederOptions>,
244    ) -> Result<mpsc::Receiver<MarketTrade>> {
245        // Extract symbol from InstrumentId
246        let symbol = instrument_id.symbol.clone();
247        let (tx, rx) = mpsc::channel(
248            options
249                .as_ref()
250                .map(|opt| opt.channel_buffer_size)
251                .unwrap_or(1024),
252        );
253
254        // Create stop signal
255        let (stop_tx, _) = watch::channel(false);
256        let key = String::from(format!("trades:{symbol}"));
257        self.stop_signals.write().insert(key.clone(), stop_tx);
258        let mut stop_rx = self.stop_signals.read().get(&key).unwrap().subscribe();
259
260        // Clone clock for use in the task
261        let clock = self.clock.clone();
262
263        // Clone stats for updating feed statistics
264        let stats = self.stats.clone();
265        let stats_key = String::from(format!("trades:{symbol}"));
266
267        // Initialize stats for this feed if not already present
268        stats.write().entry(stats_key.clone()).or_default();
269
270        tokio::spawn(async move {
271            loop {
272                tokio::select! {
273                    // Check for stop signal
274                    _ = stop_rx.changed() => {
275                        break;
276                    }
277
278                    // Process incoming trade messages
279                    Some(trade_msg) = trade_rx.recv() => {
280                        // Skip if not for our symbol
281                        if trade_msg.symbol != symbol {
282                            continue;
283                        }
284
285                        // Capture processing timestamp Instant and processing start (ns)
286                        let timestamp = clock.now();
287                        let process_start = clock.raw();
288
289                        // Convert to ParsedAggTradeData
290                        let parsed_trade = ParsedAggTradeData::from(trade_msg);
291
292                        // Determine order side
293                        let direction = if parsed_trade.is_buyer_market_maker {
294                            OrderSide::Sell
295                        } else {
296                            OrderSide::Buy
297                        };
298
299                        // Create standardized Trade model
300                        let trade = MarketTrade {
301                            timestamp,
302                            exchange_time_ns: parsed_trade.trade_time * 1_000_000, // Convert ms to ns
303                            price: parsed_trade.price,
304                            quantity: parsed_trade.quantity,
305                            direction,
306                            instrument_id: instrument_id.clone(),
307                        };
308
309                        // Calculate processing latency
310                        let process_end = clock.raw();
311                        let latency_ns = process_end.saturating_sub(process_start);
312
313                        // Update statistics
314                        if let Some(feed_stats) = stats.write().get_mut(&stats_key) {
315                            feed_stats.messages_processed += 1;
316                            feed_stats.avg_process_latency_ns =
317                                (feed_stats.avg_process_latency_ns * 9 + latency_ns) / 10; // Moving average
318                            feed_stats.max_process_latency_ns =
319                                feed_stats.max_process_latency_ns.max(latency_ns);
320                            feed_stats.last_update_time = process_end;
321                        }
322
323                        // Send the trade
324                        if tx.send(trade).await.is_err() {
325                            // Update dropped message count on error
326                            if let Some(feed_stats) = stats.write().get_mut(&stats_key) {
327                                feed_stats.dropped_messages += 1;
328                            }
329                            break;
330                        }
331                    }
332                }
333            }
334        });
335
336        Ok(rx)
337    }
338
339    async fn stop_feed_trades(&self, instrument_id: &InstrumentId) -> Result<()> {
340        // Extract symbol from InstrumentId
341        let symbol = &instrument_id.symbol;
342        let key = String::from(format!("trades:{symbol}"));
343
344        if let Some(tx) = self.stop_signals.write().remove(&key) {
345            let _ = tx.send(true);
346        }
347
348        Ok(())
349    }
350
351    async fn start_feed_bars(
352        &self,
353        instrument_id: InstrumentId,
354        bar_type: BarType,
355        mut trade_rx: mpsc::Receiver<MarketTrade>,
356        options: Option<FeederOptions>,
357    ) -> Result<mpsc::Receiver<Bar>> {
358        // 심볼 및 캐시 키 추출
359        let symbol = instrument_id.symbol.clone();
360        let cache_key = String::from(format!("{symbol}:{bar_type}"));
361
362        // 바 캐시 생성 또는 가져오기
363        let bar_cache = {
364            let mut caches = self.bar_caches.write();
365            if !caches.contains_key(&cache_key) {
366                caches.insert(cache_key.clone(), Arc::new(RwLock::new(BarCache::new())));
367            }
368            caches.get(&cache_key).unwrap().clone()
369        };
370
371        // 채널 생성
372        let buffer_size = options
373            .as_ref()
374            .map(|opt| opt.channel_buffer_size)
375            .unwrap_or(1024);
376        let (tx, rx) = mpsc::channel(buffer_size);
377
378        // 스탑 시그널 생성
379        let interval_sec = match bar_type.get_spec().aggregation {
380            BarAggregation::Second => bar_type.get_spec().step,
381            BarAggregation::Minute => bar_type.get_spec().step * 60,
382            BarAggregation::Hour => bar_type.get_spec().step * 3600,
383            BarAggregation::Day => bar_type.get_spec().step * 86400,
384            _ => return Err(anyhow!("Only time-based bars are supported")),
385        };
386        let (stop_tx, _) = watch::channel(false);
387        let key = String::from(format!("bars:{symbol}:{interval_sec}"));
388        self.stop_signals.write().insert(key.clone(), stop_tx);
389        let mut stop_rx = self.stop_signals.read().get(&key).unwrap().subscribe();
390
391        // 클럭 및 통계 복제
392        let clock = self.clock.clone();
393        let stats = self.stats.clone();
394        let stats_key = String::from(format!("bars:{symbol}:{interval_sec}"));
395
396        // 통계 초기화
397        stats.write().entry(stats_key.clone()).or_default();
398
399        // 바 어그리게이터 생성
400        let mut bar_aggregator = BarAggregator::new(
401            instrument_id.clone(),
402            bar_type.clone(),
403            bar_cache.clone(),
404            clock.clone(),
405        )?;
406
407        // 처리 태스크 생성
408        tokio::spawn(async move {
409            loop {
410                tokio::select! {
411                    // 스탑 시그널 확인
412                    _ = stop_rx.changed() => {
413                        break;
414                    }
415
416                    // 거래 처리
417                    Some(trade) = trade_rx.recv() => {
418                        // 처리 시작 시간
419                        let process_start = clock.raw();
420
421                        // 거래 처리 및 바 생성
422                        if let Some(bar) = bar_aggregator.process_trade(&trade) {
423                            // 바 캐시에 추가
424                            bar_cache.write().add_bar(bar.clone());
425
426                            // 바 전송
427                            if tx.send(bar).await.is_err() {
428                                // 오류 시 통계 업데이트
429                                if let Some(feed_stats) = stats.write().get_mut(&stats_key) {
430                                    feed_stats.dropped_messages += 1;
431                                }
432                                break;
433                            }
434                        }
435
436                        // 처리 지연 시간 계산
437                        let process_end = clock.raw();
438                        let latency_ns = process_end.saturating_sub(process_start);
439
440                        // 통계 업데이트
441                        if let Some(feed_stats) = stats.write().get_mut(&stats_key) {
442                            feed_stats.messages_processed += 1;
443                            feed_stats.avg_process_latency_ns =
444                                (feed_stats.avg_process_latency_ns * 9 + latency_ns) / 10; // 이동 평균
445                            feed_stats.max_process_latency_ns =
446                                feed_stats.max_process_latency_ns.max(latency_ns);
447                            feed_stats.last_update_time = process_end;
448                        }
449                    }
450                }
451            }
452        });
453
454        Ok(rx)
455    }
456
457    async fn stop_feed_bars(&self, instrument_id: &InstrumentId, bar_type: &BarType) -> Result<()> {
458        // Extract symbol and interval from parameters
459        let symbol = &instrument_id.symbol;
460        let interval_sec = match bar_type.get_spec().aggregation {
461            BarAggregation::Second => bar_type.get_spec().step,
462            BarAggregation::Minute => bar_type.get_spec().step * 60,
463            BarAggregation::Hour => bar_type.get_spec().step * 3600,
464            BarAggregation::Day => bar_type.get_spec().step * 86400,
465            _ => return Err(anyhow!("Only time-based bars are supported")),
466        };
467
468        let key = String::from(format!("bars:{symbol}:{interval_sec}"));
469        if let Some(tx) = self.stop_signals.write().remove(&key) {
470            let _ = tx.send(true);
471        }
472
473        Ok(())
474    }
475
476    async fn get_shared_orderbook(
477        &self,
478        instrument_id: &InstrumentId,
479    ) -> Result<SharedSimdOrderBook> {
480        // Extract symbol from InstrumentId
481        let symbol = &instrument_id.symbol;
482
483        // Get existing orderbook or create a new one
484        let orderbooks = self.orderbooks.read();
485        if let Some(orderbook) = orderbooks.get(symbol) {
486            Ok(orderbook.clone())
487        } else {
488            // Create a new orderbook
489            let ob = OrderBook::<64>::new(
490                symbol.clone(),
491                0,                // exchange_timestamp_ns - will be updated on first message
492                self.clock.raw(), // system_timestamp_ns
493                SmallVec::<[PriceLevel; 64]>::new(), // empty bids
494                SmallVec::<[PriceLevel; 64]>::new(), // empty asks
495            );
496            let shared_orderbook = SharedSimdOrderBook::from_orderbook(&ob);
497            drop(orderbooks);
498
499            // Store the orderbook
500            self.orderbooks
501                .write()
502                .insert(symbol.clone(), shared_orderbook.clone());
503
504            Ok(shared_orderbook)
505        }
506    }
507
508    async fn get_bar_cache(
509        &self,
510        instrument_id: &InstrumentId,
511        bar_type: &BarType,
512        max_bars: usize,
513    ) -> Result<Arc<RwLock<BarCache>>> {
514        // Create a unique key for this bar cache
515        let key = String::from(format!("{}:{}", instrument_id.symbol, bar_type));
516
517        // Check if we already have this bar cache
518        let caches = self.bar_caches.read();
519        if let Some(cache) = caches.get(&key) {
520            return Ok(cache.clone());
521        }
522        drop(caches); // Release read lock
523
524        // Create a new bar cache
525        let cache = Arc::new(RwLock::new(BarCache::new()));
526        self.bar_caches.write().insert(key, cache.clone());
527
528        Ok(cache)
529    }
530
531    async fn get_stats(&self, instrument_id: &InstrumentId) -> Result<FeedStats> {
532        // Check for depth stats
533        let key = String::from(format!("depth:{}", instrument_id.symbol));
534        if let Some(stats) = self.stats.read().get(&key) {
535            return Ok(stats.clone());
536        }
537
538        // Check for trade stats
539        let key = String::from(format!("trades:{}", instrument_id.symbol));
540        if let Some(stats) = self.stats.read().get(&key) {
541            return Ok(stats.clone());
542        }
543
544        // Return default stats if not found
545        Ok(FeedStats::default())
546    }
547
548    async fn reset_stats(&self) -> Result<()> {
549        self.stats.write().clear();
550        Ok(())
551    }
552}