rusty_feeder/exchange/binance/spot/
feeder.rs

1//! Binance Spot exchange-specific feeder implementation
2//!
3//! Processes raw market data and normalizes it to the common format
4
5use rusty_common::collections::FxHashMap;
6use smartstring::alias::String;
7use std::sync::Arc;
8
9use anyhow::{Result, anyhow};
10use async_trait::async_trait;
11use parking_lot::RwLock;
12use quanta::Clock;
13use rusty_model::{
14    data::{
15        bar::{Bar, BarAggregation, BarCache, BarType},
16        book_snapshot::OrderBookSnapshot,
17        market_trade::MarketTrade,
18        orderbook::PriceLevel,
19        simd_orderbook::{SharedSimdOrderBook, SimdOrderBook},
20    },
21    enums::OrderSide,
22    instruments::InstrumentId,
23};
24use smallvec::SmallVec;
25use tokio::sync::{mpsc, watch};
26
27use super::data::{
28    orderbook::{OrderbookMessage, ParsedOrderbookData},
29    trade::{ParsedTradeData, TradeMessage},
30};
31use crate::exchange::binance::common::bar_aggregator::BarAggregator;
32use crate::feeder::{FeedStats, Feeder, FeederOptions};
33
34/// Binance Spot exchange feeder implementation
35#[derive(Debug)]
36pub struct BinanceSpotFeeder {
37    /// Stop signals for active feeds
38    stop_signals: Arc<RwLock<FxHashMap<String, watch::Sender<bool>>>>,
39
40    /// Shared clock for time synchronization
41    clock: Clock,
42
43    /// Feed statistics
44    stats: Arc<RwLock<FxHashMap<String, FeedStats>>>,
45
46    /// Shared orderbooks for each instrument
47    orderbooks: Arc<RwLock<FxHashMap<String, SharedSimdOrderBook>>>,
48
49    /// Bar caches for each instrument and interval
50    bar_caches: Arc<RwLock<FxHashMap<String, Arc<RwLock<BarCache>>>>>,
51}
52
53impl BinanceSpotFeeder {
54    /// Create a new Binance Spot feeder
55    #[inline]
56    #[must_use]
57    pub fn new() -> Self {
58        Self {
59            stop_signals: Arc::new(RwLock::new(FxHashMap::default())),
60            clock: Clock::new(),
61            stats: Arc::new(RwLock::new(FxHashMap::default())),
62            orderbooks: Arc::new(RwLock::new(FxHashMap::default())),
63            bar_caches: Arc::new(RwLock::new(FxHashMap::default())),
64        }
65    }
66}
67
68impl Default for BinanceSpotFeeder {
69    fn default() -> Self {
70        Self::new()
71    }
72}
73
74#[async_trait]
75impl Feeder for BinanceSpotFeeder {
76    type DepthMessage = OrderbookMessage;
77    type TradeMessage = TradeMessage;
78
79    async fn start_feed_depth(
80        &self,
81        instrument_id: InstrumentId,
82        mut depth_rx: mpsc::Receiver<Self::DepthMessage>,
83        options: Option<FeederOptions>,
84    ) -> Result<mpsc::Receiver<OrderBookSnapshot>> {
85        // Extract symbol from InstrumentId
86        let symbol = instrument_id.symbol.clone();
87        let (tx, rx) = mpsc::channel(
88            options
89                .as_ref()
90                .map(|opt| opt.channel_buffer_size)
91                .unwrap_or(1024),
92        );
93
94        // Create a shared orderbook and store it
95        let ob = rusty_model::data::orderbook::OrderBook::<64>::new(
96            symbol.clone(),
97            0,                // exchange_timestamp_ns - will be updated on first message
98            self.clock.raw(), // system_timestamp_ns
99            SmallVec::<[PriceLevel; 64]>::new(), // empty bids
100            SmallVec::<[PriceLevel; 64]>::new(), // empty asks
101        );
102        let shared_orderbook = SharedSimdOrderBook::from_orderbook(&ob);
103        self.orderbooks
104            .write()
105            .insert(symbol.clone(), shared_orderbook.clone());
106
107        // Create stop signal
108        let (stop_tx, _) = watch::channel(false);
109        let key = String::from(format!("depth:{symbol}"));
110        self.stop_signals.write().insert(key.clone(), stop_tx);
111        let mut stop_rx = self.stop_signals.read().get(&key).unwrap().subscribe();
112
113        // Clone clock for use in the task
114        let clock = self.clock.clone();
115
116        // Clone stats for updating feed statistics
117        let stats = self.stats.clone();
118        let stats_key = String::from(format!("depth:{symbol}"));
119
120        // Initialize stats for this feed if not already present
121        stats.write().entry(stats_key.clone()).or_default();
122
123        tokio::spawn(async move {
124            loop {
125                tokio::select! {
126                    // Check for stop signal
127                    _ = stop_rx.changed() => {
128                        break;
129                    }
130
131                    // Process incoming depth updates
132                    Some(update) = depth_rx.recv() => {
133                        // Skip if not for our symbol
134                        if update.symbol != symbol {
135                            continue;
136                        }
137
138                        // Capture processing start time
139                        let process_start = clock.raw();
140
141                        // Process the orderbook message
142                        let parsed_data = ParsedOrderbookData::from(update);
143
144                        // Create PriceLevel objects for bids and asks
145                        let _bids_vec: SmallVec<[PriceLevel; 64]> = parsed_data.bids
146                            .iter()
147                            .map(|&(price, size)| PriceLevel::new(price, size))
148                            .collect();
149
150                        let _asks_vec: SmallVec<[PriceLevel; 64]> = parsed_data.asks
151                            .iter()
152                            .map(|&(price, size)| PriceLevel::new(price, size))
153                            .collect();
154
155                        // Update the shared orderbook model
156                        let bids_vec: SmallVec<[PriceLevel; 64]> = parsed_data.bids
157                            .iter()
158                            .map(|&(price, quantity)| PriceLevel::new(price, quantity))
159                            .collect();
160                        let asks_vec: SmallVec<[PriceLevel; 64]> = parsed_data.asks
161                            .iter()
162                            .map(|&(price, quantity)| PriceLevel::new(price, quantity))
163                            .collect();
164
165                        let model_orderbook = rusty_model::data::orderbook::OrderBook::<64>::new(
166                            symbol.clone(),
167                            parsed_data.final_update_id * 1_000_000, // Convert to nanoseconds
168                            process_start,
169                            bids_vec,
170                            asks_vec,
171                        );
172                        shared_orderbook.write(|ob| {
173                            *ob = SimdOrderBook::from_orderbook(&model_orderbook);
174                        });
175
176                        // Create OrderBookSnapshot from the current state
177                        let mut depth = OrderBookSnapshot::new_empty(
178                            instrument_id.clone(),
179                            process_start,
180                            parsed_data.final_update_id,
181                        );
182
183                        // Add bids and asks
184                        for (price, quantity) in parsed_data.bids {
185                            depth.add_bid(price, quantity);
186                        }
187
188                        for (price, quantity) in parsed_data.asks {
189                            depth.add_ask(price, quantity);
190                        }
191
192                        // Calculate processing latency
193                        let process_end = clock.raw();
194                        let latency_ns = process_end.saturating_sub(process_start);
195
196                        // Update statistics
197                        if let Some(feed_stats) = stats.write().get_mut(&stats_key) {
198                            feed_stats.messages_processed += 1;
199                            feed_stats.avg_process_latency_ns =
200                                (feed_stats.avg_process_latency_ns * 9 + latency_ns) / 10; // Moving average
201                            feed_stats.max_process_latency_ns =
202                                feed_stats.max_process_latency_ns.max(latency_ns);
203                            feed_stats.last_update_time = process_end;
204                        }
205
206                        // Send the order book depth
207                        if tx.send(depth).await.is_err() {
208                            // Update dropped message count on error
209                            if let Some(feed_stats) = stats.write().get_mut(&stats_key) {
210                                feed_stats.dropped_messages += 1;
211                            }
212                            break;
213                        }
214                    }
215                }
216            }
217        });
218
219        Ok(rx)
220    }
221
222    async fn stop_feed_depth(&self, instrument_id: &InstrumentId) -> Result<()> {
223        // Extract symbol from InstrumentId
224        let symbol = &instrument_id.symbol;
225        let key = String::from(format!("depth:{symbol}"));
226
227        // Remove the orderbook
228        self.orderbooks.write().remove(symbol);
229
230        // Send stop signal
231        if let Some(tx) = self.stop_signals.write().remove(&key) {
232            let _ = tx.send(true);
233        }
234
235        Ok(())
236    }
237
238    async fn start_feed_trades(
239        &self,
240        instrument_id: InstrumentId,
241        mut trade_rx: mpsc::Receiver<Self::TradeMessage>,
242        options: Option<FeederOptions>,
243    ) -> Result<mpsc::Receiver<MarketTrade>> {
244        // Extract symbol from InstrumentId
245        let symbol = instrument_id.symbol.clone();
246        let (tx, rx) = mpsc::channel(
247            options
248                .as_ref()
249                .map(|opt| opt.channel_buffer_size)
250                .unwrap_or(1024),
251        );
252
253        // Create stop signal
254        let (stop_tx, _) = watch::channel(false);
255        let key = String::from(format!("trades:{symbol}"));
256        self.stop_signals.write().insert(key.clone(), stop_tx);
257        let mut stop_rx = self.stop_signals.read().get(&key).unwrap().subscribe();
258
259        // Clone clock for use in the task
260        let clock = self.clock.clone();
261
262        // Clone stats for updating feed statistics
263        let stats = self.stats.clone();
264        let stats_key = String::from(format!("trades:{symbol}"));
265
266        // Initialize stats for this feed if not already present
267        stats.write().entry(stats_key.clone()).or_default();
268
269        tokio::spawn(async move {
270            loop {
271                tokio::select! {
272                    // Check for stop signal
273                    _ = stop_rx.changed() => {
274                        break;
275                    }
276
277                    // Process incoming trade messages
278                    Some(trade_msg) = trade_rx.recv() => {
279                        // Skip if not for our symbol
280                        if trade_msg.symbol != symbol {
281                            continue;
282                        }
283
284                        // Capture processing timestamp Instant and processing start (ns)
285                        let timestamp = clock.now();
286                        let process_start = clock.raw();
287
288                        // Convert to ParsedTradeData
289                        let parsed_trade = ParsedTradeData::from(trade_msg);
290
291                        // Determine order side
292                        let direction = if parsed_trade.is_buyer_market_maker {
293                            OrderSide::Sell
294                        } else {
295                            OrderSide::Buy
296                        };
297
298                        // Create standardized Trade model
299                        let trade = MarketTrade {
300                            timestamp,
301                            exchange_time_ns: parsed_trade.trade_time * 1_000_000, // Convert ms to ns
302                            price: parsed_trade.price,
303                            quantity: parsed_trade.quantity,
304                            direction,
305                            instrument_id: instrument_id.clone(),
306                        };
307
308                        // Calculate processing latency
309                        let process_end = clock.raw();
310                        let latency_ns = process_end.saturating_sub(process_start);
311
312                        // Update statistics
313                        if let Some(feed_stats) = stats.write().get_mut(&stats_key) {
314                            feed_stats.messages_processed += 1;
315                            feed_stats.avg_process_latency_ns =
316                                (feed_stats.avg_process_latency_ns * 9 + latency_ns) / 10; // Moving average
317                            feed_stats.max_process_latency_ns =
318                                feed_stats.max_process_latency_ns.max(latency_ns);
319                            feed_stats.last_update_time = process_end;
320                        }
321
322                        // Send the trade
323                        if tx.send(trade).await.is_err() {
324                            // Update dropped message count on error
325                            if let Some(feed_stats) = stats.write().get_mut(&stats_key) {
326                                feed_stats.dropped_messages += 1;
327                            }
328                            break;
329                        }
330                    }
331                }
332            }
333        });
334
335        Ok(rx)
336    }
337
338    async fn stop_feed_trades(&self, instrument_id: &InstrumentId) -> Result<()> {
339        // Extract symbol from InstrumentId
340        let symbol = &instrument_id.symbol;
341        let key = String::from(format!("trades:{symbol}"));
342
343        if let Some(tx) = self.stop_signals.write().remove(&key) {
344            let _ = tx.send(true);
345        }
346
347        Ok(())
348    }
349
350    async fn start_feed_bars(
351        &self,
352        instrument_id: InstrumentId,
353        bar_type: BarType,
354        mut trade_rx: mpsc::Receiver<MarketTrade>,
355        options: Option<FeederOptions>,
356    ) -> Result<mpsc::Receiver<Bar>> {
357        // Extract symbol and interval from parameters
358        let symbol = instrument_id.symbol.clone();
359        let cache_key = String::from(format!("{symbol}:{bar_type}"));
360
361        // Create a new bar cache or get existing one
362        let bar_cache = {
363            let mut caches = self.bar_caches.write();
364            if !caches.contains_key(&cache_key) {
365                caches.insert(cache_key.clone(), Arc::new(RwLock::new(BarCache::new())));
366            }
367            caches.get(&cache_key).unwrap().clone()
368        };
369
370        // Create channel with buffer size from options or default
371        let buffer_size = options
372            .as_ref()
373            .map(|opt| opt.channel_buffer_size)
374            .unwrap_or(1024);
375        let (tx, rx) = mpsc::channel(buffer_size);
376
377        // Create stop signal
378        let interval_sec = match bar_type.get_spec().aggregation {
379            BarAggregation::Second => bar_type.get_spec().step,
380            BarAggregation::Minute => bar_type.get_spec().step * 60,
381            BarAggregation::Hour => bar_type.get_spec().step * 3600,
382            BarAggregation::Day => bar_type.get_spec().step * 86400,
383            _ => return Err(anyhow!("Only time-based bars are supported")),
384        };
385        let (stop_tx, _) = watch::channel(false);
386        let key = String::from(format!("bars:{symbol}:{interval_sec}"));
387        self.stop_signals.write().insert(key.clone(), stop_tx);
388        let mut stop_rx = self.stop_signals.read().get(&key).unwrap().subscribe();
389
390        // Clone clock and stats for use in the task
391        let clock = self.clock.clone();
392        let stats = self.stats.clone();
393        let stats_key = String::from(format!("bars:{symbol}:{interval_sec}"));
394
395        // Initialize stats for this feed if not already present
396        stats.write().entry(stats_key.clone()).or_default();
397
398        // Create bar aggregator
399        let mut bar_aggregator = BarAggregator::new(
400            instrument_id.clone(),
401            bar_type.clone(),
402            bar_cache.clone(),
403            clock.clone(),
404        )?;
405
406        // Spawn task to process trades and generate bars
407        tokio::spawn(async move {
408            loop {
409                tokio::select! {
410                    // Check for stop signal
411                    _ = stop_rx.changed() => {
412                        break;
413                    }
414
415                    // Process incoming trades
416                    Some(trade) = trade_rx.recv() => {
417                        // Capture processing start time
418                        let process_start = clock.raw();
419
420                        // Process trade and create bar if needed
421                        if let Some(bar) = bar_aggregator.process_trade(&trade) {
422                            // Add to bar cache
423                            bar_cache.write().add_bar(bar.clone());
424
425                            // Send the bar
426                            if tx.send(bar).await.is_err() {
427                                // Update dropped message count on error
428                                if let Some(feed_stats) = stats.write().get_mut(&stats_key) {
429                                    feed_stats.dropped_messages += 1;
430                                }
431                                break;
432                            }
433                        }
434
435                        // Calculate processing latency
436                        let process_end = clock.raw();
437                        let latency_ns = process_end.saturating_sub(process_start);
438
439                        // Update statistics
440                        if let Some(feed_stats) = stats.write().get_mut(&stats_key) {
441                            feed_stats.messages_processed += 1;
442                            feed_stats.avg_process_latency_ns =
443                                (feed_stats.avg_process_latency_ns * 9 + latency_ns) / 10; // Moving average
444                            feed_stats.max_process_latency_ns =
445                                feed_stats.max_process_latency_ns.max(latency_ns);
446                            feed_stats.last_update_time = process_end;
447                        }
448                    }
449                }
450            }
451        });
452
453        Ok(rx)
454    }
455
456    async fn stop_feed_bars(&self, instrument_id: &InstrumentId, bar_type: &BarType) -> Result<()> {
457        // Extract symbol and interval from parameters
458        let symbol = &instrument_id.symbol;
459        let interval_sec = match bar_type.get_spec().aggregation {
460            BarAggregation::Second => bar_type.get_spec().step,
461            BarAggregation::Minute => bar_type.get_spec().step * 60,
462            BarAggregation::Hour => bar_type.get_spec().step * 3600,
463            BarAggregation::Day => bar_type.get_spec().step * 86400,
464            _ => return Err(anyhow!("Only time-based bars are supported")),
465        };
466
467        let key = String::from(format!("bars:{symbol}:{interval_sec}"));
468        if let Some(tx) = self.stop_signals.write().remove(&key) {
469            let _ = tx.send(true);
470        }
471
472        Ok(())
473    }
474
475    async fn get_shared_orderbook(
476        &self,
477        instrument_id: &InstrumentId,
478    ) -> Result<SharedSimdOrderBook> {
479        // Extract symbol from InstrumentId
480        let symbol = &instrument_id.symbol;
481
482        // Get existing orderbook or create a new one
483        let orderbooks = self.orderbooks.read();
484        if let Some(orderbook) = orderbooks.get(symbol) {
485            Ok(orderbook.clone())
486        } else {
487            // Create a new orderbook
488            let ob = rusty_model::data::orderbook::OrderBook::<64>::new(
489                symbol.clone(),
490                0,                // exchange_timestamp_ns - will be updated on first message
491                self.clock.raw(), // system_timestamp_ns
492                SmallVec::<[PriceLevel; 64]>::new(), // empty bids
493                SmallVec::<[PriceLevel; 64]>::new(), // empty asks
494            );
495            let shared_orderbook = SharedSimdOrderBook::from_orderbook(&ob);
496            drop(orderbooks); // Release the read lock before acquiring write lock
497
498            // Store the orderbook
499            self.orderbooks
500                .write()
501                .insert(symbol.clone(), shared_orderbook.clone());
502
503            Ok(shared_orderbook)
504        }
505    }
506
507    async fn get_bar_cache(
508        &self,
509        instrument_id: &InstrumentId,
510        bar_type: &BarType,
511        max_bars: usize,
512    ) -> Result<Arc<RwLock<BarCache>>> {
513        // Create a unique key for this bar cache
514        let key = String::from(format!("{}:{}", instrument_id.symbol, bar_type));
515
516        // Check if we already have this bar cache
517        let caches = self.bar_caches.read();
518        if let Some(cache) = caches.get(&key) {
519            return Ok(cache.clone());
520        }
521        drop(caches); // Release read lock
522
523        // Create a new bar cache
524        let cache = Arc::new(RwLock::new(BarCache::new()));
525        self.bar_caches.write().insert(key, cache.clone());
526
527        Ok(cache)
528    }
529
530    async fn get_stats(&self, instrument_id: &InstrumentId) -> Result<FeedStats> {
531        // Check for depth stats
532        let key = String::from(format!("depth:{}", instrument_id.symbol));
533        if let Some(stats) = self.stats.read().get(&key) {
534            return Ok(stats.clone());
535        }
536
537        // Check for trade stats
538        let key = String::from(format!("trades:{}", instrument_id.symbol));
539        if let Some(stats) = self.stats.read().get(&key) {
540            return Ok(stats.clone());
541        }
542
543        // Return default stats if not found
544        Ok(FeedStats::default())
545    }
546
547    async fn reset_stats(&self) -> Result<()> {
548        self.stats.write().clear();
549        Ok(())
550    }
551}