1use std::sync::Arc;
7
8use anyhow::{Result, anyhow};
9use async_trait::async_trait;
10use log::info;
11use parking_lot::RwLock;
12use quanta::Clock;
13use rust_decimal::Decimal;
14
15use rusty_common::collections::FxHashMap;
16use rusty_model::{
17 data::{MarketTrade, OrderBook},
18 instruments::InstrumentId,
19 venues::Venue,
20};
21
22type SharedOrderBook = OrderBook;
24
/// Feeder-backed market data provider; compiled only when the `feeder` feature is enabled.
#[cfg(feature = "feeder")]
mod with_feeder {
    use super::{
        Arc, Clock, FxHashMap, InstrumentId, MarketTrade, Result, RwLock, SharedOrderBook, info,
    };

    use anyhow::Context;
    use log::{debug, warn};
    use rusty_common::SmartString;
    use rusty_feeder::Provider;
    use smallvec::SmallVec;

    /// Default cap on how many trades a single `get_recent_trades` call may return.
    const DEFAULT_MAX_TRADE_HISTORY: usize = 1000;

    /// Number of trades returned when the caller passes no explicit limit.
    const DEFAULT_RECENT_TRADES_LIMIT: usize = 10;

    /// Market data provider that opens its streams on a `rusty_feeder` [`Provider`].
    pub struct FeederMarketDataProvider<P: Provider> {
        /// Underlying feed on which subscriptions are opened.
        provider: P,

        /// Clock created at construction; not read by any method in this module.
        clock: Clock,

        /// Per-instrument order book cache served by `get_orderbook`.
        orderbooks: Arc<RwLock<FxHashMap<InstrumentId, SharedOrderBook>>>,

        /// Per-instrument trade history served by `get_recent_trades`.
        trades: Arc<RwLock<FxHashMap<InstrumentId, Vec<MarketTrade>>>>,

        /// Cap applied to the `limit` argument of `get_recent_trades`.
        max_trade_history: usize,
    }

    impl<P: Provider> FeederMarketDataProvider<P> {
        /// Wraps `provider` with empty caches and the default trade-history cap.
        #[must_use]
        pub fn new(provider: P) -> Self {
            Self {
                provider,
                clock: Clock::new(),
                orderbooks: Arc::new(RwLock::new(FxHashMap::default())),
                trades: Arc::new(RwLock::new(FxHashMap::default())),
                max_trade_history: DEFAULT_MAX_TRADE_HISTORY,
            }
        }

        /// Subscribes to the order book and trade streams for every instrument on the
        /// provider's venue and spawns one background task per stream to drain it.
        ///
        /// Instruments that belong to another venue are ignored; if none remain, this is a
        /// no-op that returns `Ok(())`.
        ///
        /// Both subscriptions are established *before* any task is spawned, so a failed
        /// subscription never leaves a detached task running behind an `Err` return.
        ///
        /// NOTE: the spawned tasks currently only log each received message; nothing in this
        /// module writes to the caches read by `get_orderbook` / `get_recent_trades`.
        ///
        /// # Errors
        ///
        /// Returns an error if either subscription request fails.
        pub async fn start(&self, instruments: &[InstrumentId]) -> Result<()> {
            let venue = self.provider.venue();

            let symbols: SmallVec<[SmartString; 8]> = instruments
                .iter()
                .filter(|instr| instr.venue == venue)
                .map(|instr| instr.symbol.clone())
                .collect();

            if symbols.is_empty() {
                return Ok(());
            }

            // Only the count is needed for logging; capture it before `symbols` is moved.
            let symbol_count = symbols.len();

            let mut orderbook_rx = self
                .provider
                .subscribe_orderbook(symbols.clone())
                .await
                .context("Failed to subscribe to orderbook data")?;

            // If this fails, `orderbook_rx` is dropped on the early return and no task exists yet.
            let mut trade_rx = self
                .provider
                .subscribe_trades(symbols)
                .await
                .context("Failed to subscribe to trade data")?;

            tokio::spawn(async move {
                info!(
                    "Starting orderbook processing for {} symbols on {}",
                    symbol_count,
                    venue.to_str()
                );

                while let Some(_depth_msg) = orderbook_rx.recv().await {
                    debug!("Received orderbook update for venue: {}", venue.to_str());
                }

                warn!("Orderbook processing stopped for venue: {}", venue.to_str());
            });

            tokio::spawn(async move {
                info!(
                    "Starting trade processing for {} symbols on {}",
                    symbol_count,
                    venue.to_str()
                );

                while let Some(_trade_msg) = trade_rx.recv().await {
                    debug!("Received trade update for venue: {}", venue.to_str());
                }

                warn!("Trade processing stopped for venue: {}", venue.to_str());
            });

            Ok(())
        }

        /// Returns a clone of the cached order book for `instrument`, or `None` if the cache
        /// has no entry for it.
        pub fn get_orderbook(&self, instrument: &InstrumentId) -> Option<SharedOrderBook> {
            self.orderbooks.read().get(instrument).cloned()
        }

