rusty_ems/
feeder_integration.rs

1//! Integration with rusty-feeder for market data
2//!
3//! This module provides utilities to integrate the Execution Management System (EMS)
4//! with the rusty-feeder crate for efficient market data access.
5
6use std::sync::Arc;
7
8use anyhow::{Result, anyhow};
9use async_trait::async_trait;
10use log::info;
11use parking_lot::RwLock;
12use quanta::Clock;
13use rust_decimal::Decimal;
14
15use rusty_common::collections::FxHashMap;
16use rusty_model::{
17    data::{MarketTrade, OrderBook},
18    instruments::InstrumentId,
19    venues::Venue,
20};
21
22/// Type alias for a shared orderbook
23type SharedOrderBook = OrderBook;
24
25// Optional dependency on rusty-feeder
26#[cfg(feature = "feeder")]
27mod with_feeder {
28    use super::{
29        Arc, Clock, FxHashMap, InstrumentId, MarketTrade, Result, RwLock, SharedOrderBook, info,
30    };
31
32    use anyhow::Context;
33    use log::{debug, warn};
34    use rusty_common::SmartString;
35    use rusty_feeder::Provider;
36    use smallvec::SmallVec;
37
38    /// A market data provider that uses rusty-feeder for connectivity
39    pub struct FeederMarketDataProvider<P: Provider> {
40        /// The underlying provider from rusty-feeder
41        provider: P,
42
43        /// Shared clock instance
44        clock: Clock,
45
46        /// Map of instrument IDs to orderbooks for quick access
47        orderbooks: Arc<RwLock<FxHashMap<InstrumentId, SharedOrderBook>>>,
48
49        /// Recent trades for each instrument
50        trades: Arc<RwLock<FxHashMap<InstrumentId, Vec<MarketTrade>>>>,
51
52        /// Maximum trade history to keep per instrument
53        max_trade_history: usize,
54    }
55
56    impl<P: Provider> FeederMarketDataProvider<P> {
57        /// Create a new market data provider using the given rusty-feeder provider
58        #[must_use]
59        pub fn new(provider: P) -> Self {
60            Self {
61                provider,
62                clock: Clock::new(),
63                orderbooks: Arc::new(RwLock::new(FxHashMap::default())),
64                trades: Arc::new(RwLock::new(FxHashMap::default())),
65                max_trade_history: 1000, // Store up to 1000 recent trades per instrument
66            }
67        }
68
69        /// Start receiving market data for the specified instruments
70        pub async fn start(&self, instruments: &[InstrumentId]) -> Result<()> {
71            // Only subscribe to the venue of the current provider
72            let venue = self.provider.venue();
73
74            // Collect symbols for this provider's venue
75            let symbols: SmallVec<[SmartString; 8]> = instruments
76                .iter()
77                .filter(|instr| instr.venue == venue)
78                .map(|instr| instr.symbol.clone())
79                .collect();
80
81            if !symbols.is_empty() {
82                // Subscribe to orderbook data
83                let mut orderbook_rx = self
84                    .provider
85                    .subscribe_orderbook(symbols.clone())
86                    .await
87                    .context("Failed to subscribe to orderbook data")?;
88
89                // Clone shared data for the task
90                let _orderbooks = self.orderbooks.clone();
91                let venue_clone = venue;
92
93                // Make a copy of symbols to move into the task
94                let symbols_copy = symbols.to_vec();
95
96                // Start processing orderbook updates
97                tokio::spawn(async move {
98                    info!(
99                        "Starting orderbook processing for {} symbols on {}",
100                        symbols_copy.len(),
101                        venue_clone.to_str()
102                    );
103
104                    while let Some(_depth_msg) = orderbook_rx.recv().await {
105                        // Process the depth message (specifics depend on the provider's message type)
106                        // In a real implementation, we'd extract the symbol, bids, and asks from the message
107                        // and update the relevant orderbook
108                        debug!(
109                            "Received orderbook update for venue: {}",
110                            venue_clone.to_str()
111                        );
112                    }
113
114                    warn!(
115                        "Orderbook processing stopped for venue: {}",
116                        venue_clone.to_str()
117                    );
118                });
119
120                // Subscribe to trade data
121                let mut trade_rx = self
122                    .provider
123                    .subscribe_trades(symbols.clone())
124                    .await
125                    .context("Failed to subscribe to trade data")?;
126
127                // Clone shared data for the task
128                let _trades = self.trades.clone();
129                let venue_clone2 = venue;
130                let _max_history = self.max_trade_history;
131
132                // Make another copy of symbols for the trade task
133                let symbols_copy2 = symbols.to_vec();
134
135                // Start processing trade updates
136                tokio::spawn(async move {
137                    info!(
138                        "Starting trade processing for {} symbols on {}",
139                        symbols_copy2.len(),
140                        venue_clone2.to_str()
141                    );
142
143                    while let Some(_trade_msg) = trade_rx.recv().await {
144                        // Process the trade message (specifics depend on the provider's message type)
145                        // In a real implementation, we'd extract the symbol and trade details
146                        // and update the trade history
147                        debug!("Received trade update for venue: {}", venue_clone2.to_str());
148                    }
149
150                    warn!(
151                        "Trade processing stopped for venue: {}",
152                        venue_clone2.to_str()
153                    );
154                });
155            }
156
157            Ok(())
158        }
159
160        /// Get the latest orderbook for an instrument
161        pub fn get_orderbook(&self, instrument: &InstrumentId) -> Option<SharedOrderBook> {
162            self.orderbooks.read().get(instrument).cloned()
163        }
164
165        /// Get the recent trades for an instrument
166        pub fn get_recent_trades(
167            &self,
168            instrument: &InstrumentId,
169            limit: Option<usize>,
170        ) -> Vec<MarketTrade> {
171            let trades_map = self.trades.read();
172            let limit = limit.unwrap_or(10).min(self.max_trade_history);
173
174            if let Some(instrument_trades) = trades_map.get(instrument) {
175                let start_idx = instrument_trades.len().saturating_sub(limit);
176                instrument_trades[start_idx..].to_vec()
177            } else {
178                Vec::new()
179            }
180        }
181    }
182}
183
184/// Interface for market data providers
185#[async_trait]
186pub trait MarketDataProvider: Send + Sync + 'static {
187    /// Initialize the provider
188    async fn init(&mut self) -> Result<()>;
189
190    /// Subscribe to market data for the specified instruments
191    async fn subscribe(&self, instruments: &[InstrumentId]) -> Result<()>;
192
193    /// Get the best bid and ask for an instrument
194    async fn get_best_bid_ask(&self, instrument: &InstrumentId) -> Result<(Decimal, Decimal)>;
195
196    /// Get the latest trade price for an instrument
197    async fn get_latest_trade_price(&self, instrument: &InstrumentId) -> Result<Decimal>;
198
199    /// Get the entire orderbook for an instrument
200    async fn get_orderbook(&self, instrument: &InstrumentId) -> Result<OrderBook>;
201
202    /// Get recent trades for an instrument
203    async fn get_recent_trades(
204        &self,
205        instrument: &InstrumentId,
206        limit: Option<usize>,
207    ) -> Result<Vec<MarketTrade>>;
208}
209
210#[cfg(feature = "feeder")]
211pub use with_feeder::FeederMarketDataProvider;
212
213/// A standalone market data provider that uses the EMS's own connectivity
214/// This is used when the feeder feature is not enabled
215pub struct StandaloneMarketDataProvider {
216    /// Venue this provider connects to
217    venue: Venue,
218
219    /// REST API base URL
220    api_base_url: String,
221
222    /// WebSocket base URL
223    ws_base_url: String,
224
225    /// API key for authentication
226    api_key: Option<String>,
227
228    /// API secret for authentication
229    api_secret: Option<String>,
230
231    /// Shared clock instance
232    clock: Clock,
233
234    /// Map of instrument IDs to orderbooks for quick access
235    orderbooks: Arc<RwLock<FxHashMap<InstrumentId, OrderBook>>>,
236
237    /// Recent trades for each instrument
238    trades: Arc<RwLock<FxHashMap<InstrumentId, Vec<MarketTrade>>>>,
239}
240
241impl StandaloneMarketDataProvider {
242    /// Create a new standalone market data provider
243    #[must_use]
244    pub fn new(
245        venue: Venue,
246        api_base_url: String,
247        ws_base_url: String,
248        api_key: Option<String>,
249        api_secret: Option<String>,
250    ) -> Self {
251        Self {
252            venue,
253            api_base_url,
254            ws_base_url,
255            api_key,
256            api_secret,
257            clock: Clock::new(),
258            orderbooks: Arc::new(RwLock::new(FxHashMap::default())),
259            trades: Arc::new(RwLock::new(FxHashMap::default())),
260        }
261    }
262}
263
264#[async_trait]
265impl MarketDataProvider for StandaloneMarketDataProvider {
266    async fn init(&mut self) -> Result<()> {
267        // In a real implementation, we'd initialize API clients here
268        Ok(())
269    }
270
271    async fn subscribe(&self, instruments: &[InstrumentId]) -> Result<()> {
272        // Filter instruments by venue
273        let instruments: Vec<&InstrumentId> = instruments
274            .iter()
275            .filter(|i| i.venue == self.venue)
276            .collect();
277
278        if instruments.is_empty() {
279            return Ok(());
280        }
281
282        // In a real implementation, we'd establish WebSocket connections
283        // and subscribe to market data
284        info!(
285            "Subscribing to {} instruments on {}",
286            instruments.len(),
287            self.venue.to_str()
288        );
289
290        Ok(())
291    }
292
293    async fn get_best_bid_ask(&self, instrument: &InstrumentId) -> Result<(Decimal, Decimal)> {
294        if instrument.venue != self.venue {
295            return Err(anyhow!("Instrument venue mismatch"));
296        }
297
298        // In a real implementation, we'd return data from our cached orderbook
299        // For now, return dummy values
300        Ok((
301            Decimal::new(10000, 2), // 100.00
302            Decimal::new(10001, 2), // 100.01
303        ))
304    }
305
306    async fn get_latest_trade_price(&self, instrument: &InstrumentId) -> Result<Decimal> {
307        if instrument.venue != self.venue {
308            return Err(anyhow!("Instrument venue mismatch"));
309        }
310
311        // In a real implementation, we'd return data from our cached trades
312        // For now, return a dummy value
313        Ok(Decimal::new(10000, 2)) // 100.00
314    }
315
316    async fn get_orderbook(&self, instrument: &InstrumentId) -> Result<OrderBook> {
317        if instrument.venue != self.venue {
318            return Err(anyhow!("Instrument venue mismatch"));
319        }
320
321        // Check if we have a cached orderbook
322        if let Some(orderbook) = self.orderbooks.read().get(instrument) {
323            return Ok(orderbook.clone());
324        }
325
326        // In a real implementation, we'd fetch the orderbook via REST API if not cached
327        Err(anyhow!(
328            "Orderbook not available for instrument: {}",
329            instrument.symbol
330        ))
331    }
332
333    async fn get_recent_trades(
334        &self,
335        instrument: &InstrumentId,
336        limit: Option<usize>,
337    ) -> Result<Vec<MarketTrade>> {
338        if instrument.venue != self.venue {
339            return Err(anyhow!("Instrument venue mismatch"));
340        }
341
342        let limit = limit.unwrap_or(10);
343
344        // Check if we have cached trades
345        if let Some(trades) = self.trades.read().get(instrument) {
346            let start_idx = trades.len().saturating_sub(limit);
347            return Ok(trades[start_idx..].to_vec());
348        }
349
350        // In a real implementation, we'd fetch trades via REST API if not cached
351        Err(anyhow!(
352            "Recent trades not available for instrument: {}",
353            instrument.symbol
354        ))
355    }
356}