rusty_feeder/provider/core.rs
1use anyhow::Result;
2use async_trait::async_trait;
3use reqwest;
4use rusty_model::{
5 data::{
6 book_snapshot::OrderBookSnapshot, market_trade::MarketTrade,
7 simd_orderbook::SharedSimdOrderBook,
8 },
9 instruments::Instrument,
10 venues::Venue,
11};
12use smallvec::SmallVec;
13use smartstring::alias::String;
14use std::fmt::Debug;
15use std::time::Duration;
16use tokio::sync::mpsc;
17
18use super::config::ConnectionConfig;
19use super::connection::{ConnectionState, ConnectionStats};
20use super::rate_limit::RateLimit;
21use super::timestamp::{self, TimestampFormat};
22
23/// HTTP client provider trait for standardized REST API access
24/// Implementations should have an HTTP client and provide methods for creating one
25pub trait HttpClientProvider {
26 /// Returns a reference to the HTTP client
27 fn http_client(&self) -> &reqwest::Client;
28
29 /// Create a new HTTP client with appropriate settings from config
30 fn create_http_client(config: &ConnectionConfig) -> reqwest::Client {
31 // Create default headers
32 let mut headers = reqwest::header::HeaderMap::new();
33 headers.insert(
34 reqwest::header::USER_AGENT,
35 reqwest::header::HeaderValue::from_str(&config.rest_config.user_agent)
36 .unwrap_or_else(|_| reqwest::header::HeaderValue::from_static("RustyHFT/1.0")),
37 );
38
39 // Build client with performance-optimized settings
40 reqwest::Client::builder()
41 .timeout(Duration::from_millis(
42 config.rest_config.timeout_milliseconds,
43 ))
44 .connect_timeout(Duration::from_millis(
45 config.rest_config.timeout_milliseconds / 2,
46 ))
47 .pool_max_idle_per_host(config.rest_config.connection_pool_size)
48 .pool_idle_timeout(Duration::from_millis(
49 config.rest_config.keep_alive_milliseconds,
50 ))
51 .default_headers(headers)
52 // .http2(config.rest_config.use_http2) // This method is no longer available in recent reqwest versions
53 .build()
54 .unwrap_or_default()
55 }
56}
57
58/// High-performance provider interface for exchange connectivity
59/// Optimized for zero allocations and predictable low latency
60/// All implementations must adhere to the project's HFT performance guidelines
61#[async_trait]
62pub trait Provider: Send + Sync + 'static + Debug {
63 /// Exchange-specific raw message type for trades from WebSocket
64 type TradeMessage: Send + 'static;
65
66 /// Exchange-specific raw message type for orderbook updates from WebSocket
67 type DepthMessage: Send + 'static;
68
69 /// Exchange-specific raw message type for instrument info
70 type InstrumentMessage: Send + 'static;
71
72 /// Returns the exchange name as a static String
73 fn name(&self) -> &'static str;
74
75 /// Returns the venue enum value for this provider
76 fn venue(&self) -> Venue;
77
78 /// Returns reference to the connection configuration
79 fn config(&self) -> &ConnectionConfig;
80
81 /// Initialize the provider with any necessary setup steps
82 /// This method should be called before using other methods
83 async fn init(&mut self) -> Result<()>;
84
85 /// Subscribe to trade data stream for multiple symbols
86 /// Returns a channel receiver for trade messages
87 /// Uses SmallVec to avoid heap allocations for small numbers of symbols
88 async fn subscribe_trades(
89 &self,
90 symbols: SmallVec<[String; 8]>,
91 ) -> Result<mpsc::Receiver<Self::TradeMessage>>;
92
93 /// Unsubscribe from trade data
94 async fn unsubscribe_trades(&self) -> Result<()>;
95
96 /// Subscribe to orderbook depth data for multiple symbols
97 /// Returns a channel receiver for depth messages
98 /// Uses SmallVec to avoid heap allocations for small numbers of symbols
99 async fn subscribe_orderbook(
100 &self,
101 symbols: SmallVec<[String; 8]>,
102 ) -> Result<mpsc::Receiver<Self::DepthMessage>>;
103
104 /// Unsubscribe from orderbook depth data
105 async fn unsubscribe_orderbook(&self) -> Result<()>;
106
107 /// Get realtime shared orderbook for a symbol
108 /// Returns a thread-safe shared orderbook that's kept up-to-date
109 async fn get_realtime_orderbook(&self, symbol: &str) -> Result<SharedSimdOrderBook>;
110
111 /// Get all available trading instruments from the exchange
112 async fn get_instruments(&self) -> Result<Vec<Box<dyn Instrument>>>;
113
114 /// Get historical trades for a symbol
115 /// Optionally limit the number of trades returned
116 async fn get_historical_trades(
117 &self,
118 symbol: &str,
119 limit: Option<u32>,
120 ) -> Result<Vec<MarketTrade>>;
121
122 /// Get current orderbook snapshot for a symbol
123 /// Optionally specify the depth of the orderbook
124 async fn get_orderbook_snapshot(
125 &self,
126 symbol: &str,
127 depth: Option<u32>,
128 ) -> Result<OrderBookSnapshot>;
129
130 /// Start providing depth data
131 /// Returns a receiver for depth messages
132 /// Default implementation returns error - exchanges should override
133 async fn start_provide_depth(&self) -> Result<mpsc::Receiver<Self::DepthMessage>> {
134 Err(anyhow::anyhow!("Not implemented"))
135 }
136
137 /// Stop providing depth data
138 /// Default implementation does nothing
139 async fn stop_provide_depth(&self) -> Result<()> {
140 Ok(())
141 }
142
143 /// Start providing trade data
144 /// Returns a receiver for trade messages
145 /// Default implementation returns error - exchanges should override
146 async fn start_provide_trade(&self) -> Result<mpsc::Receiver<Self::TradeMessage>> {
147 Err(anyhow::anyhow!("Not implemented"))
148 }
149
150 /// Stop providing trade data
151 /// Default implementation does nothing
152 async fn stop_provide_trade(&self) -> Result<()> {
153 Ok(())
154 }
155
156 /// Check if provider is connected to exchange
157 /// Default implementation returns false - exchanges should override
158 async fn is_connected(&self) -> bool {
159 false
160 }
161
162 /// Get detailed connection status
163 /// Default implementation returns Disconnected - exchanges should override
164 async fn connection_status(&self) -> ConnectionState {
165 ConnectionState::Disconnected
166 }
167
168 /// Get connection statistics for monitoring
169 /// Default implementation returns empty stats - exchanges should override
170 async fn get_stats(&self) -> ConnectionStats {
171 ConnectionStats::default()
172 }
173
174 /// Ping the exchange to keep the connection alive
175 /// Returns round-trip time in nanoseconds
176 /// Default implementation returns 0 - exchanges should override
177 async fn ping(&self) -> Result<u64> {
178 Ok(0)
179 }
180
181 /// Reset connection (force reconnect)
182 /// Default implementation does nothing - exchanges should override
183 async fn reset_connection(&self) -> Result<()> {
184 Ok(())
185 }
186
187 /// Get the exchange-specific rate limits
188 /// Default implementation returns empty vector - exchanges should override
189 fn get_rate_limits(&self) -> Vec<RateLimit> {
190 Vec::new()
191 }
192
193 /// Check if provider is operational
194 /// Returns true if the provider is functioning correctly
195 async fn health_check(&self) -> Result<bool> {
196 Ok(self.is_connected().await)
197 }
198
199 /// Close all connections and clean up resources
200 async fn shutdown(&mut self) -> Result<()>;
201
202 /// High-performance timestamp conversion method
203 /// Uses cached timestamp conversions when possible and provides optimized conversion
204 /// when timestamps are a known format.
205 ///
206 /// This method should be used in all implementations when converting exchange timestamps
207 /// to ensure consistent high-performance timestamp handling across exchanges.
208 #[inline]
209 fn convert_exchange_timestamp(&self, exchange_timestamp: u64) -> u64 {
210 // Fast path: use format-specific conversion based on exchange's default format
211 match self.config().timestamp_format {
212 TimestampFormat::Milliseconds => {
213 // Try to use cache first for frequently seen timestamps
214 let mut cache = self.config().timestamp_cache.write();
215 cache.get_or_convert_ms(exchange_timestamp)
216 }
217 TimestampFormat::Seconds => timestamp::seconds_to_nanos(exchange_timestamp),
218 TimestampFormat::Microseconds => timestamp::us_to_nanos(exchange_timestamp),
219 TimestampFormat::Nanoseconds => exchange_timestamp,
220 _ => {
221 // For other formats, use the generic conversion function
222 timestamp::exchange_time_to_nanos(
223 exchange_timestamp,
224 self.config().timestamp_format,
225 )
226 }
227 }
228 }
229
230 /// Convert ISO8601 timestamp String to nanoseconds with caching for frequent values
231 /// This method should be used for exchanges that provide timestamps as ISO8601 strings
232 /// such as Coinbase.
233 #[inline]
234 fn convert_iso8601_timestamp(&self, iso_timestamp: &str) -> Option<u64> {
235 let mut cache = self.config().timestamp_cache.write();
236 cache.get_or_convert_iso8601(iso_timestamp)
237 }
238
239 /// Get current time in nanoseconds from the provider's clock
240 /// This method uses the shared clock to ensure consistent timekeeping
241 /// across all parts of the exchange provider.
242 #[inline]
243 fn current_time_nanos(&self) -> u64 {
244 self.config().clock.raw()
245 }
246}