rusty_feeder/exchange/binance/spot/
feeder.rs1use rusty_common::collections::FxHashMap;
6use smartstring::alias::String;
7use std::sync::Arc;
8
9use anyhow::{Result, anyhow};
10use async_trait::async_trait;
11use parking_lot::RwLock;
12use quanta::Clock;
13use rusty_model::{
14 data::{
15 bar::{Bar, BarAggregation, BarCache, BarType},
16 book_snapshot::OrderBookSnapshot,
17 market_trade::MarketTrade,
18 orderbook::PriceLevel,
19 simd_orderbook::{SharedSimdOrderBook, SimdOrderBook},
20 },
21 enums::OrderSide,
22 instruments::InstrumentId,
23};
24use smallvec::SmallVec;
25use tokio::sync::{mpsc, watch};
26
27use super::data::{
28 orderbook::{OrderbookMessage, ParsedOrderbookData},
29 trade::{ParsedTradeData, TradeMessage},
30};
31use crate::exchange::binance::common::bar_aggregator::BarAggregator;
32use crate::feeder::{FeedStats, Feeder, FeederOptions};
33
34#[derive(Debug)]
36pub struct BinanceSpotFeeder {
37 stop_signals: Arc<RwLock<FxHashMap<String, watch::Sender<bool>>>>,
39
40 clock: Clock,
42
43 stats: Arc<RwLock<FxHashMap<String, FeedStats>>>,
45
46 orderbooks: Arc<RwLock<FxHashMap<String, SharedSimdOrderBook>>>,
48
49 bar_caches: Arc<RwLock<FxHashMap<String, Arc<RwLock<BarCache>>>>>,
51}
52
53impl BinanceSpotFeeder {
54 #[inline]
56 #[must_use]
57 pub fn new() -> Self {
58 Self {
59 stop_signals: Arc::new(RwLock::new(FxHashMap::default())),
60 clock: Clock::new(),
61 stats: Arc::new(RwLock::new(FxHashMap::default())),
62 orderbooks: Arc::new(RwLock::new(FxHashMap::default())),
63 bar_caches: Arc::new(RwLock::new(FxHashMap::default())),
64 }
65 }
66}
67
68impl Default for BinanceSpotFeeder {
69 fn default() -> Self {
70 Self::new()
71 }
72}
73
74#[async_trait]
75impl Feeder for BinanceSpotFeeder {
76 type DepthMessage = OrderbookMessage;
77 type TradeMessage = TradeMessage;
78
79 async fn start_feed_depth(
80 &self,
81 instrument_id: InstrumentId,
82 mut depth_rx: mpsc::Receiver<Self::DepthMessage>,
83 options: Option<FeederOptions>,
84 ) -> Result<mpsc::Receiver<OrderBookSnapshot>> {
85 let symbol = instrument_id.symbol.clone();
87 let (tx, rx) = mpsc::channel(
88 options
89 .as_ref()
90 .map(|opt| opt.channel_buffer_size)
91 .unwrap_or(1024),
92 );
93
94 let ob = rusty_model::data::orderbook::OrderBook::<64>::new(
96 symbol.clone(),
97 0, self.clock.raw(), SmallVec::<[PriceLevel; 64]>::new(), SmallVec::<[PriceLevel; 64]>::new(), );
102 let shared_orderbook = SharedSimdOrderBook::from_orderbook(&ob);
103 self.orderbooks
104 .write()
105 .insert(symbol.clone(), shared_orderbook.clone());
106
107 let (stop_tx, _) = watch::channel(false);
109 let key = String::from(format!("depth:{symbol}"));
110 self.stop_signals.write().insert(key.clone(), stop_tx);
111 let mut stop_rx = self.stop_signals.read().get(&key).unwrap().subscribe();
112
113 let clock = self.clock.clone();
115
116 let stats = self.stats.clone();
118 let stats_key = String::from(format!("depth:{symbol}"));
119
120 stats.write().entry(stats_key.clone()).or_default();
122
123 tokio::spawn(async move {
124 loop {
125 tokio::select! {
126 _ = stop_rx.changed() => {
128 break;
129 }
130
131 Some(update) = depth_rx.recv() => {
133 if update.symbol != symbol {
135 continue;
136 }
137
138 let process_start = clock.raw();
140
141 let parsed_data = ParsedOrderbookData::from(update);
143
144 let _bids_vec: SmallVec<[PriceLevel; 64]> = parsed_data.bids
146 .iter()
147 .map(|&(price, size)| PriceLevel::new(price, size))
148 .collect();
149
150 let _asks_vec: SmallVec<[PriceLevel; 64]> = parsed_data.asks
151 .iter()
152 .map(|&(price, size)| PriceLevel::new(price, size))
153 .collect();
154
155 let bids_vec: SmallVec<[PriceLevel; 64]> = parsed_data.bids
157 .iter()
158 .map(|&(price, quantity)| PriceLevel::new(price, quantity))
159 .collect();
160 let asks_vec: SmallVec<[PriceLevel; 64]> = parsed_data.asks
161 .iter()
162 .map(|&(price, quantity)| PriceLevel::new(price, quantity))
163 .collect();
164
165 let model_orderbook = rusty_model::data::orderbook::OrderBook::<64>::new(
166 symbol.clone(),
167 parsed_data.final_update_id * 1_000_000, process_start,
169 bids_vec,
170 asks_vec,
171 );
172 shared_orderbook.write(|ob| {
173 *ob = SimdOrderBook::from_orderbook(&model_orderbook);
174 });
175
176 let mut depth = OrderBookSnapshot::new_empty(
178 instrument_id.clone(),
179 process_start,
180 parsed_data.final_update_id,
181 );
182
183 for (price, quantity) in parsed_data.bids {
185 depth.add_bid(price, quantity);
186 }
187
188 for (price, quantity) in parsed_data.asks {
189 depth.add_ask(price, quantity);
190 }
191
192 let process_end = clock.raw();
194 let latency_ns = process_end.saturating_sub(process_start);
195
196 if let Some(feed_stats) = stats.write().get_mut(&stats_key) {
198 feed_stats.messages_processed += 1;
199 feed_stats.avg_process_latency_ns =
200 (feed_stats.avg_process_latency_ns * 9 + latency_ns) / 10; feed_stats.max_process_latency_ns =
202 feed_stats.max_process_latency_ns.max(latency_ns);
203 feed_stats.last_update_time = process_end;
204 }
205
206 if tx.send(depth).await.is_err() {
208 if let Some(feed_stats) = stats.write().get_mut(&stats_key) {
210 feed_stats.dropped_messages += 1;
211 }
212 break;
213 }
214 }
215 }
216 }
217 });
218
219 Ok(rx)
220 }
221
222 async fn stop_feed_depth(&self, instrument_id: &InstrumentId) -> Result<()> {
223 let symbol = &instrument_id.symbol;
225 let key = String::from(format!("depth:{symbol}"));
226
227 self.orderbooks.write().remove(symbol);
229
230 if let Some(tx) = self.stop_signals.write().remove(&key) {
232 let _ = tx.send(true);
233 }
234
235 Ok(())
236 }
237
238 async fn start_feed_trades(
239 &self,
240 instrument_id: InstrumentId,
241 mut trade_rx: mpsc::Receiver<Self::TradeMessage>,
242 options: Option<FeederOptions>,
243 ) -> Result<mpsc::Receiver<MarketTrade>> {
244 let symbol = instrument_id.symbol.clone();
246 let (tx, rx) = mpsc::channel(
247 options
248 .as_ref()
249 .map(|opt| opt.channel_buffer_size)
250 .unwrap_or(1024),
251 );
252
253 let (stop_tx, _) = watch::channel(false);
255 let key = String::from(format!("trades:{symbol}"));
256 self.stop_signals.write().insert(key.clone(), stop_tx);
257 let mut stop_rx = self.stop_signals.read().get(&key).unwrap().subscribe();
258
259 let clock = self.clock.clone();
261
262 let stats = self.stats.clone();
264 let stats_key = String::from(format!("trades:{symbol}"));
265
266 stats.write().entry(stats_key.clone()).or_default();
268
269 tokio::spawn(async move {
270 loop {
271 tokio::select! {
272 _ = stop_rx.changed() => {
274 break;
275 }
276
277 Some(trade_msg) = trade_rx.recv() => {
279 if trade_msg.symbol != symbol {
281 continue;
282 }
283
284 let timestamp = clock.now();
286 let process_start = clock.raw();
287
288 let parsed_trade = ParsedTradeData::from(trade_msg);
290
291 let direction = if parsed_trade.is_buyer_market_maker {
293 OrderSide::Sell
294 } else {
295 OrderSide::Buy
296 };
297
298 let trade = MarketTrade {
300 timestamp,
301 exchange_time_ns: parsed_trade.trade_time * 1_000_000, price: parsed_trade.price,
303 quantity: parsed_trade.quantity,
304 direction,
305 instrument_id: instrument_id.clone(),
306 };
307
308 let process_end = clock.raw();
310 let latency_ns = process_end.saturating_sub(process_start);
311
312 if let Some(feed_stats) = stats.write().get_mut(&stats_key) {
314 feed_stats.messages_processed += 1;
315 feed_stats.avg_process_latency_ns =
316 (feed_stats.avg_process_latency_ns * 9 + latency_ns) / 10; feed_stats.max_process_latency_ns =
318 feed_stats.max_process_latency_ns.max(latency_ns);
319 feed_stats.last_update_time = process_end;
320 }
321
322 if tx.send(trade).await.is_err() {
324 if let Some(feed_stats) = stats.write().get_mut(&stats_key) {
326 feed_stats.dropped_messages += 1;
327 }
328 break;
329 }
330 }
331 }
332 }
333 });
334
335 Ok(rx)
336 }
337
338 async fn stop_feed_trades(&self, instrument_id: &InstrumentId) -> Result<()> {
339 let symbol = &instrument_id.symbol;
341 let key = String::from(format!("trades:{symbol}"));
342
343 if let Some(tx) = self.stop_signals.write().remove(&key) {
344 let _ = tx.send(true);
345 }
346
347 Ok(())
348 }
349
350 async fn start_feed_bars(
351 &self,
352 instrument_id: InstrumentId,
353 bar_type: BarType,
354 mut trade_rx: mpsc::Receiver<MarketTrade>,
355 options: Option<FeederOptions>,
356 ) -> Result<mpsc::Receiver<Bar>> {
357 let symbol = instrument_id.symbol.clone();
359 let cache_key = String::from(format!("{symbol}:{bar_type}"));
360
361 let bar_cache = {
363 let mut caches = self.bar_caches.write();
364 if !caches.contains_key(&cache_key) {
365 caches.insert(cache_key.clone(), Arc::new(RwLock::new(BarCache::new())));
366 }
367 caches.get(&cache_key).unwrap().clone()
368 };
369
370 let buffer_size = options
372 .as_ref()
373 .map(|opt| opt.channel_buffer_size)
374 .unwrap_or(1024);
375 let (tx, rx) = mpsc::channel(buffer_size);
376
377 let interval_sec = match bar_type.get_spec().aggregation {
379 BarAggregation::Second => bar_type.get_spec().step,
380 BarAggregation::Minute => bar_type.get_spec().step * 60,
381 BarAggregation::Hour => bar_type.get_spec().step * 3600,
382 BarAggregation::Day => bar_type.get_spec().step * 86400,
383 _ => return Err(anyhow!("Only time-based bars are supported")),
384 };
385 let (stop_tx, _) = watch::channel(false);
386 let key = String::from(format!("bars:{symbol}:{interval_sec}"));
387 self.stop_signals.write().insert(key.clone(), stop_tx);
388 let mut stop_rx = self.stop_signals.read().get(&key).unwrap().subscribe();
389
390 let clock = self.clock.clone();
392 let stats = self.stats.clone();
393 let stats_key = String::from(format!("bars:{symbol}:{interval_sec}"));
394
395 stats.write().entry(stats_key.clone()).or_default();
397
398 let mut bar_aggregator = BarAggregator::new(
400 instrument_id.clone(),
401 bar_type.clone(),
402 bar_cache.clone(),
403 clock.clone(),
404 )?;
405
406 tokio::spawn(async move {
408 loop {
409 tokio::select! {
410 _ = stop_rx.changed() => {
412 break;
413 }
414
415 Some(trade) = trade_rx.recv() => {
417 let process_start = clock.raw();
419
420 if let Some(bar) = bar_aggregator.process_trade(&trade) {
422 bar_cache.write().add_bar(bar.clone());
424
425 if tx.send(bar).await.is_err() {
427 if let Some(feed_stats) = stats.write().get_mut(&stats_key) {
429 feed_stats.dropped_messages += 1;
430 }
431 break;
432 }
433 }
434
435 let process_end = clock.raw();
437 let latency_ns = process_end.saturating_sub(process_start);
438
439 if let Some(feed_stats) = stats.write().get_mut(&stats_key) {
441 feed_stats.messages_processed += 1;
442 feed_stats.avg_process_latency_ns =
443 (feed_stats.avg_process_latency_ns * 9 + latency_ns) / 10; feed_stats.max_process_latency_ns =
445 feed_stats.max_process_latency_ns.max(latency_ns);
446 feed_stats.last_update_time = process_end;
447 }
448 }
449 }
450 }
451 });
452
453 Ok(rx)
454 }
455
456 async fn stop_feed_bars(&self, instrument_id: &InstrumentId, bar_type: &BarType) -> Result<()> {
457 let symbol = &instrument_id.symbol;
459 let interval_sec = match bar_type.get_spec().aggregation {
460 BarAggregation::Second => bar_type.get_spec().step,
461 BarAggregation::Minute => bar_type.get_spec().step * 60,
462 BarAggregation::Hour => bar_type.get_spec().step * 3600,
463 BarAggregation::Day => bar_type.get_spec().step * 86400,
464 _ => return Err(anyhow!("Only time-based bars are supported")),
465 };
466
467 let key = String::from(format!("bars:{symbol}:{interval_sec}"));
468 if let Some(tx) = self.stop_signals.write().remove(&key) {
469 let _ = tx.send(true);
470 }
471
472 Ok(())
473 }
474
475 async fn get_shared_orderbook(
476 &self,
477 instrument_id: &InstrumentId,
478 ) -> Result<SharedSimdOrderBook> {
479 let symbol = &instrument_id.symbol;
481
482 let orderbooks = self.orderbooks.read();
484 if let Some(orderbook) = orderbooks.get(symbol) {
485 Ok(orderbook.clone())
486 } else {
487 let ob = rusty_model::data::orderbook::OrderBook::<64>::new(
489 symbol.clone(),
490 0, self.clock.raw(), SmallVec::<[PriceLevel; 64]>::new(), SmallVec::<[PriceLevel; 64]>::new(), );
495 let shared_orderbook = SharedSimdOrderBook::from_orderbook(&ob);
496 drop(orderbooks); self.orderbooks
500 .write()
501 .insert(symbol.clone(), shared_orderbook.clone());
502
503 Ok(shared_orderbook)
504 }
505 }
506
507 async fn get_bar_cache(
508 &self,
509 instrument_id: &InstrumentId,
510 bar_type: &BarType,
511 max_bars: usize,
512 ) -> Result<Arc<RwLock<BarCache>>> {
513 let key = String::from(format!("{}:{}", instrument_id.symbol, bar_type));
515
516 let caches = self.bar_caches.read();
518 if let Some(cache) = caches.get(&key) {
519 return Ok(cache.clone());
520 }
521 drop(caches); let cache = Arc::new(RwLock::new(BarCache::new()));
525 self.bar_caches.write().insert(key, cache.clone());
526
527 Ok(cache)
528 }
529
530 async fn get_stats(&self, instrument_id: &InstrumentId) -> Result<FeedStats> {
531 let key = String::from(format!("depth:{}", instrument_id.symbol));
533 if let Some(stats) = self.stats.read().get(&key) {
534 return Ok(stats.clone());
535 }
536
537 let key = String::from(format!("trades:{}", instrument_id.symbol));
539 if let Some(stats) = self.stats.read().get(&key) {
540 return Ok(stats.clone());
541 }
542
543 Ok(FeedStats::default())
545 }
546
547 async fn reset_stats(&self) -> Result<()> {
548 self.stats.write().clear();
549 Ok(())
550 }
551}