rusty_feeder/exchange/binance/futures/feeder.rs
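
//! Market-data feeder for Binance futures.
//!
//! Converts parsed depth (`OrderbookMessage`) and aggregated-trade
//! (`AggTradeMessage`) streams into order-book snapshots, market trades, and
//! bars, maintains a shared SIMD order book per symbol, and tracks per-feed
//! processing statistics.
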
use rusty_common::collections::FxHashMap;
use smartstring::alias::String;
use std::sync::Arc;

use anyhow::{Result, anyhow};
use async_trait::async_trait;
use parking_lot::RwLock;
use quanta::Clock;
use rusty_model::{
    PriceLevel,
    data::{
        bar::{Bar, BarAggregation, BarCache, BarType},
        book_snapshot::OrderBookSnapshot,
        market_trade::MarketTrade,
        orderbook::OrderBook,
        simd_orderbook::{SharedSimdOrderBook, SimdOrderBook},
    },
    enums::OrderSide,
    instruments::InstrumentId,
};
use smallvec::SmallVec;
use tokio::sync::{mpsc, watch};

use super::data::{
    orderbook::{OrderbookMessage, ParsedOrderbookData},
    trade::{AggTradeMessage, ParsedAggTradeData},
};
use crate::exchange::binance::common::bar_aggregator::BarAggregator;
use crate::feeder::{FeedStats, Feeder, FeederOptions};

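/// Market-data feeder for Binance futures.
///
/// Turns parsed depth and aggregated-trade messages into `OrderBookSnapshot`,
/// `MarketTrade`, and `Bar` streams, while keeping a shared SIMD order book per
/// symbol and per-feed latency statistics.
///
/// A minimal wiring sketch is shown below. How an `InstrumentId` is built and
/// how `depth_rx` is fed (normally by the websocket layer) are outside this
/// file, so those parts are placeholders:
///
/// ```ignore
/// let feeder = BinanceFuturesFeeder::new();
///
/// // `depth_tx` would be handed to whatever component parses depth frames.
/// let (depth_tx, depth_rx) = tokio::sync::mpsc::channel(1024);
/// let instrument_id: InstrumentId = todo!("construct for e.g. BTCUSDT");
///
/// let mut snapshots = feeder
///     .start_feed_depth(instrument_id.clone(), depth_rx, None)
///     .await?;
/// while let Some(snapshot) = snapshots.recv().await {
///     // consume order-book snapshots
/// }
/// feeder.stop_feed_depth(&instrument_id).await?;
/// ```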
#[derive(Debug)]
pub struct BinanceFuturesFeeder {
    /// Stop signals for running feed tasks, keyed per feed (e.g. `"depth:<symbol>"`).
    stop_signals: Arc<RwLock<FxHashMap<String, watch::Sender<bool>>>>,

    /// High-resolution clock used for timestamps and latency measurement.
    clock: Clock,

    /// Per-feed processing statistics, keyed like `stop_signals`.
    stats: Arc<RwLock<FxHashMap<String, FeedStats>>>,

    /// Shared SIMD order books, keyed by symbol.
    orderbooks: Arc<RwLock<FxHashMap<String, SharedSimdOrderBook>>>,

    /// Cached bars, keyed by `"<symbol>:<bar type>"`.
    bar_caches: Arc<RwLock<FxHashMap<String, Arc<RwLock<BarCache>>>>>,
}

impl BinanceFuturesFeeder {
    /// Creates a feeder with no active feeds and empty caches.
    #[inline]
    #[must_use]
    pub fn new() -> Self {
        Self {
            stop_signals: Arc::new(RwLock::new(FxHashMap::default())),
            clock: Clock::new(),
            stats: Arc::new(RwLock::new(FxHashMap::default())),
            orderbooks: Arc::new(RwLock::new(FxHashMap::default())),
            bar_caches: Arc::new(RwLock::new(FxHashMap::default())),
        }
    }
}

impl Default for BinanceFuturesFeeder {
    fn default() -> Self {
        Self::new()
    }
}

#[async_trait]
impl Feeder for BinanceFuturesFeeder {
    type DepthMessage = OrderbookMessage;
    type TradeMessage = AggTradeMessage;

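    /// Starts a depth feed for `instrument_id`.
    ///
    /// Spawns a task that converts incoming `OrderbookMessage`s into
    /// `OrderBookSnapshot`s, refreshes the shared SIMD order book, and records
    /// per-message latency. Returns the snapshot receiver; the task runs until
    /// the stop signal fires or the receiver is dropped.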
    async fn start_feed_depth(
        &self,
        instrument_id: InstrumentId,
        mut depth_rx: mpsc::Receiver<Self::DepthMessage>,
        options: Option<FeederOptions>,
    ) -> Result<mpsc::Receiver<OrderBookSnapshot>> {
        let symbol = instrument_id.symbol.clone();
        let (tx, rx) = mpsc::channel(
            options
                .as_ref()
                .map(|opt| opt.channel_buffer_size)
                .unwrap_or(1024),
        );

        // Register an empty shared order book for this symbol so readers can
        // attach before the first update arrives.
        let ob = OrderBook::<64>::new(
            symbol.clone(),
            0,
            self.clock.raw(),
            SmallVec::<[PriceLevel; 64]>::new(),
            SmallVec::<[PriceLevel; 64]>::new(),
        );
        let shared_orderbook = SharedSimdOrderBook::from_orderbook(&ob);
        self.orderbooks
            .write()
            .insert(symbol.clone(), shared_orderbook.clone());

        // Per-feed stop signal, keyed by feed kind and symbol.
        let (stop_tx, _) = watch::channel(false);
        let key = String::from(format!("depth:{symbol}"));
        self.stop_signals.write().insert(key.clone(), stop_tx);
        let mut stop_rx = self.stop_signals.read().get(&key).unwrap().subscribe();

        let clock = self.clock.clone();
        let stats = self.stats.clone();
        let stats_key = String::from(format!("depth:{symbol}"));
        stats.write().entry(stats_key.clone()).or_default();

        tokio::spawn(async move {
            loop {
                tokio::select! {
                    _ = stop_rx.changed() => {
                        break;
                    }

                    Some(update) = depth_rx.recv() => {
                        // Ignore updates for other symbols.
                        if update.symbol != symbol {
                            continue;
                        }

                        let process_start = clock.raw();
                        let parsed_data = ParsedOrderbookData::from(update);

                        // Convert parsed price/quantity pairs into price levels.
                        let bids_vec: SmallVec<[PriceLevel; 64]> = parsed_data.bids
                            .iter()
                            .map(|&(price, quantity)| PriceLevel::new(price, quantity))
                            .collect();
                        let asks_vec: SmallVec<[PriceLevel; 64]> = parsed_data.asks
                            .iter()
                            .map(|&(price, quantity)| PriceLevel::new(price, quantity))
                            .collect();

                        // Rebuild the model order book and publish it to the
                        // shared SIMD book.
                        let model_orderbook = OrderBook::<64>::new(
                            instrument_id.symbol.clone(),
                            parsed_data.final_update_id * 1_000_000,
                            process_start,
                            bids_vec,
                            asks_vec,
                        );
                        shared_orderbook.write(|ob| {
                            *ob = SimdOrderBook::from_orderbook(&model_orderbook);
                        });

                        // Build the snapshot sent to subscribers.
                        let mut depth = OrderBookSnapshot::new_empty(
                            instrument_id.clone(),
                            process_start,
                            parsed_data.final_update_id,
                        );
                        for (price, quantity) in parsed_data.bids {
                            depth.add_bid(price, quantity);
                        }
                        for (price, quantity) in parsed_data.asks {
                            depth.add_ask(price, quantity);
                        }

                        let process_end = clock.raw();
                        let latency_ns = process_end.saturating_sub(process_start);

                        if let Some(feed_stats) = stats.write().get_mut(&stats_key) {
                            feed_stats.messages_processed += 1;
                            // Simple exponential moving average (90% old, 10% new).
                            feed_stats.avg_process_latency_ns =
                                (feed_stats.avg_process_latency_ns * 9 + latency_ns) / 10;
                            feed_stats.max_process_latency_ns =
                                feed_stats.max_process_latency_ns.max(latency_ns);
                            feed_stats.last_update_time = process_end;
                        }

                        // A closed receiver means the consumer is gone; stop the feed.
                        if tx.send(depth).await.is_err() {
                            if let Some(feed_stats) = stats.write().get_mut(&stats_key) {
                                feed_stats.dropped_messages += 1;
                            }
                            break;
                        }
                    }
                }
            }
        });

        Ok(rx)
    }

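    /// Stops the depth feed for `instrument_id`.
    ///
    /// Removes the shared order book for the symbol and signals the feed task
    /// to shut down.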
    async fn stop_feed_depth(&self, instrument_id: &InstrumentId) -> Result<()> {
        let symbol = &instrument_id.symbol;
        let key = String::from(format!("depth:{symbol}"));

        self.orderbooks.write().remove(symbol);

        if let Some(tx) = self.stop_signals.write().remove(&key) {
            let _ = tx.send(true);
        }

        Ok(())
    }

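    /// Starts an aggregated-trade feed for `instrument_id`.
    ///
    /// Spawns a task that converts incoming `AggTradeMessage`s into
    /// `MarketTrade`s, inferring the aggressor side from the maker flag, and
    /// records per-message latency.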
    async fn start_feed_trades(
        &self,
        instrument_id: InstrumentId,
        mut trade_rx: mpsc::Receiver<Self::TradeMessage>,
        options: Option<FeederOptions>,
    ) -> Result<mpsc::Receiver<MarketTrade>> {
        let symbol = instrument_id.symbol.clone();
        let (tx, rx) = mpsc::channel(
            options
                .as_ref()
                .map(|opt| opt.channel_buffer_size)
                .unwrap_or(1024),
        );

        // Per-feed stop signal, keyed by feed kind and symbol.
        let (stop_tx, _) = watch::channel(false);
        let key = String::from(format!("trades:{symbol}"));
        self.stop_signals.write().insert(key.clone(), stop_tx);
        let mut stop_rx = self.stop_signals.read().get(&key).unwrap().subscribe();

        let clock = self.clock.clone();
        let stats = self.stats.clone();
        let stats_key = String::from(format!("trades:{symbol}"));
        stats.write().entry(stats_key.clone()).or_default();

        tokio::spawn(async move {
            loop {
                tokio::select! {
                    _ = stop_rx.changed() => {
                        break;
                    }

                    Some(trade_msg) = trade_rx.recv() => {
                        // Ignore trades for other symbols.
                        if trade_msg.symbol != symbol {
                            continue;
                        }

                        let timestamp = clock.now();
                        let process_start = clock.raw();

                        let parsed_trade = ParsedAggTradeData::from(trade_msg);

                        // If the buyer was the maker, the aggressor was a seller.
                        let direction = if parsed_trade.is_buyer_market_maker {
                            OrderSide::Sell
                        } else {
                            OrderSide::Buy
                        };

                        let trade = MarketTrade {
                            timestamp,
                            // Binance reports trade time in milliseconds; scale to nanoseconds.
                            exchange_time_ns: parsed_trade.trade_time * 1_000_000,
                            price: parsed_trade.price,
                            quantity: parsed_trade.quantity,
                            direction,
                            instrument_id: instrument_id.clone(),
                        };

                        let process_end = clock.raw();
                        let latency_ns = process_end.saturating_sub(process_start);

                        if let Some(feed_stats) = stats.write().get_mut(&stats_key) {
                            feed_stats.messages_processed += 1;
                            // Simple exponential moving average (90% old, 10% new).
                            feed_stats.avg_process_latency_ns =
                                (feed_stats.avg_process_latency_ns * 9 + latency_ns) / 10;
                            feed_stats.max_process_latency_ns =
                                feed_stats.max_process_latency_ns.max(latency_ns);
                            feed_stats.last_update_time = process_end;
                        }

                        // A closed receiver means the consumer is gone; stop the feed.
                        if tx.send(trade).await.is_err() {
                            if let Some(feed_stats) = stats.write().get_mut(&stats_key) {
                                feed_stats.dropped_messages += 1;
                            }
                            break;
                        }
                    }
                }
            }
        });

        Ok(rx)
    }

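    /// Stops the trade feed for `instrument_id` by signalling its feed task.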
    async fn stop_feed_trades(&self, instrument_id: &InstrumentId) -> Result<()> {
        let symbol = &instrument_id.symbol;
        let key = String::from(format!("trades:{symbol}"));

        if let Some(tx) = self.stop_signals.write().remove(&key) {
            let _ = tx.send(true);
        }

        Ok(())
    }

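    /// Starts a bar feed that aggregates `MarketTrade`s into `bar_type` bars.
    ///
    /// Only time-based bar specifications (second/minute/hour/day) are
    /// supported. Completed bars are appended to the shared `BarCache` and
    /// forwarded to the returned receiver.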
    async fn start_feed_bars(
        &self,
        instrument_id: InstrumentId,
        bar_type: BarType,
        mut trade_rx: mpsc::Receiver<MarketTrade>,
        options: Option<FeederOptions>,
    ) -> Result<mpsc::Receiver<Bar>> {
        let symbol = instrument_id.symbol.clone();
        let cache_key = String::from(format!("{symbol}:{bar_type}"));

        // Reuse the bar cache for this symbol/bar type, creating it if needed.
        let bar_cache = {
            let mut caches = self.bar_caches.write();
            caches
                .entry(cache_key.clone())
                .or_insert_with(|| Arc::new(RwLock::new(BarCache::new())))
                .clone()
        };

        let buffer_size = options
            .as_ref()
            .map(|opt| opt.channel_buffer_size)
            .unwrap_or(1024);
        let (tx, rx) = mpsc::channel(buffer_size);

        // Only time-based bars are supported; convert the spec to seconds.
        let interval_sec = match bar_type.get_spec().aggregation {
            BarAggregation::Second => bar_type.get_spec().step,
            BarAggregation::Minute => bar_type.get_spec().step * 60,
            BarAggregation::Hour => bar_type.get_spec().step * 3600,
            BarAggregation::Day => bar_type.get_spec().step * 86400,
            _ => return Err(anyhow!("Only time-based bars are supported")),
        };

        let (stop_tx, _) = watch::channel(false);
        let key = String::from(format!("bars:{symbol}:{interval_sec}"));
        self.stop_signals.write().insert(key.clone(), stop_tx);
        let mut stop_rx = self.stop_signals.read().get(&key).unwrap().subscribe();

        let clock = self.clock.clone();
        let stats = self.stats.clone();
        let stats_key = String::from(format!("bars:{symbol}:{interval_sec}"));
        stats.write().entry(stats_key.clone()).or_default();

        let mut bar_aggregator = BarAggregator::new(
            instrument_id.clone(),
            bar_type.clone(),
            bar_cache.clone(),
            clock.clone(),
        )?;

        tokio::spawn(async move {
            loop {
                tokio::select! {
                    _ = stop_rx.changed() => {
                        break;
                    }

                    Some(trade) = trade_rx.recv() => {
                        let process_start = clock.raw();

                        // Feed the trade to the aggregator; a completed bar is
                        // cached and forwarded to subscribers.
                        if let Some(bar) = bar_aggregator.process_trade(&trade) {
                            bar_cache.write().add_bar(bar.clone());

                            if tx.send(bar).await.is_err() {
                                if let Some(feed_stats) = stats.write().get_mut(&stats_key) {
                                    feed_stats.dropped_messages += 1;
                                }
                                break;
                            }
                        }

                        let process_end = clock.raw();
                        let latency_ns = process_end.saturating_sub(process_start);

                        if let Some(feed_stats) = stats.write().get_mut(&stats_key) {
                            feed_stats.messages_processed += 1;
                            // Simple exponential moving average (90% old, 10% new).
                            feed_stats.avg_process_latency_ns =
                                (feed_stats.avg_process_latency_ns * 9 + latency_ns) / 10;
                            feed_stats.max_process_latency_ns =
                                feed_stats.max_process_latency_ns.max(latency_ns);
                            feed_stats.last_update_time = process_end;
                        }
                    }
                }
            }
        });

        Ok(rx)
    }

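    /// Stops the bar feed matching `instrument_id` and `bar_type`.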
    async fn stop_feed_bars(&self, instrument_id: &InstrumentId, bar_type: &BarType) -> Result<()> {
        let symbol = &instrument_id.symbol;
        // Recompute the interval so the key matches the one used at start.
        let interval_sec = match bar_type.get_spec().aggregation {
            BarAggregation::Second => bar_type.get_spec().step,
            BarAggregation::Minute => bar_type.get_spec().step * 60,
            BarAggregation::Hour => bar_type.get_spec().step * 3600,
            BarAggregation::Day => bar_type.get_spec().step * 86400,
            _ => return Err(anyhow!("Only time-based bars are supported")),
        };

        let key = String::from(format!("bars:{symbol}:{interval_sec}"));
        if let Some(tx) = self.stop_signals.write().remove(&key) {
            let _ = tx.send(true);
        }

        Ok(())
    }

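    /// Returns the shared SIMD order book for `instrument_id`, creating an
    /// empty one if no depth feed has registered it yet.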
    async fn get_shared_orderbook(
        &self,
        instrument_id: &InstrumentId,
    ) -> Result<SharedSimdOrderBook> {
        let symbol = &instrument_id.symbol;

        let orderbooks = self.orderbooks.read();
        if let Some(orderbook) = orderbooks.get(symbol) {
            Ok(orderbook.clone())
        } else {
            // No book registered yet: create an empty one so callers always
            // get a usable handle.
            let ob = OrderBook::<64>::new(
                symbol.clone(),
                0,
                self.clock.raw(),
                SmallVec::<[PriceLevel; 64]>::new(),
                SmallVec::<[PriceLevel; 64]>::new(),
            );
            let shared_orderbook = SharedSimdOrderBook::from_orderbook(&ob);
            drop(orderbooks);

            self.orderbooks
                .write()
                .insert(symbol.clone(), shared_orderbook.clone());

            Ok(shared_orderbook)
        }
    }

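    /// Returns the bar cache for `instrument_id` and `bar_type`, creating an
    /// empty cache if none exists. (`max_bars` is not applied here.)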
    async fn get_bar_cache(
        &self,
        instrument_id: &InstrumentId,
        bar_type: &BarType,
        max_bars: usize,
    ) -> Result<Arc<RwLock<BarCache>>> {
        let key = String::from(format!("{}:{}", instrument_id.symbol, bar_type));

        let caches = self.bar_caches.read();
        if let Some(cache) = caches.get(&key) {
            return Ok(cache.clone());
        }
        drop(caches);

        // Not cached yet: create and register an empty cache.
        let cache = Arc::new(RwLock::new(BarCache::new()));
        self.bar_caches.write().insert(key, cache.clone());

        Ok(cache)
    }

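    /// Returns feed statistics for `instrument_id`, preferring the depth feed
    /// and falling back to the trade feed; returns defaults if neither exists.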
    async fn get_stats(&self, instrument_id: &InstrumentId) -> Result<FeedStats> {
        let key = String::from(format!("depth:{}", instrument_id.symbol));
        if let Some(stats) = self.stats.read().get(&key) {
            return Ok(stats.clone());
        }

        let key = String::from(format!("trades:{}", instrument_id.symbol));
        if let Some(stats) = self.stats.read().get(&key) {
            return Ok(stats.clone());
        }

        Ok(FeedStats::default())
    }

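    /// Clears all recorded feed statistics.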
    async fn reset_stats(&self) -> Result<()> {
        self.stats.write().clear();
        Ok(())
    }
}