rusty_feeder/exchange/coinbase/
feeder.rs

1use rusty_common::collections::FxHashMap;
2use smartstring::alias::String;
3use std::sync::Arc;
4
5use anyhow::Result;
6use async_trait::async_trait;
7use fmt::Debug;
8use parking_lot::RwLock;
9use quanta::Clock;
10// use rust_decimal::Decimal;
11use skiplist::SkipMap;
12use std::fmt;
13use tokio::sync::{mpsc, watch};
14
15use crate::feeder::{Feeder, FeederOptions};
16use rusty_model::data::bar::BarCache;
17use rusty_model::{
18    data::{
19        bar::{Bar, BarAggregation, BarType},
20        book_snapshot::OrderBookSnapshot,
21        market_trade::MarketTrade,
22        orderbook::PriceLevel,
23        simd_orderbook::SharedSimdOrderBook,
24    },
25    enums::OrderSide,
26    instruments::InstrumentId,
27    venues::Venue,
28};
29use smallvec::SmallVec;
30
31use super::data::{
32    orderbook::{Level2Update, ParsedLevel2Update},
33    trade::TradeMessage,
34};
35
36/// Coinbase market data feeder
37#[derive(Debug)]
38pub struct CoinbaseFeeder {
39    stop_signals: Arc<RwLock<FxHashMap<String, watch::Sender<bool>>>>,
40    clock: Clock,
41}
42
43impl Default for CoinbaseFeeder {
44    fn default() -> Self {
45        Self::new()
46    }
47}
48
49impl CoinbaseFeeder {
50    /// Create a new Coinbase feeder
51    #[must_use]
52    pub fn new() -> Self {
53        Self {
54            stop_signals: Arc::new(RwLock::new(FxHashMap::default())),
55            clock: Clock::new(),
56        }
57    }
58}
59
60#[async_trait]
61impl Feeder for CoinbaseFeeder {
62    type DepthMessage = Level2Update;
63    type TradeMessage = TradeMessage;
64
65    async fn start_feed_depth(
66        &self,
67        instrument_id: InstrumentId,
68        mut depth_rx: mpsc::Receiver<Self::DepthMessage>,
69        _options: Option<FeederOptions>,
70    ) -> Result<mpsc::Receiver<OrderBookSnapshot>> {
71        // Extract symbol from instrument_id
72        let symbol = instrument_id.symbol;
73        let (tx, rx) = mpsc::channel(1024);
74
75        // Create stop signal
76        let (stop_tx, _) = watch::channel(false);
77        let key = String::from(format!("depth:{symbol}"));
78        self.stop_signals.write().insert(key.clone(), stop_tx);
79        let mut stop_rx = self
80            .stop_signals
81            .read()
82            .get(key.as_str())
83            .unwrap()
84            .subscribe();
85
86        // Clone clock for use in the task
87        let clock = self.clock.clone();
88
89        // Create order book state
90        let bids = Arc::new(RwLock::new(SkipMap::new()));
91        let asks = Arc::new(RwLock::new(SkipMap::new()));
92        let sequence = Arc::new(RwLock::new(0u64));
93
94        tokio::spawn(async move {
95            loop {
96                tokio::select! {
97                    // Check for stop signal
98                    _ = stop_rx.changed() => {
99                        break;
100                    }
101
102                    // Process incoming depth updates
103                    Some(update) = depth_rx.recv() => {
104                        let parsed = ParsedLevel2Update::from_update(update);
105
106                        // First check if this is a valid sequence
107                        {
108                            let current_seq = *sequence.read();
109                            if current_seq > 0 && current_seq >= parsed.time.parse::<u64>().unwrap_or(0) {
110                                // Out of order update, ignore
111                                continue;
112                            }
113
114                            // Update sequence
115                            *sequence.write() = parsed.time.parse::<u64>().unwrap_or(0);
116                        }
117
118                        // Update the order book
119                        {
120                            let mut bids_lock = bids.write();
121                            for (price, size) in &parsed.bids {
122                                if size.is_zero() {
123                                    bids_lock.remove(price);
124                                } else {
125                                    bids_lock.insert(*price, *size);
126                                }
127                            }
128                        }
129
130                        {
131                            let mut asks_lock = asks.write();
132                            for (price, size) in &parsed.asks {
133                                if size.is_zero() {
134                                    asks_lock.remove(price);
135                                } else {
136                                    asks_lock.insert(*price, *size);
137                                }
138                            }
139                        }
140
141                        // Create a snapshot of the current order book by converting SkipMap to SmallVec
142                        let mut bids_snapshot = SmallVec::<[PriceLevel; 64]>::new();
143                        let mut asks_snapshot = SmallVec::<[PriceLevel; 64]>::new();
144                        let seq: u64;
145
146                        // Use a block to limit the lifetime of the guards
147                        {
148                            let bids_guard = bids.read();
149                            let asks_guard = asks.read();
150                            let sequence_guard = sequence.read();
151
152                            seq = *sequence_guard;
153
154                            // Convert SkipMap to SmallVec for bids (sorted by price descending)
155                            for (price, size) in &*bids_guard {
156                                bids_snapshot.push(PriceLevel::new(*price, *size));
157                            }
158
159                            // Convert SkipMap to SmallVec for asks (sorted by price ascending)
160                            for (price, size) in &*asks_guard {
161                                asks_snapshot.push(PriceLevel::new(*price, *size));
162                            }
163                        } // RwLockReadGuards are dropped here
164
165                        // Sort bids in descending order
166                        bids_snapshot.sort_unstable_by(|a, b| b.price.cmp(&a.price));
167
168                        // Sort asks in ascending order
169                        asks_snapshot.sort_unstable_by(|a, b| a.price.cmp(&b.price));
170
171                        // Create OrderBookSnapshot from the current state
172                        let depth = OrderBookSnapshot::new(
173                            InstrumentId::new(parsed.product_id.clone(), Venue::Coinbase),
174                            bids_snapshot,
175                            asks_snapshot,
176                            seq,
177                            parsed.time.parse::<u64>().unwrap_or(0),
178                            clock.raw(),
179                        );
180
181                        // Send the order book snapshot - guards are dropped before await
182                        if tx.send(depth).await.is_err() {
183                            break;
184                        }
185                    }
186                }
187            }
188        });
189
190        Ok(rx)
191    }
192
193    async fn stop_feed_depth(&self, instrument_id: &InstrumentId) -> Result<()> {
194        let key = String::from(format!("depth:{}", instrument_id.symbol));
195        if let Some(tx) = self.stop_signals.write().remove(key.as_str()) {
196            let _ = tx.send(true);
197        }
198
199        Ok(())
200    }
201
202    async fn start_feed_trades(
203        &self,
204        instrument_id: InstrumentId,
205        mut trade_rx: mpsc::Receiver<Self::TradeMessage>,
206        _options: Option<FeederOptions>,
207    ) -> Result<mpsc::Receiver<MarketTrade>> {
208        // Extract symbol from instrument_id
209        let symbol = instrument_id.symbol;
210        let (tx, rx) = mpsc::channel(1024);
211
212        // Create stop signal
213        let (stop_tx, _) = watch::channel(false);
214        let key = String::from(format!("trades:{symbol}"));
215        self.stop_signals.write().insert(key.clone(), stop_tx);
216        let mut stop_rx = self
217            .stop_signals
218            .read()
219            .get(key.as_str())
220            .unwrap()
221            .subscribe();
222
223        // Clone clock for use in the task
224        let clock = self.clock.clone();
225
226        tokio::spawn(async move {
227            loop {
228                tokio::select! {
229                    // Check for stop signal
230                    _ = stop_rx.changed() => {
231                        break;
232                    }
233
234                    // Process incoming trade messages
235                    Some(trade_msg) = trade_rx.recv() => {
236                        // Convert to Trade type
237                        let side = match trade_msg.side.as_str() {
238                            "buy" => OrderSide::Buy,
239                            "sell" => OrderSide::Sell,
240                            _ => continue,
241                        };
242
243                        // Convert exchange time to u64
244                        let exchange_time = trade_msg.time.parse::<u64>().unwrap_or(0);
245
246                        let trade = MarketTrade {
247                            timestamp: clock.now(),
248                            exchange_time_ns: exchange_time,
249                            price: trade_msg.price,
250                            quantity: trade_msg.size,
251                            direction: side,
252                            instrument_id: InstrumentId::new(trade_msg.product_id.clone(), Venue::Coinbase),
253                        };
254
255                        // Send the trade
256                        if tx.send(trade).await.is_err() {
257                            break;
258                        }
259                    }
260                }
261            }
262        });
263
264        Ok(rx)
265    }
266
267    async fn stop_feed_trades(&self, instrument_id: &InstrumentId) -> Result<()> {
268        let key = String::from(format!("trades:{}", instrument_id.symbol));
269        if let Some(tx) = self.stop_signals.write().remove(key.as_str()) {
270            let _ = tx.send(true);
271        }
272
273        Ok(())
274    }
275
276    async fn start_feed_bars(
277        &self,
278        instrument_id: InstrumentId,
279        bar_type: BarType,
280        mut trade_rx: mpsc::Receiver<MarketTrade>,
281        _options: Option<FeederOptions>,
282    ) -> Result<mpsc::Receiver<Bar>> {
283        // Extract symbol from instrument_id and interval from bar_type
284        let symbol = instrument_id.symbol.clone();
285
286        // Extract step and aggregation from bar_type
287        let spec = bar_type.get_spec();
288        let interval_sec = match spec.aggregation {
289            BarAggregation::Second => spec.step,
290            BarAggregation::Minute => spec.step * 60,
291            BarAggregation::Hour => spec.step * 3600,
292            BarAggregation::Day => spec.step * 86400,
293            _ => 60, // Default to 1 minute if not a time-based bar
294        };
295        let (tx, rx) = mpsc::channel(1024);
296
297        // Create stop signal
298        let (stop_tx, _) = watch::channel(false);
299        let key = String::from(format!("bars:{symbol}:{interval_sec}"));
300        self.stop_signals.write().insert(key.clone(), stop_tx);
301        let mut stop_rx = self
302            .stop_signals
303            .read()
304            .get(key.as_str())
305            .unwrap()
306            .subscribe();
307
308        // Clone clock for use in the task
309        let clock = self.clock.clone();
310
311        // Bar aggregation state
312        let mut open = None;
313        let mut high = None;
314        let mut low = None;
315        let mut close = None;
316        let mut volume = rust_decimal::Decimal::ZERO;
317        let mut last_bar_time = 0u64;
318
319        tokio::spawn(async move {
320            loop {
321                tokio::select! {
322                    // Check for stop signal
323                    _ = stop_rx.changed() => {
324                        break;
325                    }
326
327                    // Process incoming trades
328                    Some(trade) = trade_rx.recv() => {
329                        let now = clock.raw();
330                        let current_interval = now / 1_000_000_000 / interval_sec;
331
332                        // Check if we need to start a new bar
333                        if last_bar_time == 0 {
334                            // First trade
335                            open = Some(trade.price);
336                            high = Some(trade.price);
337                            low = Some(trade.price);
338                            close = Some(trade.price);
339                            volume = trade.quantity;
340                            last_bar_time = current_interval;
341                        } else if current_interval > last_bar_time {
342                            // New bar interval - send the current bar and start a new one
343                            if let (Some(o), Some(h), Some(l), Some(c)) = (open, high, low, close) {
344                                // Create a bar type from interval seconds using BarType::new_standard
345                                let instrument_id = InstrumentId::new(symbol.clone(), Venue::Coinbase);
346                                let bar_type = BarType::new_standard(
347                                    symbol.as_str().into(),
348                                    BarAggregation::Second,
349                                    interval_sec
350                                );
351
352                                let bar = Bar {
353                                    bar_type,
354                                    open: o,
355                                    high: h,
356                                    low: l,
357                                    close: c,
358                                    volume,
359                                    timestamp_ns: clock.raw(),
360                                };
361
362                                // Send the bar
363                                if tx.send(bar).await.is_err() {
364                                    break;
365                                }
366                            }
367
368                            // Start a new bar
369                            open = Some(trade.price);
370                            high = Some(trade.price);
371                            low = Some(trade.price);
372                            close = Some(trade.price);
373                            volume = trade.quantity;
374                            last_bar_time = current_interval;
375                        } else {
376                            // Update the current bar
377                            high = Some(std::cmp::max(high.unwrap_or(trade.price), trade.price));
378                            low = Some(std::cmp::min(low.unwrap_or(trade.price), trade.price));
379                            close = Some(trade.price);
380                            volume += trade.quantity;
381                        }
382                    }
383                }
384            }
385        });
386
387        Ok(rx)
388    }
389
390    async fn stop_feed_bars(&self, instrument_id: &InstrumentId, bar_type: &BarType) -> Result<()> {
391        let symbol = &instrument_id.symbol;
392
393        // Extract step and aggregation from bar_type
394        let spec = bar_type.get_spec();
395        let interval_sec = match spec.aggregation {
396            BarAggregation::Second => spec.step,
397            BarAggregation::Minute => spec.step * 60,
398            BarAggregation::Hour => spec.step * 3600,
399            BarAggregation::Day => spec.step * 86400,
400            _ => 60, // Default to 1 minute if not a time-based bar
401        };
402        let key = String::from(format!("bars:{symbol}:{interval_sec}"));
403        if let Some(tx) = self.stop_signals.write().remove(key.as_str()) {
404            let _ = tx.send(true);
405        }
406
407        Ok(())
408    }
409
410    async fn get_shared_orderbook(
411        &self,
412        instrument_id: &InstrumentId,
413    ) -> Result<SharedSimdOrderBook> {
414        // Create a new order book
415        let order_book = rusty_model::data::orderbook::OrderBook::<64>::new(
416            instrument_id.symbol.clone(),
417            0,                // exchange_timestamp_ns - will be updated on first message
418            self.clock.raw(), // system_timestamp_ns
419            SmallVec::<[PriceLevel; 64]>::new(), // empty bids
420            SmallVec::<[PriceLevel; 64]>::new(), // empty asks
421        );
422
423        // Create the shared order book
424        let shared_book = SharedSimdOrderBook::from_orderbook(&order_book);
425
426        Ok(shared_book)
427    }
428
429    async fn get_bar_cache(
430        &self,
431        _instrument_id: &InstrumentId,
432        _bar_type: &BarType,
433        max_bars: usize,
434    ) -> Result<Arc<RwLock<BarCache>>> {
435        // Create a new bar cache with specified capacity
436        let bar_cache = BarCache::new();
437
438        // Wrap it in Arc<RwLock<>>
439        let shared_cache = Arc::new(RwLock::new(bar_cache));
440
441        Ok(shared_cache)
442    }
443}