rusty_feeder/provider/
core.rs

1use anyhow::Result;
2use async_trait::async_trait;
3use reqwest;
4use rusty_model::{
5    data::{
6        book_snapshot::OrderBookSnapshot, market_trade::MarketTrade,
7        simd_orderbook::SharedSimdOrderBook,
8    },
9    instruments::Instrument,
10    venues::Venue,
11};
12use smallvec::SmallVec;
13use smartstring::alias::String;
14use std::fmt::Debug;
15use std::time::Duration;
16use tokio::sync::mpsc;
17
18use super::config::ConnectionConfig;
19use super::connection::{ConnectionState, ConnectionStats};
20use super::rate_limit::RateLimit;
21use super::timestamp::{self, TimestampFormat};
22
23/// HTTP client provider trait for standardized REST API access
24/// Implementations should have an HTTP client and provide methods for creating one
25pub trait HttpClientProvider {
26    /// Returns a reference to the HTTP client
27    fn http_client(&self) -> &reqwest::Client;
28
29    /// Create a new HTTP client with appropriate settings from config
30    fn create_http_client(config: &ConnectionConfig) -> reqwest::Client {
31        // Create default headers
32        let mut headers = reqwest::header::HeaderMap::new();
33        headers.insert(
34            reqwest::header::USER_AGENT,
35            reqwest::header::HeaderValue::from_str(&config.rest_config.user_agent)
36                .unwrap_or_else(|_| reqwest::header::HeaderValue::from_static("RustyHFT/1.0")),
37        );
38
39        // Build client with performance-optimized settings
40        reqwest::Client::builder()
41            .timeout(Duration::from_millis(
42                config.rest_config.timeout_milliseconds,
43            ))
44            .connect_timeout(Duration::from_millis(
45                config.rest_config.timeout_milliseconds / 2,
46            ))
47            .pool_max_idle_per_host(config.rest_config.connection_pool_size)
48            .pool_idle_timeout(Duration::from_millis(
49                config.rest_config.keep_alive_milliseconds,
50            ))
51            .default_headers(headers)
52            // .http2(config.rest_config.use_http2) // This method is no longer available in recent reqwest versions
53            .build()
54            .unwrap_or_default()
55    }
56}
57
58/// High-performance provider interface for exchange connectivity
59/// Optimized for zero allocations and predictable low latency
60/// All implementations must adhere to the project's HFT performance guidelines
61#[async_trait]
62pub trait Provider: Send + Sync + 'static + Debug {
63    /// Exchange-specific raw message type for trades from WebSocket
64    type TradeMessage: Send + 'static;
65
66    /// Exchange-specific raw message type for orderbook updates from WebSocket
67    type DepthMessage: Send + 'static;
68
69    /// Exchange-specific raw message type for instrument info
70    type InstrumentMessage: Send + 'static;
71
72    /// Returns the exchange name as a static String
73    fn name(&self) -> &'static str;
74
75    /// Returns the venue enum value for this provider
76    fn venue(&self) -> Venue;
77
78    /// Returns reference to the connection configuration
79    fn config(&self) -> &ConnectionConfig;
80
81    /// Initialize the provider with any necessary setup steps
82    /// This method should be called before using other methods
83    async fn init(&mut self) -> Result<()>;
84
85    /// Subscribe to trade data stream for multiple symbols
86    /// Returns a channel receiver for trade messages
87    /// Uses SmallVec to avoid heap allocations for small numbers of symbols
88    async fn subscribe_trades(
89        &self,
90        symbols: SmallVec<[String; 8]>,
91    ) -> Result<mpsc::Receiver<Self::TradeMessage>>;
92
93    /// Unsubscribe from trade data
94    async fn unsubscribe_trades(&self) -> Result<()>;
95
96    /// Subscribe to orderbook depth data for multiple symbols
97    /// Returns a channel receiver for depth messages
98    /// Uses SmallVec to avoid heap allocations for small numbers of symbols
99    async fn subscribe_orderbook(
100        &self,
101        symbols: SmallVec<[String; 8]>,
102    ) -> Result<mpsc::Receiver<Self::DepthMessage>>;
103
104    /// Unsubscribe from orderbook depth data
105    async fn unsubscribe_orderbook(&self) -> Result<()>;
106
107    /// Get realtime shared orderbook for a symbol
108    /// Returns a thread-safe shared orderbook that's kept up-to-date
109    async fn get_realtime_orderbook(&self, symbol: &str) -> Result<SharedSimdOrderBook>;
110
111    /// Get all available trading instruments from the exchange
112    async fn get_instruments(&self) -> Result<Vec<Box<dyn Instrument>>>;
113
114    /// Get historical trades for a symbol
115    /// Optionally limit the number of trades returned
116    async fn get_historical_trades(
117        &self,
118        symbol: &str,
119        limit: Option<u32>,
120    ) -> Result<Vec<MarketTrade>>;
121
122    /// Get current orderbook snapshot for a symbol
123    /// Optionally specify the depth of the orderbook
124    async fn get_orderbook_snapshot(
125        &self,
126        symbol: &str,
127        depth: Option<u32>,
128    ) -> Result<OrderBookSnapshot>;
129
130    /// Start providing depth data
131    /// Returns a receiver for depth messages
132    /// Default implementation returns error - exchanges should override
133    async fn start_provide_depth(&self) -> Result<mpsc::Receiver<Self::DepthMessage>> {
134        Err(anyhow::anyhow!("Not implemented"))
135    }
136
137    /// Stop providing depth data
138    /// Default implementation does nothing
139    async fn stop_provide_depth(&self) -> Result<()> {
140        Ok(())
141    }
142
143    /// Start providing trade data
144    /// Returns a receiver for trade messages
145    /// Default implementation returns error - exchanges should override
146    async fn start_provide_trade(&self) -> Result<mpsc::Receiver<Self::TradeMessage>> {
147        Err(anyhow::anyhow!("Not implemented"))
148    }
149
150    /// Stop providing trade data
151    /// Default implementation does nothing
152    async fn stop_provide_trade(&self) -> Result<()> {
153        Ok(())
154    }
155
156    /// Check if provider is connected to exchange
157    /// Default implementation returns false - exchanges should override
158    async fn is_connected(&self) -> bool {
159        false
160    }
161
162    /// Get detailed connection status
163    /// Default implementation returns Disconnected - exchanges should override
164    async fn connection_status(&self) -> ConnectionState {
165        ConnectionState::Disconnected
166    }
167
168    /// Get connection statistics for monitoring
169    /// Default implementation returns empty stats - exchanges should override
170    async fn get_stats(&self) -> ConnectionStats {
171        ConnectionStats::default()
172    }
173
174    /// Ping the exchange to keep the connection alive
175    /// Returns round-trip time in nanoseconds
176    /// Default implementation returns 0 - exchanges should override
177    async fn ping(&self) -> Result<u64> {
178        Ok(0)
179    }
180
181    /// Reset connection (force reconnect)
182    /// Default implementation does nothing - exchanges should override
183    async fn reset_connection(&self) -> Result<()> {
184        Ok(())
185    }
186
187    /// Get the exchange-specific rate limits
188    /// Default implementation returns empty vector - exchanges should override
189    fn get_rate_limits(&self) -> Vec<RateLimit> {
190        Vec::new()
191    }
192
193    /// Check if provider is operational
194    /// Returns true if the provider is functioning correctly
195    async fn health_check(&self) -> Result<bool> {
196        Ok(self.is_connected().await)
197    }
198
199    /// Close all connections and clean up resources
200    async fn shutdown(&mut self) -> Result<()>;
201
202    /// High-performance timestamp conversion method
203    /// Uses cached timestamp conversions when possible and provides optimized conversion
204    /// when timestamps are a known format.
205    ///
206    /// This method should be used in all implementations when converting exchange timestamps
207    /// to ensure consistent high-performance timestamp handling across exchanges.
208    #[inline]
209    fn convert_exchange_timestamp(&self, exchange_timestamp: u64) -> u64 {
210        // Fast path: use format-specific conversion based on exchange's default format
211        match self.config().timestamp_format {
212            TimestampFormat::Milliseconds => {
213                // Try to use cache first for frequently seen timestamps
214                let mut cache = self.config().timestamp_cache.write();
215                cache.get_or_convert_ms(exchange_timestamp)
216            }
217            TimestampFormat::Seconds => timestamp::seconds_to_nanos(exchange_timestamp),
218            TimestampFormat::Microseconds => timestamp::us_to_nanos(exchange_timestamp),
219            TimestampFormat::Nanoseconds => exchange_timestamp,
220            _ => {
221                // For other formats, use the generic conversion function
222                timestamp::exchange_time_to_nanos(
223                    exchange_timestamp,
224                    self.config().timestamp_format,
225                )
226            }
227        }
228    }
229
230    /// Convert ISO8601 timestamp String to nanoseconds with caching for frequent values
231    /// This method should be used for exchanges that provide timestamps as ISO8601 strings
232    /// such as Coinbase.
233    #[inline]
234    fn convert_iso8601_timestamp(&self, iso_timestamp: &str) -> Option<u64> {
235        let mut cache = self.config().timestamp_cache.write();
236        cache.get_or_convert_iso8601(iso_timestamp)
237    }
238
239    /// Get current time in nanoseconds from the provider's clock
240    /// This method uses the shared clock to ensure consistent timekeeping
241    /// across all parts of the exchange provider.
242    #[inline]
243    fn current_time_nanos(&self) -> u64 {
244        self.config().clock.raw()
245    }
246}