rusty_feeder/provider/
v2.rs

1//! Provider V2 - Next generation provider architecture
2//!
3//! This module defines the improved provider architecture that aligns with
4//! the consolidated WebSocket implementation using MessageHandler pattern.
5//!
6//! Key improvements over V1:
7//! - Separation of concerns: REST APIs vs real-time streaming
8//! - Event-driven architecture using callbacks instead of channels
9//! - Integration with WebSocketClient/MessageHandler pattern
10//! - Cleaner trait boundaries and reduced coupling
11//! - Better error handling and resource management
12
13use anyhow::Result;
14use async_trait::async_trait;
15use reqwest;
16use rusty_common::websocket::{ExchangeHandler, WebSocketClient, WebSocketConfig};
17use rusty_model::{
18    data::{book_snapshot::OrderBookSnapshot, market_trade::MarketTrade},
19    instruments::Instrument,
20    venues::Venue,
21};
22use smallvec::SmallVec;
23use smartstring::alias::String;
24use std::fmt::Debug;
25use std::sync::Arc;
26
27use super::config::ConnectionConfig;
28use super::connection::{ConnectionState, ConnectionStats};
29use super::rate_limit::RateLimit;
30
31/// Data event callbacks for real-time streaming
32#[async_trait]
33pub trait DataEventHandler: Send + Sync {
34    /// Called when a new trade is received
35    async fn on_trade(&self, trade: MarketTrade) -> Result<()>;
36
37    /// Called when orderbook is updated
38    async fn on_orderbook_update(&self, snapshot: OrderBookSnapshot) -> Result<()>;
39
40    /// Called when an instrument is updated
41    async fn on_instrument_update(&self, instrument: Box<dyn Instrument>) -> Result<()>;
42
43    /// Called when connection state changes
44    async fn on_connection_state_changed(&self, state: ConnectionState) -> Result<()>;
45
46    /// Called when an error occurs
47    async fn on_error(&self, error: String) -> Result<()>;
48}
49
50/// REST API provider for exchange metadata and historical data
51/// This trait focuses purely on HTTP-based operations
52#[async_trait]
53pub trait RestProvider: Send + Sync + Debug {
54    /// Returns the exchange name as a static string
55    fn name(&self) -> &'static str;
56
57    /// Returns the venue enum value for this provider
58    fn venue(&self) -> Venue;
59
60    /// Returns reference to the connection configuration
61    fn config(&self) -> &ConnectionConfig;
62
63    /// Returns reference to the HTTP client
64    fn http_client(&self) -> &reqwest::Client;
65
66    /// Initialize the provider with any necessary setup steps
67    async fn init(&mut self) -> Result<()>;
68
69    /// Get all available trading instruments from the exchange
70    async fn get_instruments(&self) -> Result<Vec<Box<dyn Instrument>>>;
71
72    /// Get historical trades for a symbol
73    async fn get_historical_trades(
74        &self,
75        symbol: &str,
76        limit: Option<u32>,
77    ) -> Result<Vec<MarketTrade>>;
78
79    /// Get current orderbook snapshot for a symbol
80    async fn get_orderbook_snapshot(
81        &self,
82        symbol: &str,
83        depth: Option<u32>,
84    ) -> Result<OrderBookSnapshot>;
85
86    /// Get the exchange-specific rate limits
87    fn get_rate_limits(&self) -> Vec<RateLimit>;
88
89    /// Check provider health via REST API
90    async fn health_check(&self) -> Result<bool>;
91}
92
93/// Real-time streaming provider using WebSocket
94/// This trait handles live market data streaming
95#[async_trait]
96pub trait StreamProvider: Send + Sync + Debug {
97    /// Exchange-specific handler type
98    type Handler: ExchangeHandler + Send + Sync + 'static;
99
100    /// Returns the exchange name as a static string
101    fn name(&self) -> &'static str;
102
103    /// Returns the venue enum value for this provider
104    fn venue(&self) -> Venue;
105
106    /// Get WebSocket configuration for this exchange
107    fn websocket_config(&self) -> WebSocketConfig;
108
109    /// Create a message handler for this exchange
110    fn create_handler(&self, event_handler: Arc<dyn DataEventHandler>) -> Self::Handler;
111
112    /// Subscribe to trade data for multiple symbols
113    async fn subscribe_trades(
114        &self,
115        client: &WebSocketClient,
116        symbols: SmallVec<[String; 8]>,
117    ) -> Result<()>;
118
119    /// Subscribe to orderbook data for multiple symbols
120    async fn subscribe_orderbook(
121        &self,
122        client: &WebSocketClient,
123        symbols: SmallVec<[String; 8]>,
124        depth: Option<u32>,
125    ) -> Result<()>;
126
127    /// Subscribe to instrument updates
128    async fn subscribe_instruments(&self, client: &WebSocketClient) -> Result<()>;
129
130    /// Unsubscribe from trade data
131    async fn unsubscribe_trades(
132        &self,
133        client: &WebSocketClient,
134        symbols: SmallVec<[String; 8]>,
135    ) -> Result<()>;
136
137    /// Unsubscribe from orderbook data
138    async fn unsubscribe_orderbook(
139        &self,
140        client: &WebSocketClient,
141        symbols: SmallVec<[String; 8]>,
142    ) -> Result<()>;
143
144    /// Get connection statistics
145    async fn get_stats(&self) -> ConnectionStats;
146}
147
148/// Combined provider trait for exchanges that support both REST and streaming
149#[async_trait]
150pub trait HybridProvider: RestProvider + StreamProvider {
151    /// Create a unified connection that supports both REST and streaming
152    async fn create_unified_connection(
153        &self,
154        event_handler: Arc<dyn DataEventHandler>,
155    ) -> Result<HybridConnection<Self::Handler>>;
156}
157
158/// Unified connection managing both REST and WebSocket
159pub struct HybridConnection<H: ExchangeHandler> {
160    /// WebSocket client for real-time data
161    pub ws_client: WebSocketClient,
162
163    /// Message handler for WebSocket
164    pub handler: H,
165
166    /// Connection state
167    connection_state: Arc<std::sync::RwLock<ConnectionState>>,
168
169    /// Connection statistics
170    stats: Arc<std::sync::RwLock<ConnectionStats>>,
171}
172
173impl<H: ExchangeHandler> HybridConnection<H> {
174    /// Create a new hybrid connection
175    pub fn new(ws_client: WebSocketClient, handler: H) -> Self {
176        Self {
177            ws_client,
178            handler,
179            connection_state: Arc::new(std::sync::RwLock::new(ConnectionState::Disconnected)),
180            stats: Arc::new(std::sync::RwLock::new(ConnectionStats::default())),
181        }
182    }
183
184    /// Start the connection (connect WebSocket and begin message processing)
185    pub async fn start(&mut self) -> Result<()> {
186        // Implementation would start WebSocket connection with handler
187        // This is a placeholder for the actual implementation
188        *self.connection_state.write().unwrap() = ConnectionState::Connected;
189        Ok(())
190    }
191
192    /// Stop the connection and clean up resources
193    pub async fn stop(&mut self) -> Result<()> {
194        *self.connection_state.write().unwrap() = ConnectionState::Disconnected;
195        Ok(())
196    }
197
198    /// Get current connection state
199    pub fn connection_state(&self) -> ConnectionState {
200        *self.connection_state.read().unwrap()
201    }
202
203    /// Get connection statistics
204    pub fn stats(&self) -> ConnectionStats {
205        self.stats.read().unwrap().clone()
206    }
207}
208
209/// Factory trait for creating provider instances
210pub trait ProviderFactory {
211    /// Provider type this factory creates
212    type Provider: HybridProvider;
213
214    /// Create a new provider instance with default configuration
215    fn create_default() -> Self::Provider;
216
217    /// Create a new provider instance with custom configuration
218    fn create_with_config(config: ConnectionConfig) -> Self::Provider;
219}
220
221/// Registry for managing multiple providers
222pub struct ProviderRegistry {
223    providers: rusty_common::collections::FxHashMap<Venue, Box<dyn std::any::Any + Send + Sync>>,
224}
225
226impl ProviderRegistry {
227    /// Create a new provider registry
228    pub fn new() -> Self {
229        Self {
230            providers: rusty_common::collections::FxHashMap::default(),
231        }
232    }
233
234    /// Register a provider for a venue (simplified for now due to associated type constraints)
235    pub fn register<T: 'static + Send + Sync>(&mut self, venue: Venue, provider: T) {
236        self.providers.insert(venue, Box::new(provider));
237    }
238
239    /// Get a provider for a venue (returns Any due to associated type constraints)
240    pub fn get(&self, venue: &Venue) -> Option<&(dyn std::any::Any + Send + Sync)> {
241        self.providers.get(venue).map(|p| p.as_ref())
242    }
243
244    /// Get a mutable provider for a venue (returns Any due to associated type constraints)
245    pub fn get_mut(&mut self, venue: &Venue) -> Option<&mut (dyn std::any::Any + Send + Sync)> {
246        self.providers.get_mut(venue).map(|p| p.as_mut())
247    }
248
249    /// List all registered venues
250    pub fn venues(&self) -> Vec<Venue> {
251        self.providers.keys().copied().collect()
252    }
253}
254
255impl Default for ProviderRegistry {
256    fn default() -> Self {
257        Self::new()
258    }
259}
260
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicBool, Ordering};

    /// Test double that records which market-data callbacks have fired.
    struct MockEventHandler {
        trade_received: AtomicBool,
        orderbook_received: AtomicBool,
    }

    impl MockEventHandler {
        /// Builds a handler with both flags cleared.
        fn new() -> Self {
            let trade_received = AtomicBool::new(false);
            let orderbook_received = AtomicBool::new(false);
            Self {
                trade_received,
                orderbook_received,
            }
        }
    }

    #[async_trait]
    impl DataEventHandler for MockEventHandler {
        /// Raises the trade flag and succeeds.
        async fn on_trade(&self, _trade: MarketTrade) -> Result<()> {
            self.trade_received.store(true, Ordering::SeqCst);
            Ok(())
        }

        /// Raises the order book flag and succeeds.
        async fn on_orderbook_update(&self, _snapshot: OrderBookSnapshot) -> Result<()> {
            self.orderbook_received.store(true, Ordering::SeqCst);
            Ok(())
        }

        /// Ignores the instrument and succeeds.
        async fn on_instrument_update(&self, _instrument: Box<dyn Instrument>) -> Result<()> {
            Ok(())
        }

        /// Ignores the state change and succeeds.
        async fn on_connection_state_changed(&self, _state: ConnectionState) -> Result<()> {
            Ok(())
        }

        /// Ignores the error message and succeeds.
        async fn on_error(&self, _error: String) -> Result<()> {
            Ok(())
        }
    }

    #[test]
    fn test_provider_registry() {
        // A freshly built registry must not list any venue.
        let registry = ProviderRegistry::new();
        assert!(registry.venues().is_empty());

        // Exercising real providers would need heavier mocking; this only
        // checks that the registry API can be constructed and queried.
    }

    #[test]
    fn test_event_handler_creation() {
        // Neither flag may be raised before any callback has run.
        let handler = MockEventHandler::new();
        let trade_seen = handler.trade_received.load(Ordering::SeqCst);
        let orderbook_seen = handler.orderbook_received.load(Ordering::SeqCst);
        assert!(!trade_seen);
        assert!(!orderbook_seen);
    }
}