rusty_feeder/exchange/upbit/
feeder.rs1use rusty_common::collections::FxHashMap;
7use smartstring::alias::String;
8use std::sync::Arc;
9
10use anyhow::{Result, anyhow};
11use async_trait::async_trait;
12use parking_lot::RwLock;
13use quanta::Clock;
14use rust_decimal::Decimal;
15use rusty_model::{
16 data::{
17 bar::{Bar, BarAggregation, BarCache, BarType},
18 book_snapshot::OrderBookSnapshot,
19 market_trade::MarketTrade,
20 orderbook::PriceLevel,
21 simd_orderbook::{SharedSimdOrderBook, SimdOrderBook},
22 },
23 enums::OrderSide,
24 instruments::InstrumentId,
25};
26use smallvec::SmallVec;
27use tokio::sync::{mpsc, watch};
28
29use super::data::{
30 orderbook::{MAX_ORDERBOOK_LEVELS, OrderbookMessage, ParsedOrderbookData},
31 trade::{TradeMessage, Transaction},
32};
33use crate::feeder::{FeedStats, Feeder, FeederOptions};
34
35#[derive(Debug)]
37pub struct UpbitFeeder {
38 stop_signals: Arc<RwLock<FxHashMap<String, watch::Sender<bool>>>>,
40
41 clock: Clock,
43
44 stats: Arc<RwLock<FxHashMap<String, FeedStats>>>,
46
47 orderbooks: Arc<RwLock<FxHashMap<String, SharedSimdOrderBook>>>,
49
50 bar_caches: Arc<RwLock<FxHashMap<String, Arc<RwLock<BarCache>>>>>,
52}
53
54impl UpbitFeeder {
55 #[inline]
57 #[must_use]
58 pub fn new() -> Self {
59 Self {
60 stop_signals: Arc::new(RwLock::new(FxHashMap::default())),
61 clock: Clock::new(),
62 stats: Arc::new(RwLock::new(FxHashMap::default())),
63 orderbooks: Arc::new(RwLock::new(FxHashMap::default())),
64 bar_caches: Arc::new(RwLock::new(FxHashMap::default())),
65 }
66 }
67
68 fn create_stop_signal_subscription(&self, key: String) -> Result<watch::Receiver<bool>> {
71 let (stop_tx, _) = watch::channel(false);
72 let stop_rx = {
73 let mut stop_signals = self.stop_signals.write();
74 stop_signals.insert(key.clone(), stop_tx);
75 stop_signals
76 .get(&key)
77 .ok_or_else(|| anyhow!("Failed to get stop signal for key: {}", key))?
78 .subscribe()
79 };
80 Ok(stop_rx)
81 }
82}
83
84impl Default for UpbitFeeder {
85 fn default() -> Self {
86 Self::new()
87 }
88}
89
90#[async_trait]
91impl Feeder for UpbitFeeder {
92 type DepthMessage = OrderbookMessage;
93 type TradeMessage = TradeMessage;
94
95 async fn start_feed_depth(
96 &self,
97 instrument_id: InstrumentId,
98 mut depth_rx: mpsc::Receiver<Self::DepthMessage>,
99 _options: Option<FeederOptions>,
100 ) -> Result<mpsc::Receiver<OrderBookSnapshot>> {
101 let symbol = instrument_id.symbol.clone();
103 let (tx, rx) = mpsc::channel(1024);
104
105 let ob = rusty_model::data::orderbook::OrderBook::<64>::new(
107 symbol.clone(),
108 0, self.clock.raw(), SmallVec::<[PriceLevel; 64]>::new(), SmallVec::<[PriceLevel; 64]>::new(), );
113 let shared_orderbook = SharedSimdOrderBook::from_orderbook(&ob);
114 self.orderbooks
115 .write()
116 .insert(symbol.clone(), shared_orderbook.clone());
117
118 let key = String::from(format!("depth:{symbol}"));
120 let mut stop_rx = self.create_stop_signal_subscription(key)?;
121
122 let clock = self.clock.clone();
124
125 let stats = self.stats.clone();
127 let stats_key = String::from(format!("depth:{symbol}"));
128
129 stats.write().entry(stats_key.clone()).or_default();
131
132 tokio::spawn(async move {
133 loop {
134 tokio::select! {
135 _ = stop_rx.changed() => {
137 break;
138 }
139
140 Some(update) = depth_rx.recv() => {
142 if update.code != symbol {
144 continue;
145 }
146
147 let process_start = clock.raw();
149
150 let parsed_data = ParsedOrderbookData::from_message(&update);
152
153 let bids_vec: SmallVec<[PriceLevel; 64]> = parsed_data.bids
156 .iter()
157 .map(|&(price, size)| PriceLevel::new(price, size))
158 .collect();
159
160 let asks_vec: SmallVec<[PriceLevel; 64]> = parsed_data.asks
161 .iter()
162 .map(|&(price, size)| PriceLevel::new(price, size))
163 .collect();
164
165 let mut bids_arr = [PriceLevel::new(Decimal::ZERO, Decimal::ZERO); MAX_ORDERBOOK_LEVELS];
167 let mut asks_arr = [PriceLevel::new(Decimal::ZERO, Decimal::ZERO); MAX_ORDERBOOK_LEVELS];
168 for (i, lvl) in bids_vec.iter().enumerate().take(MAX_ORDERBOOK_LEVELS) {
169 bids_arr[i] = *lvl;
170 }
171 for (i, lvl) in asks_vec.iter().enumerate().take(MAX_ORDERBOOK_LEVELS) {
172 asks_arr[i] = *lvl;
173 }
174 let bids_levels: SmallVec<[PriceLevel; 64]> = parsed_data.bids
175 .iter()
176 .map(|&(price, quantity)| PriceLevel::new(price, quantity))
177 .collect();
178 let asks_levels: SmallVec<[PriceLevel; 64]> = parsed_data.asks
179 .iter()
180 .map(|&(price, quantity)| PriceLevel::new(price, quantity))
181 .collect();
182
183 let model_orderbook = rusty_model::data::orderbook::OrderBook::<64>::new(
184 instrument_id.symbol.as_str(),
185 parsed_data.timestamp_ns,
186 clock.raw(),
187 bids_levels,
188 asks_levels,
189 );
190 shared_orderbook.write(|ob| {
191 *ob = SimdOrderBook::from_orderbook(&model_orderbook);
192 });
193
194 let mut depth = OrderBookSnapshot::new_empty(
196 instrument_id.clone(),
197 process_start,
198 parsed_data.sequence,
199 );
200 for &(price, qty) in &parsed_data.bids {
201 depth.add_bid(price, qty);
202 }
203 for &(price, qty) in &parsed_data.asks {
204 depth.add_ask(price, qty);
205 }
206
207 let process_end = clock.raw();
209 let latency_ns = process_end.saturating_sub(process_start);
210
211 if let Some(feed_stats) = stats.write().get_mut(&stats_key) {
213 feed_stats.messages_processed += 1;
214 feed_stats.avg_process_latency_ns =
215 (feed_stats.avg_process_latency_ns * 9 + latency_ns) / 10; feed_stats.max_process_latency_ns =
217 feed_stats.max_process_latency_ns.max(latency_ns);
218 feed_stats.last_update_time = process_end;
219 }
220
221 if tx.send(depth).await.is_err() {
223 if let Some(feed_stats) = stats.write().get_mut(&stats_key) {
225 feed_stats.dropped_messages += 1;
226 }
227 break;
228 }
229 }
230 }
231 }
232 });
233
234 Ok(rx)
235 }
236
237 async fn stop_feed_depth(&self, instrument_id: &InstrumentId) -> Result<()> {
238 let symbol = &instrument_id.symbol;
240 let key = String::from(format!("depth:{symbol}"));
241
242 self.orderbooks.write().remove(symbol);
244
245 if let Some(tx) = self.stop_signals.write().remove(&key) {
247 let _ = tx.send(true);
248 }
249
250 Ok(())
251 }
252
253 async fn start_feed_trades(
254 &self,
255 instrument_id: InstrumentId,
256 mut trade_rx: mpsc::Receiver<Self::TradeMessage>,
257 _options: Option<FeederOptions>,
258 ) -> Result<mpsc::Receiver<MarketTrade>> {
259 let symbol = instrument_id.symbol.clone();
261 let (tx, rx) = mpsc::channel(1024);
262
263 let key = String::from(format!("trades:{symbol}"));
265 let mut stop_rx = self.create_stop_signal_subscription(key)?;
266
267 let clock = self.clock.clone();
269
270 let stats = self.stats.clone();
272 let stats_key = String::from(format!("trades:{symbol}"));
273
274 stats.write().entry(stats_key.clone()).or_default();
276
277 tokio::spawn(async move {
278 loop {
279 tokio::select! {
280 _ = stop_rx.changed() => {
282 break;
283 }
284
285 Some(trade_msg) = trade_rx.recv() => {
287 if trade_msg.code != symbol {
289 continue;
290 }
291
292 let process_start = clock.raw();
294
295 let transaction = Transaction::from_trade_message(&trade_msg, process_start);
297
298 let trade = MarketTrade {
300 timestamp: clock.now(),
301 exchange_time_ns: transaction.trade_timestamp_ns,
302 price: transaction.price,
303 quantity: transaction.volume,
304 direction: transaction.side,
305 instrument_id: instrument_id.clone(),
306 };
307
308 let process_end = clock.raw();
310 let latency_ns = process_end.saturating_sub(process_start);
311
312 if let Some(feed_stats) = stats.write().get_mut(&stats_key) {
314 feed_stats.messages_processed += 1;
315 feed_stats.avg_process_latency_ns =
316 (feed_stats.avg_process_latency_ns * 9 + latency_ns) / 10; feed_stats.max_process_latency_ns =
318 feed_stats.max_process_latency_ns.max(latency_ns);
319 feed_stats.last_update_time = process_end;
320 }
321
322 if tx.send(trade).await.is_err() {
324 if let Some(feed_stats) = stats.write().get_mut(&stats_key) {
326 feed_stats.dropped_messages += 1;
327 }
328 break;
329 }
330 }
331 }
332 }
333 });
334
335 Ok(rx)
336 }
337
338 async fn stop_feed_trades(&self, instrument_id: &InstrumentId) -> Result<()> {
339 let symbol = &instrument_id.symbol;
341 let key = String::from(format!("trades:{symbol}"));
342 if let Some(tx) = self.stop_signals.write().remove(&key) {
343 let _ = tx.send(true);
344 }
345
346 Ok(())
347 }
348
349 async fn start_feed_bars(
350 &self,
351 instrument_id: InstrumentId,
352 bar_type: BarType,
353 mut trade_rx: mpsc::Receiver<MarketTrade>,
354 _options: Option<FeederOptions>,
355 ) -> Result<mpsc::Receiver<Bar>> {
356 let symbol = instrument_id.symbol.clone();
358 let interval_sec = match bar_type.get_spec().aggregation {
359 BarAggregation::Second => bar_type.get_spec().step,
360 BarAggregation::Minute => bar_type.get_spec().step * 60,
361 BarAggregation::Hour => bar_type.get_spec().step * 3600,
362 BarAggregation::Day => bar_type.get_spec().step * 86400,
363 _ => return Err(anyhow!("Only time-based bars are supported")),
364 };
365
366 let cache_key = String::from(format!("{symbol}:{bar_type}"));
368
369 let bar_cache = {
371 let mut caches = self.bar_caches.write();
372 if !caches.contains_key(&cache_key) {
373 caches.insert(cache_key.clone(), Arc::new(RwLock::new(BarCache::new())));
374 }
375 caches
376 .get(&cache_key)
377 .ok_or_else(|| anyhow!("Cache not found for key: {}", cache_key))?
378 .clone()
379 };
380
381 let (tx, rx) = mpsc::channel(1024);
382
383 let key = String::from(format!("bars:{symbol}:{interval_sec}"));
385 let mut stop_rx = self.create_stop_signal_subscription(key)?;
386
387 let clock = self.clock.clone();
389
390 let stats = self.stats.clone();
392 let stats_key = String::from(format!("bars:{symbol}:{interval_sec}"));
393
394 stats.write().entry(stats_key.clone()).or_default();
396
397 let mut open = None;
399 let mut high = None;
400 let mut low = None;
401 let mut close = None;
402 let mut volume = Decimal::ZERO;
403 let mut buy_volume = Decimal::ZERO;
404 let mut sell_volume = Decimal::ZERO;
405 let mut trade_count = 0;
406 let mut last_bar_time = 0u64;
407
408 tokio::spawn(async move {
409 loop {
410 tokio::select! {
411 _ = stop_rx.changed() => {
413 break;
414 }
415
416 Some(trade) = trade_rx.recv() => {
418 let process_start = clock.raw();
420
421 let now = clock.raw();
422 let current_interval = now / 1_000_000_000 / interval_sec;
423
424 if last_bar_time == 0 {
426 open = Some(trade.price);
428 high = Some(trade.price);
429 low = Some(trade.price);
430 close = Some(trade.price);
431 volume = trade.quantity;
432
433 match trade.direction {
435 OrderSide::Buy => buy_volume = trade.quantity,
436 OrderSide::Sell => sell_volume = trade.quantity,
437 }
438
439 trade_count = 1;
440 last_bar_time = current_interval;
441 } else if current_interval > last_bar_time {
442 if let (Some(o), Some(h), Some(l), Some(c)) = (open, high, low, close) {
444 let bar = Bar {
445 bar_type: bar_type.clone(),
446 open: o,
447 high: h,
448 low: l,
449 close: c,
450 volume,
451 timestamp_ns: clock.raw(),
452 };
453
454 bar_cache.write().add_bar(bar.clone());
456
457 if tx.send(bar).await.is_err() {
459 if let Some(feed_stats) = stats.write().get_mut(&stats_key) {
461 feed_stats.dropped_messages += 1;
462 }
463 break;
464 }
465 }
466
467 open = Some(trade.price);
469 high = Some(trade.price);
470 low = Some(trade.price);
471 close = Some(trade.price);
472 volume = trade.quantity;
473
474 buy_volume = Decimal::ZERO;
476 sell_volume = Decimal::ZERO;
477
478 match trade.direction {
480 OrderSide::Buy => buy_volume = trade.quantity,
481 OrderSide::Sell => sell_volume = trade.quantity,
482 }
483
484 trade_count = 1;
485 last_bar_time = current_interval;
486 } else {
487 high = Some(std::cmp::max(high.unwrap_or(trade.price), trade.price));
489 low = Some(std::cmp::min(low.unwrap_or(trade.price), trade.price));
490 close = Some(trade.price);
491 volume += trade.quantity;
492
493 match trade.direction {
495 OrderSide::Buy => buy_volume += trade.quantity,
496 OrderSide::Sell => sell_volume += trade.quantity,
497 }
498
499 trade_count += 1;
500 }
501
502 let process_end = clock.raw();
504 let latency_ns = process_end.saturating_sub(process_start);
505
506 if let Some(feed_stats) = stats.write().get_mut(&stats_key) {
508 feed_stats.messages_processed += 1;
509 feed_stats.avg_process_latency_ns =
510 (feed_stats.avg_process_latency_ns * 9 + latency_ns) / 10; feed_stats.max_process_latency_ns =
512 feed_stats.max_process_latency_ns.max(latency_ns);
513 feed_stats.last_update_time = process_end;
514 }
515 }
516 }
517 }
518 });
519
520 Ok(rx)
521 }
522
523 async fn stop_feed_bars(&self, instrument_id: &InstrumentId, bar_type: &BarType) -> Result<()> {
524 let symbol = &instrument_id.symbol;
526 let interval_sec = match bar_type.get_spec().aggregation {
527 BarAggregation::Second => bar_type.get_spec().step,
528 BarAggregation::Minute => bar_type.get_spec().step * 60,
529 BarAggregation::Hour => bar_type.get_spec().step * 3600,
530 BarAggregation::Day => bar_type.get_spec().step * 86400,
531 _ => return Err(anyhow!("Only time-based bars are supported")),
532 };
533
534 let key = String::from(format!("bars:{symbol}:{interval_sec}"));
535 if let Some(tx) = self.stop_signals.write().remove(&key) {
536 let _ = tx.send(true);
537 }
538
539 Ok(())
540 }
541
542 async fn get_shared_orderbook(
543 &self,
544 instrument_id: &InstrumentId,
545 ) -> Result<SharedSimdOrderBook> {
546 let symbol = &instrument_id.symbol;
548
549 let orderbooks = self.orderbooks.read();
551 if let Some(orderbook) = orderbooks.get(symbol) {
552 Ok(orderbook.clone())
553 } else {
554 let ob =
556 rusty_model::data::orderbook::OrderBook::<64>::new_empty(instrument_id.clone());
557 let shared_orderbook = SharedSimdOrderBook::from_orderbook(&ob);
558 drop(orderbooks); self.orderbooks
562 .write()
563 .insert(symbol.clone(), shared_orderbook.clone());
564
565 Ok(shared_orderbook)
566 }
567 }
568
569 async fn get_bar_cache(
570 &self,
571 instrument_id: &InstrumentId,
572 bar_type: &BarType,
573 max_bars: usize,
574 ) -> Result<Arc<RwLock<BarCache>>> {
575 let key = String::from(format!("{}:{}", instrument_id.symbol, bar_type));
577
578 let caches = self.bar_caches.read();
580 if let Some(cache) = caches.get(&key) {
581 return Ok(cache.clone());
582 }
583 drop(caches); let cache = Arc::new(RwLock::new(BarCache::new()));
587 self.bar_caches.write().insert(key, cache.clone());
588
589 Ok(cache)
590 }
591
592 async fn get_stats(&self, instrument_id: &InstrumentId) -> Result<FeedStats> {
593 let key = String::from(format!("depth:{}", instrument_id.symbol));
595 if let Some(stats) = self.stats.read().get(&key) {
596 return Ok(stats.clone());
597 }
598
599 let key = String::from(format!("trades:{}", instrument_id.symbol));
601 if let Some(stats) = self.stats.read().get(&key) {
602 return Ok(stats.clone());
603 }
604
605 Ok(FeedStats::default())
607 }
608
609 async fn reset_stats(&self) -> Result<()> {
610 self.stats.write().clear();
611 Ok(())
612 }
613}