        /// Returns the last `limit` entries of the stored trade history for `instrument`.
        ///
        /// `limit` defaults to 10 and is capped at the provider's maximum trade history.
        /// An instrument with no stored history yields an empty vector.
        pub fn get_recent_trades(
            &self,
            instrument: &InstrumentId,
            limit: Option<usize>,
        ) -> Vec<MarketTrade> {
            let limit = limit
                .unwrap_or(DEFAULT_RECENT_TRADES_LIMIT)
                .min(self.max_trade_history);

            self.trades
                .read()
                .get(instrument)
                .map(|history| {
                    // Tail of the vector; `saturating_sub` keeps short histories in bounds.
                    let start_idx = history.len().saturating_sub(limit);
                    history[start_idx..].to_vec()
                })
                .unwrap_or_default()
        }
    }
}
183
184#[async_trait]
186pub trait MarketDataProvider: Send + Sync + 'static {
187 async fn init(&mut self) -> Result<()>;
189
190 async fn subscribe(&self, instruments: &[InstrumentId]) -> Result<()>;
192
193 async fn get_best_bid_ask(&self, instrument: &InstrumentId) -> Result<(Decimal, Decimal)>;
195
196 async fn get_latest_trade_price(&self, instrument: &InstrumentId) -> Result<Decimal>;
198
199 async fn get_orderbook(&self, instrument: &InstrumentId) -> Result<OrderBook>;
201
202 async fn get_recent_trades(
204 &self,
205 instrument: &InstrumentId,
206 limit: Option<usize>,
207 ) -> Result<Vec<MarketTrade>>;
208}
209
210#[cfg(feature = "feeder")]
211pub use with_feeder::FeederMarketDataProvider;
212
213pub struct StandaloneMarketDataProvider {
216 venue: Venue,
218
219 api_base_url: String,
221
222 ws_base_url: String,
224
225 api_key: Option<String>,
227
228 api_secret: Option<String>,
230
231 clock: Clock,
233
234 orderbooks: Arc<RwLock<FxHashMap<InstrumentId, OrderBook>>>,
236
237 trades: Arc<RwLock<FxHashMap<InstrumentId, Vec<MarketTrade>>>>,
239}
240
241impl StandaloneMarketDataProvider {
242 #[must_use]
244 pub fn new(
245 venue: Venue,
246 api_base_url: String,
247 ws_base_url: String,
248 api_key: Option<String>,
249 api_secret: Option<String>,
250 ) -> Self {
251 Self {
252 venue,
253 api_base_url,
254 ws_base_url,
255 api_key,
256 api_secret,
257 clock: Clock::new(),
258 orderbooks: Arc::new(RwLock::new(FxHashMap::default())),
259 trades: Arc::new(RwLock::new(FxHashMap::default())),
260 }
261 }
262}
263
264#[async_trait]
265impl MarketDataProvider for StandaloneMarketDataProvider {
266 async fn init(&mut self) -> Result<()> {
267 Ok(())
269 }
270
271 async fn subscribe(&self, instruments: &[InstrumentId]) -> Result<()> {
272 let instruments: Vec<&InstrumentId> = instruments
274 .iter()
275 .filter(|i| i.venue == self.venue)
276 .collect();
277
278 if instruments.is_empty() {
279 return Ok(());
280 }
281
282 info!(
285 "Subscribing to {} instruments on {}",
286 instruments.len(),
287 self.venue.to_str()
288 );
289
290 Ok(())
291 }
292
293 async fn get_best_bid_ask(&self, instrument: &InstrumentId) -> Result<(Decimal, Decimal)> {
294 if instrument.venue != self.venue {
295 return Err(anyhow!("Instrument venue mismatch"));
296 }
297
298 Ok((
301 Decimal::new(10000, 2), Decimal::new(10001, 2), ))
304 }
305
306 async fn get_latest_trade_price(&self, instrument: &InstrumentId) -> Result<Decimal> {
307 if instrument.venue != self.venue {
308 return Err(anyhow!("Instrument venue mismatch"));
309 }
310
311 Ok(Decimal::new(10000, 2)) }
315
316 async fn get_orderbook(&self, instrument: &InstrumentId) -> Result<OrderBook> {
317 if instrument.venue != self.venue {
318 return Err(anyhow!("Instrument venue mismatch"));
319 }
320
321 if let Some(orderbook) = self.orderbooks.read().get(instrument) {
323 return Ok(orderbook.clone());
324 }
325
326 Err(anyhow!(
328 "Orderbook not available for instrument: {}",
329 instrument.symbol
330 ))
331 }
332
333 async fn get_recent_trades(
334 &self,
335 instrument: &InstrumentId,
336 limit: Option<usize>,
337 ) -> Result<Vec<MarketTrade>> {
338 if instrument.venue != self.venue {
339 return Err(anyhow!("Instrument venue mismatch"));
340 }
341
342 let limit = limit.unwrap_or(10);
343
344 if let Some(trades) = self.trades.read().get(instrument) {
346 let start_idx = trades.len().saturating_sub(limit);
347 return Ok(trades[start_idx..].to_vec());
348 }
349
350 Err(anyhow!(
352 "Recent trades not available for instrument: {}",
353 instrument.symbol
354 ))
355 }
356}