rusty_feeder/provider/
v2.rs1use anyhow::Result;
14use async_trait::async_trait;
15use reqwest;
16use rusty_common::websocket::{ExchangeHandler, WebSocketClient, WebSocketConfig};
17use rusty_model::{
18 data::{book_snapshot::OrderBookSnapshot, market_trade::MarketTrade},
19 instruments::Instrument,
20 venues::Venue,
21};
22use smallvec::SmallVec;
23use smartstring::alias::String;
24use std::fmt::Debug;
25use std::sync::Arc;
26
27use super::config::ConnectionConfig;
28use super::connection::{ConnectionState, ConnectionStats};
29use super::rate_limit::RateLimit;
30
31#[async_trait]
33pub trait DataEventHandler: Send + Sync {
34 async fn on_trade(&self, trade: MarketTrade) -> Result<()>;
36
37 async fn on_orderbook_update(&self, snapshot: OrderBookSnapshot) -> Result<()>;
39
40 async fn on_instrument_update(&self, instrument: Box<dyn Instrument>) -> Result<()>;
42
43 async fn on_connection_state_changed(&self, state: ConnectionState) -> Result<()>;
45
46 async fn on_error(&self, error: String) -> Result<()>;
48}
49
50#[async_trait]
53pub trait RestProvider: Send + Sync + Debug {
54 fn name(&self) -> &'static str;
56
57 fn venue(&self) -> Venue;
59
60 fn config(&self) -> &ConnectionConfig;
62
63 fn http_client(&self) -> &reqwest::Client;
65
66 async fn init(&mut self) -> Result<()>;
68
69 async fn get_instruments(&self) -> Result<Vec<Box<dyn Instrument>>>;
71
72 async fn get_historical_trades(
74 &self,
75 symbol: &str,
76 limit: Option<u32>,
77 ) -> Result<Vec<MarketTrade>>;
78
79 async fn get_orderbook_snapshot(
81 &self,
82 symbol: &str,
83 depth: Option<u32>,
84 ) -> Result<OrderBookSnapshot>;
85
86 fn get_rate_limits(&self) -> Vec<RateLimit>;
88
89 async fn health_check(&self) -> Result<bool>;
91}
92
93#[async_trait]
96pub trait StreamProvider: Send + Sync + Debug {
97 type Handler: ExchangeHandler + Send + Sync + 'static;
99
100 fn name(&self) -> &'static str;
102
103 fn venue(&self) -> Venue;
105
106 fn websocket_config(&self) -> WebSocketConfig;
108
109 fn create_handler(&self, event_handler: Arc<dyn DataEventHandler>) -> Self::Handler;
111
112 async fn subscribe_trades(
114 &self,
115 client: &WebSocketClient,
116 symbols: SmallVec<[String; 8]>,
117 ) -> Result<()>;
118
119 async fn subscribe_orderbook(
121 &self,
122 client: &WebSocketClient,
123 symbols: SmallVec<[String; 8]>,
124 depth: Option<u32>,
125 ) -> Result<()>;
126
127 async fn subscribe_instruments(&self, client: &WebSocketClient) -> Result<()>;
129
130 async fn unsubscribe_trades(
132 &self,
133 client: &WebSocketClient,
134 symbols: SmallVec<[String; 8]>,
135 ) -> Result<()>;
136
137 async fn unsubscribe_orderbook(
139 &self,
140 client: &WebSocketClient,
141 symbols: SmallVec<[String; 8]>,
142 ) -> Result<()>;
143
144 async fn get_stats(&self) -> ConnectionStats;
146}
147
148#[async_trait]
150pub trait HybridProvider: RestProvider + StreamProvider {
151 async fn create_unified_connection(
153 &self,
154 event_handler: Arc<dyn DataEventHandler>,
155 ) -> Result<HybridConnection<Self::Handler>>;
156}
157
158pub struct HybridConnection<H: ExchangeHandler> {
160 pub ws_client: WebSocketClient,
162
163 pub handler: H,
165
166 connection_state: Arc<std::sync::RwLock<ConnectionState>>,
168
169 stats: Arc<std::sync::RwLock<ConnectionStats>>,
171}
172
173impl<H: ExchangeHandler> HybridConnection<H> {
174 pub fn new(ws_client: WebSocketClient, handler: H) -> Self {
176 Self {
177 ws_client,
178 handler,
179 connection_state: Arc::new(std::sync::RwLock::new(ConnectionState::Disconnected)),
180 stats: Arc::new(std::sync::RwLock::new(ConnectionStats::default())),
181 }
182 }
183
184 pub async fn start(&mut self) -> Result<()> {
186 *self.connection_state.write().unwrap() = ConnectionState::Connected;
189 Ok(())
190 }
191
192 pub async fn stop(&mut self) -> Result<()> {
194 *self.connection_state.write().unwrap() = ConnectionState::Disconnected;
195 Ok(())
196 }
197
198 pub fn connection_state(&self) -> ConnectionState {
200 *self.connection_state.read().unwrap()
201 }
202
203 pub fn stats(&self) -> ConnectionStats {
205 self.stats.read().unwrap().clone()
206 }
207}
208
209pub trait ProviderFactory {
211 type Provider: HybridProvider;
213
214 fn create_default() -> Self::Provider;
216
217 fn create_with_config(config: ConnectionConfig) -> Self::Provider;
219}
220
221pub struct ProviderRegistry {
223 providers: rusty_common::collections::FxHashMap<Venue, Box<dyn std::any::Any + Send + Sync>>,
224}
225
226impl ProviderRegistry {
227 pub fn new() -> Self {
229 Self {
230 providers: rusty_common::collections::FxHashMap::default(),
231 }
232 }
233
234 pub fn register<T: 'static + Send + Sync>(&mut self, venue: Venue, provider: T) {
236 self.providers.insert(venue, Box::new(provider));
237 }
238
239 pub fn get(&self, venue: &Venue) -> Option<&(dyn std::any::Any + Send + Sync)> {
241 self.providers.get(venue).map(|p| p.as_ref())
242 }
243
244 pub fn get_mut(&mut self, venue: &Venue) -> Option<&mut (dyn std::any::Any + Send + Sync)> {
246 self.providers.get_mut(venue).map(|p| p.as_mut())
247 }
248
249 pub fn venues(&self) -> Vec<Venue> {
251 self.providers.keys().copied().collect()
252 }
253}
254
255impl Default for ProviderRegistry {
256 fn default() -> Self {
257 Self::new()
258 }
259}
260
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicBool, Ordering};

    /// Handler double that remembers whether a trade or an order book update
    /// has been delivered; the remaining callbacks accept and ignore input.
    struct MockEventHandler {
        trade_received: AtomicBool,
        orderbook_received: AtomicBool,
    }

    impl MockEventHandler {
        /// Starts with both flags cleared.
        fn new() -> Self {
            let trade_received = AtomicBool::new(false);
            let orderbook_received = AtomicBool::new(false);
            Self {
                trade_received,
                orderbook_received,
            }
        }
    }

    #[async_trait]
    impl DataEventHandler for MockEventHandler {
        async fn on_trade(&self, _trade: MarketTrade) -> Result<()> {
            self.trade_received.store(true, Ordering::SeqCst);
            Ok(())
        }

        async fn on_orderbook_update(&self, _snapshot: OrderBookSnapshot) -> Result<()> {
            self.orderbook_received.store(true, Ordering::SeqCst);
            Ok(())
        }

        async fn on_instrument_update(&self, _instrument: Box<dyn Instrument>) -> Result<()> {
            Ok(())
        }

        async fn on_connection_state_changed(&self, _state: ConnectionState) -> Result<()> {
            Ok(())
        }

        async fn on_error(&self, _error: String) -> Result<()> {
            Ok(())
        }
    }

    /// A freshly created registry lists no venues.
    #[test]
    fn test_provider_registry() {
        let registry = ProviderRegistry::new();
        assert!(registry.venues().is_empty());
    }

    /// A freshly created mock has neither flag set.
    #[test]
    fn test_event_handler_creation() {
        let handler = MockEventHandler::new();
        for flag in [&handler.trade_received, &handler.orderbook_received] {
            assert!(!flag.load(Ordering::SeqCst));
        }
    }
}