rusty_feeder/exchange/coinbase/
feeder.rs1use rusty_common::collections::FxHashMap;
2use smartstring::alias::String;
3use std::sync::Arc;
4
5use anyhow::Result;
6use async_trait::async_trait;
7use fmt::Debug;
8use parking_lot::RwLock;
9use quanta::Clock;
10use skiplist::SkipMap;
12use std::fmt;
13use tokio::sync::{mpsc, watch};
14
15use crate::feeder::{Feeder, FeederOptions};
16use rusty_model::data::bar::BarCache;
17use rusty_model::{
18 data::{
19 bar::{Bar, BarAggregation, BarType},
20 book_snapshot::OrderBookSnapshot,
21 market_trade::MarketTrade,
22 orderbook::PriceLevel,
23 simd_orderbook::SharedSimdOrderBook,
24 },
25 enums::OrderSide,
26 instruments::InstrumentId,
27 venues::Venue,
28};
29use smallvec::SmallVec;
30
31use super::data::{
32 orderbook::{Level2Update, ParsedLevel2Update},
33 trade::TradeMessage,
34};
35
36#[derive(Debug)]
38pub struct CoinbaseFeeder {
39 stop_signals: Arc<RwLock<FxHashMap<String, watch::Sender<bool>>>>,
40 clock: Clock,
41}
42
43impl Default for CoinbaseFeeder {
44 fn default() -> Self {
45 Self::new()
46 }
47}
48
49impl CoinbaseFeeder {
50 #[must_use]
52 pub fn new() -> Self {
53 Self {
54 stop_signals: Arc::new(RwLock::new(FxHashMap::default())),
55 clock: Clock::new(),
56 }
57 }
58}
59
60#[async_trait]
61impl Feeder for CoinbaseFeeder {
62 type DepthMessage = Level2Update;
63 type TradeMessage = TradeMessage;
64
65 async fn start_feed_depth(
66 &self,
67 instrument_id: InstrumentId,
68 mut depth_rx: mpsc::Receiver<Self::DepthMessage>,
69 _options: Option<FeederOptions>,
70 ) -> Result<mpsc::Receiver<OrderBookSnapshot>> {
71 let symbol = instrument_id.symbol;
73 let (tx, rx) = mpsc::channel(1024);
74
75 let (stop_tx, _) = watch::channel(false);
77 let key = String::from(format!("depth:{symbol}"));
78 self.stop_signals.write().insert(key.clone(), stop_tx);
79 let mut stop_rx = self
80 .stop_signals
81 .read()
82 .get(key.as_str())
83 .unwrap()
84 .subscribe();
85
86 let clock = self.clock.clone();
88
89 let bids = Arc::new(RwLock::new(SkipMap::new()));
91 let asks = Arc::new(RwLock::new(SkipMap::new()));
92 let sequence = Arc::new(RwLock::new(0u64));
93
94 tokio::spawn(async move {
95 loop {
96 tokio::select! {
97 _ = stop_rx.changed() => {
99 break;
100 }
101
102 Some(update) = depth_rx.recv() => {
104 let parsed = ParsedLevel2Update::from_update(update);
105
106 {
108 let current_seq = *sequence.read();
109 if current_seq > 0 && current_seq >= parsed.time.parse::<u64>().unwrap_or(0) {
110 continue;
112 }
113
114 *sequence.write() = parsed.time.parse::<u64>().unwrap_or(0);
116 }
117
118 {
120 let mut bids_lock = bids.write();
121 for (price, size) in &parsed.bids {
122 if size.is_zero() {
123 bids_lock.remove(price);
124 } else {
125 bids_lock.insert(*price, *size);
126 }
127 }
128 }
129
130 {
131 let mut asks_lock = asks.write();
132 for (price, size) in &parsed.asks {
133 if size.is_zero() {
134 asks_lock.remove(price);
135 } else {
136 asks_lock.insert(*price, *size);
137 }
138 }
139 }
140
141 let mut bids_snapshot = SmallVec::<[PriceLevel; 64]>::new();
143 let mut asks_snapshot = SmallVec::<[PriceLevel; 64]>::new();
144 let seq: u64;
145
146 {
148 let bids_guard = bids.read();
149 let asks_guard = asks.read();
150 let sequence_guard = sequence.read();
151
152 seq = *sequence_guard;
153
154 for (price, size) in &*bids_guard {
156 bids_snapshot.push(PriceLevel::new(*price, *size));
157 }
158
159 for (price, size) in &*asks_guard {
161 asks_snapshot.push(PriceLevel::new(*price, *size));
162 }
163 } bids_snapshot.sort_unstable_by(|a, b| b.price.cmp(&a.price));
167
168 asks_snapshot.sort_unstable_by(|a, b| a.price.cmp(&b.price));
170
171 let depth = OrderBookSnapshot::new(
173 InstrumentId::new(parsed.product_id.clone(), Venue::Coinbase),
174 bids_snapshot,
175 asks_snapshot,
176 seq,
177 parsed.time.parse::<u64>().unwrap_or(0),
178 clock.raw(),
179 );
180
181 if tx.send(depth).await.is_err() {
183 break;
184 }
185 }
186 }
187 }
188 });
189
190 Ok(rx)
191 }
192
193 async fn stop_feed_depth(&self, instrument_id: &InstrumentId) -> Result<()> {
194 let key = String::from(format!("depth:{}", instrument_id.symbol));
195 if let Some(tx) = self.stop_signals.write().remove(key.as_str()) {
196 let _ = tx.send(true);
197 }
198
199 Ok(())
200 }
201
202 async fn start_feed_trades(
203 &self,
204 instrument_id: InstrumentId,
205 mut trade_rx: mpsc::Receiver<Self::TradeMessage>,
206 _options: Option<FeederOptions>,
207 ) -> Result<mpsc::Receiver<MarketTrade>> {
208 let symbol = instrument_id.symbol;
210 let (tx, rx) = mpsc::channel(1024);
211
212 let (stop_tx, _) = watch::channel(false);
214 let key = String::from(format!("trades:{symbol}"));
215 self.stop_signals.write().insert(key.clone(), stop_tx);
216 let mut stop_rx = self
217 .stop_signals
218 .read()
219 .get(key.as_str())
220 .unwrap()
221 .subscribe();
222
223 let clock = self.clock.clone();
225
226 tokio::spawn(async move {
227 loop {
228 tokio::select! {
229 _ = stop_rx.changed() => {
231 break;
232 }
233
234 Some(trade_msg) = trade_rx.recv() => {
236 let side = match trade_msg.side.as_str() {
238 "buy" => OrderSide::Buy,
239 "sell" => OrderSide::Sell,
240 _ => continue,
241 };
242
243 let exchange_time = trade_msg.time.parse::<u64>().unwrap_or(0);
245
246 let trade = MarketTrade {
247 timestamp: clock.now(),
248 exchange_time_ns: exchange_time,
249 price: trade_msg.price,
250 quantity: trade_msg.size,
251 direction: side,
252 instrument_id: InstrumentId::new(trade_msg.product_id.clone(), Venue::Coinbase),
253 };
254
255 if tx.send(trade).await.is_err() {
257 break;
258 }
259 }
260 }
261 }
262 });
263
264 Ok(rx)
265 }
266
267 async fn stop_feed_trades(&self, instrument_id: &InstrumentId) -> Result<()> {
268 let key = String::from(format!("trades:{}", instrument_id.symbol));
269 if let Some(tx) = self.stop_signals.write().remove(key.as_str()) {
270 let _ = tx.send(true);
271 }
272
273 Ok(())
274 }
275
276 async fn start_feed_bars(
277 &self,
278 instrument_id: InstrumentId,
279 bar_type: BarType,
280 mut trade_rx: mpsc::Receiver<MarketTrade>,
281 _options: Option<FeederOptions>,
282 ) -> Result<mpsc::Receiver<Bar>> {
283 let symbol = instrument_id.symbol.clone();
285
286 let spec = bar_type.get_spec();
288 let interval_sec = match spec.aggregation {
289 BarAggregation::Second => spec.step,
290 BarAggregation::Minute => spec.step * 60,
291 BarAggregation::Hour => spec.step * 3600,
292 BarAggregation::Day => spec.step * 86400,
293 _ => 60, };
295 let (tx, rx) = mpsc::channel(1024);
296
297 let (stop_tx, _) = watch::channel(false);
299 let key = String::from(format!("bars:{symbol}:{interval_sec}"));
300 self.stop_signals.write().insert(key.clone(), stop_tx);
301 let mut stop_rx = self
302 .stop_signals
303 .read()
304 .get(key.as_str())
305 .unwrap()
306 .subscribe();
307
308 let clock = self.clock.clone();
310
311 let mut open = None;
313 let mut high = None;
314 let mut low = None;
315 let mut close = None;
316 let mut volume = rust_decimal::Decimal::ZERO;
317 let mut last_bar_time = 0u64;
318
319 tokio::spawn(async move {
320 loop {
321 tokio::select! {
322 _ = stop_rx.changed() => {
324 break;
325 }
326
327 Some(trade) = trade_rx.recv() => {
329 let now = clock.raw();
330 let current_interval = now / 1_000_000_000 / interval_sec;
331
332 if last_bar_time == 0 {
334 open = Some(trade.price);
336 high = Some(trade.price);
337 low = Some(trade.price);
338 close = Some(trade.price);
339 volume = trade.quantity;
340 last_bar_time = current_interval;
341 } else if current_interval > last_bar_time {
342 if let (Some(o), Some(h), Some(l), Some(c)) = (open, high, low, close) {
344 let instrument_id = InstrumentId::new(symbol.clone(), Venue::Coinbase);
346 let bar_type = BarType::new_standard(
347 symbol.as_str().into(),
348 BarAggregation::Second,
349 interval_sec
350 );
351
352 let bar = Bar {
353 bar_type,
354 open: o,
355 high: h,
356 low: l,
357 close: c,
358 volume,
359 timestamp_ns: clock.raw(),
360 };
361
362 if tx.send(bar).await.is_err() {
364 break;
365 }
366 }
367
368 open = Some(trade.price);
370 high = Some(trade.price);
371 low = Some(trade.price);
372 close = Some(trade.price);
373 volume = trade.quantity;
374 last_bar_time = current_interval;
375 } else {
376 high = Some(std::cmp::max(high.unwrap_or(trade.price), trade.price));
378 low = Some(std::cmp::min(low.unwrap_or(trade.price), trade.price));
379 close = Some(trade.price);
380 volume += trade.quantity;
381 }
382 }
383 }
384 }
385 });
386
387 Ok(rx)
388 }
389
390 async fn stop_feed_bars(&self, instrument_id: &InstrumentId, bar_type: &BarType) -> Result<()> {
391 let symbol = &instrument_id.symbol;
392
393 let spec = bar_type.get_spec();
395 let interval_sec = match spec.aggregation {
396 BarAggregation::Second => spec.step,
397 BarAggregation::Minute => spec.step * 60,
398 BarAggregation::Hour => spec.step * 3600,
399 BarAggregation::Day => spec.step * 86400,
400 _ => 60, };
402 let key = String::from(format!("bars:{symbol}:{interval_sec}"));
403 if let Some(tx) = self.stop_signals.write().remove(key.as_str()) {
404 let _ = tx.send(true);
405 }
406
407 Ok(())
408 }
409
410 async fn get_shared_orderbook(
411 &self,
412 instrument_id: &InstrumentId,
413 ) -> Result<SharedSimdOrderBook> {
414 let order_book = rusty_model::data::orderbook::OrderBook::<64>::new(
416 instrument_id.symbol.clone(),
417 0, self.clock.raw(), SmallVec::<[PriceLevel; 64]>::new(), SmallVec::<[PriceLevel; 64]>::new(), );
422
423 let shared_book = SharedSimdOrderBook::from_orderbook(&order_book);
425
426 Ok(shared_book)
427 }
428
429 async fn get_bar_cache(
430 &self,
431 _instrument_id: &InstrumentId,
432 _bar_type: &BarType,
433 max_bars: usize,
434 ) -> Result<Arc<RwLock<BarCache>>> {
435 let bar_cache = BarCache::new();
437
438 let shared_cache = Arc::new(RwLock::new(bar_cache));
440
441 Ok(shared_cache)
442 }
443}