rusty_feeder/provider/ext.rs
1//! Extension trait for Provider to provide additional functionality
2//!
3//! This module provides an extension trait that adds convenience methods
4//! to all Provider implementations, making them easier to use in a unified way.
5
6use anyhow::Result;
7use async_trait::async_trait;
8use rusty_model::{
9 data::{book_snapshot::OrderBookSnapshot, market_trade::MarketTrade},
10 instruments::{Instrument, InstrumentId},
11};
12
13use super::connection::ConnectionStats;
14use crate::exchange::binance::futures::provider::{
15 BinanceFuturesMarketType, BinanceFuturesProvider,
16};
17
18// Declare the public sub-modules
19pub mod alerts;
20pub mod health;
21pub mod metrics;
22
23// Re-export all types from sub-modules for convenience
24pub use alerts::{AlertCondition, AlertConfig, AlertHandle, AnomalyAlgorithm, AnomalyConfig};
25pub use health::{ComponentHealth, HealthIssue, HealthState, HealthStatus, IssueSeverity};
26pub use metrics::{
27 AggregatedMetrics, CustomMetric, ExecutionQualityMetrics, InstrumentMetrics, LatencyBucket,
28 LatencyDistribution, LatencyMetrics, MetricsCollector, MicrostructureSignals, OrderFlowMetrics,
29 TradingMetrics,
30};
31
32/// Data handler trait for processing market data
33#[async_trait]
34pub trait DataHandler: Send + Sync {
35 /// Handle incoming market trade
36 async fn handle_market_trade(&self, trade: MarketTrade) -> Result<()>;
37
38 /// Handle incoming orderbook snapshot
39 async fn handle_orderbook_snapshot(&self, snapshot: OrderBookSnapshot) -> Result<()>;
40}
41
42/// Advanced monitoring capabilities for High-Frequency Trading (HFT) systems
43///
44/// The `MonitorExt` trait provides comprehensive monitoring, alerting, and observability features
45/// specifically designed for HFT systems that require nanosecond precision and real-time performance
46/// tracking. This trait extends basic monitoring with advanced analytics including:
47///
48/// - **Real-time Performance Metrics**: Track latency distributions, throughput, and system health
49/// - **Trading Analytics**: Monitor order flow, market impact, and microstructure signals
50/// - **Anomaly Detection**: Identify unusual patterns in trading data and system behavior
51/// - **Alerting System**: Configure alerts for critical conditions with customizable thresholds
52/// - **Health Monitoring**: Component-level health tracking with degradation detection
53/// - **Custom Metrics**: Extensible framework for domain-specific metric collection
54///
55/// # Design Philosophy
56///
57/// The trait is designed with HFT requirements in mind:
58/// - All timestamps use nanosecond precision
59/// - Zero-allocation operations where possible
60/// - Lock-free metrics collection for minimal overhead
61/// - Configurable aggregation intervals to balance accuracy vs performance
62///
63/// # Usage Example
64///
65/// ```rust,no_run
66/// use rusty_feeder::provider::ext::{MonitorExt, AlertConfig, AlertCondition, IssueSeverity};
67/// use rusty_model::instruments::InstrumentId;
68/// use rusty_model::venues::Venue;
69///
70/// async fn monitor_trading_system(monitor: &impl MonitorExt) -> anyhow::Result<()> {
71/// // Configure latency alert
72/// let alert_config = AlertConfig {
73/// name: "High Latency Alert".to_string(),
74/// condition: AlertCondition::LatencyThreshold { instrument_id: None },
75/// threshold: 1_000_000.0, // 1ms in nanoseconds
76/// duration_ms: 5000, // Trigger after 5 seconds
77/// severity: IssueSeverity::Warning,
78/// enabled: true,
79/// };
80///
81/// let alert_handle = monitor.enable_alert(alert_config).await?;
82///
83/// // Get real-time metrics
84/// let btc_usdt = InstrumentId::new("BTC-USDT", Venue::Binance);
85/// let metrics = monitor.get_instrument_metrics(&btc_usdt).await?;
86/// println!("Current latency: {}ns", metrics.latency_metrics.total_latency_ns);
87///
88/// // Export for Prometheus
89/// let prometheus_data = monitor.export_prometheus_metrics().await?;
90///
91/// Ok(())
92/// }
93/// ```
94#[async_trait]
95pub trait MonitorExt {
96 /// Retrieves comprehensive real-time metrics for a specific trading instrument
97 ///
98 /// This method provides a complete snapshot of all monitoring data for a single instrument,
99 /// including connection health, data feed statistics, order flow metrics, and latency
100 /// measurements. All metrics use nanosecond precision timestamps.
101 ///
102 /// # Arguments
103 ///
104 /// * `instrument_id` - The unique identifier of the instrument to query
105 ///
106 /// # Returns
107 ///
108 /// * `Ok(InstrumentMetrics)` - Complete metrics snapshot for the instrument
109 /// * `Err` - If the instrument is not found or monitoring is not available
110 ///
111 /// # Example
112 ///
113 /// ```rust,no_run
114 /// # use rusty_model::instruments::InstrumentId;
115 /// # use rusty_model::venues::Venue;
116 /// # async fn example(monitor: &impl rusty_feeder::provider::ext::MonitorExt) -> anyhow::Result<()> {
117 /// let eth_usdt = InstrumentId::new("ETH-USDT", Venue::Binance);
118 /// let metrics = monitor.get_instrument_metrics(ð_usdt).await?;
119 ///
120 /// // Check order flow imbalance
121 /// if metrics.order_flow_metrics.volume_imbalance.abs() > 0.7 {
122 /// println!("High volume imbalance detected: {}", metrics.order_flow_metrics.volume_imbalance);
123 /// }
124 ///
125 /// // Monitor latency
126 /// println!("Network latency: {}µs", metrics.latency_metrics.network_latency_ns / 1000);
127 /// println!("Total latency: {}µs", metrics.latency_metrics.total_latency_ns / 1000);
128 /// # Ok(())
129 /// # }
130 /// ```
131 async fn get_instrument_metrics(
132 &self,
133 instrument_id: &InstrumentId,
134 ) -> Result<InstrumentMetrics>;
135
136 /// Retrieves aggregated metrics across all actively monitored instruments
137 ///
138 /// This method provides system-wide performance metrics, useful for monitoring overall
139 /// system health and identifying cross-instrument issues. Aggregated metrics include
140 /// total message throughput, average latencies, memory usage, and health scores.
141 ///
142 /// # Returns
143 ///
144 /// * `Ok(AggregatedMetrics)` - System-wide aggregated metrics
145 /// * `Err` - If monitoring subsystem is unavailable
146 ///
147 /// # Example
148 ///
149 /// ```rust,no_run
150 /// # async fn example(monitor: &impl rusty_feeder::provider::ext::MonitorExt) -> anyhow::Result<()> {
151 /// let system_metrics = monitor.get_aggregated_metrics().await?;
152 ///
153 /// // Check system health
154 /// if system_metrics.overall_health_score < 0.8 {
155 /// println!("System health degraded: {:.2}", system_metrics.overall_health_score);
156 /// println!("Active alerts: {}", system_metrics.active_alerts);
157 /// }
158 ///
159 /// // Monitor performance
160 /// println!("Processing {} messages/sec",
161 /// system_metrics.total_messages_processed / (system_metrics.timestamp / 1_000_000_000));
162 /// println!("P99 latency: {}µs", system_metrics.p99_latency_ns / 1000);
163 /// # Ok(())
164 /// # }
165 /// ```
166 async fn get_aggregated_metrics(&self) -> Result<AggregatedMetrics>;
167
168 /// Registers a custom metrics collector for domain-specific monitoring
169 ///
170 /// This method allows extending the monitoring system with custom metrics collectors
171 /// that implement the `MetricsCollector` trait. Collectors are invoked periodically
172 /// and their metrics are included in aggregated reports and exports.
173 ///
174 /// # Arguments
175 ///
176 /// * `collector` - A boxed implementation of the `MetricsCollector` trait
177 ///
178 /// # Returns
179 ///
180 /// * `Ok(())` - If the collector was successfully registered
181 /// * `Err` - If registration failed (e.g., duplicate name)
182 ///
183 /// # Example
184 ///
185 /// ```rust,no_run
186 /// # use rusty_feeder::provider::ext::{MetricsCollector, CustomMetric};
187 /// # use async_trait::async_trait;
188 /// # use anyhow::Result;
189 /// # use rustc_hash::FxHashMap;
190 ///
191 /// struct SpreadMonitor;
192 ///
193 /// #[async_trait]
194 /// impl MetricsCollector for SpreadMonitor {
195 /// async fn collect(&self) -> Result<Vec<CustomMetric>> {
196 /// // Calculate custom spread metrics
197 /// let mut tags = FxHashMap::default();
198 /// tags.insert("exchange".to_string(), "binance".to_string());
199 ///
200 /// Ok(vec![
201 /// CustomMetric {
202 /// name: "custom_spread_bps".to_string(),
203 /// value: 2.5,
204 /// tags,
205 /// timestamp: quanta::Instant::now().as_nanos() as u64,
206 /// }
207 /// ])
208 /// }
209 ///
210 /// fn name(&self) -> &str {
211 /// "spread_monitor"
212 /// }
213 /// }
214 ///
215 /// # async fn example(monitor: &mut impl rusty_feeder::provider::ext::MonitorExt) -> Result<()> {
216 /// monitor.register_metrics_collector(Box::new(SpreadMonitor)).await?;
217 /// # Ok(())
218 /// # }
219 /// ```
220 async fn register_metrics_collector(
221 &mut self,
222 collector: Box<dyn MetricsCollector>,
223 ) -> Result<()>;
224
225 /// Enables a real-time alert for specific monitoring conditions
226 ///
227 /// This method configures and activates an alert that will trigger when the specified
228 /// condition is met for the configured duration. Alerts can monitor various conditions
229 /// including latency thresholds, error rates, connection status, and custom metrics.
230 ///
231 /// # Arguments
232 ///
233 /// * `alert_config` - Configuration specifying the alert condition, threshold, and behavior
234 ///
235 /// # Returns
236 ///
237 /// * `Ok(AlertHandle)` - A handle for managing the active alert
238 /// * `Err` - If the alert configuration is invalid
239 ///
240 /// # Example
241 ///
242 /// ```rust,no_run
243 /// # use rusty_feeder::provider::ext::{AlertConfig, AlertCondition, IssueSeverity};
244 /// # use rusty_model::instruments::InstrumentId;
245 /// # use rusty_model::venues::Venue;
246 /// # async fn example(monitor: &mut impl rusty_feeder::provider::ext::MonitorExt) -> anyhow::Result<()> {
247 /// // Alert for high latency on specific instrument
248 /// let btc_instrument = InstrumentId::new("BTC-USDT", Venue::Binance);
249 /// let latency_alert = AlertConfig {
250 /// name: "BTC-USDT High Latency".to_string(),
251 /// condition: AlertCondition::LatencyThreshold {
252 /// instrument_id: Some(btc_instrument),
253 /// },
254 /// threshold: 5_000_000.0, // 5ms in nanoseconds
255 /// duration_ms: 10_000, // Sustained for 10 seconds
256 /// severity: IssueSeverity::Error,
257 /// enabled: true,
258 /// };
259 ///
260 /// let alert_handle = monitor.enable_alert(latency_alert).await?;
261 ///
262 /// // Alert for connection loss
263 /// let connection_alert = AlertConfig {
264 /// name: "Binance Connection Lost".to_string(),
265 /// condition: AlertCondition::ConnectionLoss {
266 /// exchange: "binance".to_string(),
267 /// },
268 /// threshold: 1.0, // Binary condition
269 /// duration_ms: 0, // Immediate
270 /// severity: IssueSeverity::Critical,
271 /// enabled: true,
272 /// };
273 ///
274 /// let conn_handle = monitor.enable_alert(connection_alert).await?;
275 /// # Ok(())
276 /// # }
277 /// ```
278 async fn enable_alert(&mut self, alert_config: AlertConfig) -> Result<AlertHandle>;
279
280 /// Disables an active alert using its handle
281 ///
282 /// This method deactivates a previously configured alert. The alert will stop
283 /// evaluating its condition and no new notifications will be generated.
284 ///
285 /// # Arguments
286 ///
287 /// * `handle` - The handle returned when the alert was enabled
288 ///
289 /// # Returns
290 ///
291 /// * `Ok(())` - If the alert was successfully disabled
292 /// * `Err` - If the handle is invalid or the alert was already disabled
293 ///
294 /// # Example
295 ///
296 /// ```rust,no_run
297 /// # use rusty_feeder::provider::ext::AlertHandle;
298 /// # async fn example(monitor: &mut impl rusty_feeder::provider::ext::MonitorExt, alert_handle: AlertHandle) -> anyhow::Result<()> {
299 /// // Disable an alert after market close
300 /// monitor.disable_alert(alert_handle).await?;
301 /// # Ok(())
302 /// # }
303 /// ```
304 async fn disable_alert(&mut self, handle: AlertHandle) -> Result<()>;
305
306 /// Retrieves the current health status with detailed component diagnostics
307 ///
308 /// This method provides a comprehensive health check of the monitoring system and
309 /// all monitored components. It includes overall system status, individual component
310 /// health, active issues, performance grading, and uptime information.
311 ///
312 /// # Returns
313 ///
314 /// * `Ok(HealthStatus)` - Detailed health information
315 /// * `Err` - If health check fails
316 ///
317 /// # Example
318 ///
319 /// ```rust,no_run
320 /// # use rusty_feeder::provider::ext::HealthState;
321 /// # async fn example(monitor: &impl rusty_feeder::provider::ext::MonitorExt) -> anyhow::Result<()> {
322 /// let health = monitor.get_health_status().await?;
323 ///
324 /// match health.overall_status {
325 /// HealthState::Healthy => println!("System healthy - Grade: {}", health.performance_grade),
326 /// HealthState::Degraded => {
327 /// println!("System degraded - Active issues: {}", health.active_issues.len());
328 /// for issue in &health.active_issues {
329 /// println!(" - {}: {}", issue.component, issue.description);
330 /// }
331 /// }
332 /// HealthState::Critical => println!("CRITICAL: System requires immediate attention"),
333 /// HealthState::Offline => println!("System offline"),
334 /// }
335 ///
336 /// // Check component health
337 /// for component in &health.component_health {
338 /// if component.error_rate > 0.01 {
339 /// println!("{} has high error rate: {:.2}%",
340 /// component.component_name, component.error_rate * 100.0);
341 /// }
342 /// }
343 /// # Ok(())
344 /// # }
345 /// ```
346 async fn get_health_status(&self) -> Result<HealthStatus>;
347
348 /// Starts anomaly detection for trading patterns and system behavior
349 ///
350 /// This method activates anomaly detection algorithms that monitor for unusual
351 /// patterns in trading data, latency spikes, volume anomalies, and other
352 /// deviations from normal behavior. Multiple algorithms can run concurrently.
353 ///
354 /// # Arguments
355 ///
356 /// * `config` - Configuration specifying instruments, algorithms, and sensitivity
357 ///
358 /// # Returns
359 ///
360 /// * `Ok(())` - If anomaly detection was successfully started
361 /// * `Err` - If configuration is invalid or detection is already running
362 ///
363 /// # Example
364 ///
365 /// ```rust,no_run
366 /// # use rusty_feeder::provider::ext::{AnomalyConfig, AnomalyAlgorithm};
367 /// # use rusty_model::instruments::InstrumentId;
368 /// # use rusty_model::venues::Venue;
369 /// # async fn example(monitor: &mut impl rusty_feeder::provider::ext::MonitorExt) -> anyhow::Result<()> {
370 /// let config = AnomalyConfig {
371 /// instruments: vec![
372 /// InstrumentId::new("BTC-USDT", Venue::Binance),
373 /// InstrumentId::new("ETH-USDT", Venue::Binance),
374 /// ],
375 /// sensitivity: 0.8, // High sensitivity
376 /// window_size_ms: 60_000, // 1 minute windows
377 /// algorithms: vec![
378 /// AnomalyAlgorithm::StatisticalOutlier {
379 /// threshold_std_dev: 3.0,
380 /// },
381 /// AnomalyAlgorithm::VolumeAnomaly {
382 /// volume_threshold: 5.0, // 5x normal volume
383 /// },
384 /// AnomalyAlgorithm::LatencyAnomaly {
385 /// latency_threshold_ns: 10_000_000, // 10ms
386 /// },
387 /// ],
388 /// };
389 ///
390 /// monitor.start_anomaly_detection(config).await?;
391 /// # Ok(())
392 /// # }
393 /// ```
394 async fn start_anomaly_detection(&mut self, config: AnomalyConfig) -> Result<()>;
395
396 /// Stops all active anomaly detection algorithms
397 ///
398 /// This method deactivates all running anomaly detection algorithms and clears
399 /// any accumulated detection state.
400 ///
401 /// # Returns
402 ///
403 /// * `Ok(())` - If anomaly detection was successfully stopped
404 /// * `Err` - If no detection was running
405 ///
406 /// # Example
407 ///
408 /// ```rust,no_run
409 /// # async fn example(monitor: &mut impl rusty_feeder::provider::ext::MonitorExt) -> anyhow::Result<()> {
410 /// // Stop detection during maintenance window
411 /// monitor.stop_anomaly_detection().await?;
412 /// # Ok(())
413 /// # }
414 /// ```
415 async fn stop_anomaly_detection(&mut self) -> Result<()>;
416
417 /// Retrieves detailed latency distribution statistics for an instrument
418 ///
419 /// This method provides a comprehensive breakdown of latency measurements including
420 /// percentiles (p50, p90, p95, p99, p99.9), maximum values, and histogram buckets.
421 /// Useful for identifying latency patterns and optimizing performance.
422 ///
423 /// # Arguments
424 ///
425 /// * `instrument_id` - The instrument to analyze
426 ///
427 /// # Returns
428 ///
429 /// * `Ok(LatencyDistribution)` - Detailed latency statistics
430 /// * `Err` - If instrument is not found or has insufficient data
431 ///
432 /// # Example
433 ///
434 /// ```rust,no_run
435 /// # use rusty_model::instruments::InstrumentId;
436 /// # use rusty_model::venues::Venue;
437 /// # async fn example(monitor: &impl rusty_feeder::provider::ext::MonitorExt) -> anyhow::Result<()> {
438 /// let btc_usdt = InstrumentId::new("BTC-USDT", Venue::Binance);
439 /// let latency = monitor.get_latency_distribution(&btc_usdt).await?;
440 ///
441 /// println!("Latency percentiles for BTC-USDT:");
442 /// println!(" P50: {}µs", latency.p50_ns / 1000);
443 /// println!(" P90: {}µs", latency.p90_ns / 1000);
444 /// println!(" P95: {}µs", latency.p95_ns / 1000);
445 /// println!(" P99: {}µs", latency.p99_ns / 1000);
446 /// println!(" P99.9: {}µs", latency.p99_9_ns / 1000);
447 /// println!(" Max: {}µs", latency.max_ns / 1000);
448 ///
449 /// // Analyze histogram
450 /// for bucket in &latency.histogram {
451 /// println!(" {}µs-{}µs: {} samples",
452 /// bucket.min_ns / 1000, bucket.max_ns / 1000, bucket.count);
453 /// }
454 /// # Ok(())
455 /// # }
456 /// ```
457 async fn get_latency_distribution(
458 &self,
459 instrument_id: &InstrumentId,
460 ) -> Result<LatencyDistribution>;
461
462 /// Exports all metrics in Prometheus exposition format
463 ///
464 /// This method generates a text representation of all metrics following the
465 /// Prometheus exposition format. The output can be scraped by Prometheus or
466 /// compatible monitoring systems. Includes all standard metrics, custom metrics,
467 /// and alert states.
468 ///
469 /// # Returns
470 ///
471 /// * `Ok(String)` - Prometheus-formatted metrics text
472 /// * `Err` - If export fails
473 ///
474 /// # Output Format
475 ///
476 /// ```text
477 /// # HELP hft_latency_nanoseconds End-to-end latency in nanoseconds
478 /// # TYPE hft_latency_nanoseconds histogram
479 /// hft_latency_nanoseconds_bucket{exchange="binance",instrument="BTC-USDT",le="1000"} 245
480 /// hft_latency_nanoseconds_bucket{exchange="binance",instrument="BTC-USDT",le="5000"} 1823
481 /// hft_latency_nanoseconds_sum{exchange="binance",instrument="BTC-USDT"} 4523000
482 /// hft_latency_nanoseconds_count{exchange="binance",instrument="BTC-USDT"} 2068
483 ///
484 /// # HELP hft_messages_total Total messages processed
485 /// # TYPE hft_messages_total counter
486 /// hft_messages_total{exchange="binance",type="trade"} 150234
487 /// hft_messages_total{exchange="binance",type="orderbook"} 892341
488 /// ```
489 ///
490 /// # Example
491 ///
492 /// ```rust,no_run
493 /// # async fn example(monitor: &impl rusty_feeder::provider::ext::MonitorExt) -> anyhow::Result<()> {
494 /// let prometheus_data = monitor.export_prometheus_metrics().await?;
495 ///
496 /// // Write to file for Prometheus to scrape
497 /// std::fs::write("/var/lib/prometheus/hft_metrics.prom", prometheus_data)?;
498 ///
499 /// // Or serve via HTTP endpoint
500 /// // warp::reply::with_header(prometheus_data, "Content-Type", "text/plain; version=0.0.4")
501 /// # Ok(())
502 /// # }
503 /// ```
504 async fn export_prometheus_metrics(&self) -> Result<String>;
505
506 /// Resets all monitoring statistics to their initial state
507 ///
508 /// This method clears all accumulated metrics, counters, and statistics while
509 /// preserving configuration (alerts, collectors, etc.). Useful for starting
510 /// fresh measurements or after system maintenance.
511 ///
512 /// # Returns
513 ///
514 /// * `Ok(())` - If statistics were successfully reset
515 /// * `Err` - If reset fails
516 ///
517 /// # Example
518 ///
519 /// ```rust,no_run
520 /// # async fn example(monitor: &mut impl rusty_feeder::provider::ext::MonitorExt) -> anyhow::Result<()> {
521 /// // Reset stats at market open
522 /// monitor.reset_statistics().await?;
523 /// println!("Statistics reset for new trading session");
524 /// # Ok(())
525 /// # }
526 /// ```
527 async fn reset_statistics(&mut self) -> Result<()>;
528
529 /// Retrieves advanced trading-specific metrics for an instrument
530 ///
531 /// This method provides sophisticated trading analytics including order flow
532 /// imbalance, VWAP/TWAP calculations, microstructure signals, and execution
533 /// quality metrics. These metrics are essential for HFT strategy development
534 /// and performance analysis.
535 ///
536 /// # Arguments
537 ///
538 /// * `instrument_id` - The instrument to analyze
539 ///
540 /// # Returns
541 ///
542 /// * `Ok(TradingMetrics)` - Comprehensive trading analytics
543 /// * `Err` - If instrument is not found or has insufficient data
544 ///
545 /// # Example
546 ///
547 /// ```rust,no_run
548 /// # use rusty_model::instruments::InstrumentId;
549 /// # use rusty_model::venues::Venue;
550 /// # async fn example(monitor: &impl rusty_feeder::provider::ext::MonitorExt) -> anyhow::Result<()> {
551 /// let eth_usdt = InstrumentId::new("ETH-USDT", Venue::Binance);
552 /// let trading_metrics = monitor.get_trading_metrics(ð_usdt).await?;
553 ///
554 /// // Check microstructure signals
555 /// let signals = &trading_metrics.microstructure_signals;
556 /// if signals.book_imbalance > 0.7 && signals.momentum > 0.5 {
557 /// println!("Strong bullish signal detected");
558 /// println!(" Book imbalance: {:.3}", signals.book_imbalance);
559 /// println!(" Momentum: {:.3}", signals.momentum);
560 /// println!(" Alpha signal: {:.3}", signals.alpha_signal);
561 /// }
562 ///
563 /// // Analyze execution quality
564 /// let exec_quality = &trading_metrics.execution_quality;
565 /// println!("Execution metrics:");
566 /// println!(" Slippage: {:.2} bps", exec_quality.slippage_bps);
567 /// println!(" Market impact: {:.2} bps", exec_quality.market_impact_bps);
568 /// println!(" Fill rate: {:.1}%", exec_quality.fill_rate * 100.0);
569 ///
570 /// // Price analysis
571 /// println!("VWAP: {}", trading_metrics.vwap);
572 /// println!("TWAP: {}", trading_metrics.twap);
573 /// # Ok(())
574 /// # }
575 /// ```
576 async fn get_trading_metrics(&self, instrument_id: &InstrumentId) -> Result<TradingMetrics>;
577}
578
579/// Extension trait for new provider implementations
580#[async_trait]
581pub trait ProviderExt {
582 /// Connect to the exchange
583 async fn connect(&mut self) -> Result<()>;
584
585 /// Disconnect from the exchange
586 async fn disconnect(&mut self) -> Result<()>;
587
588 /// Subscribe to orderbook updates for an instrument
589 async fn subscribe_orderbook(
590 &mut self,
591 instrument_id: &InstrumentId,
592 depth: usize,
593 ) -> Result<()>;
594
595 /// Subscribe to trade updates for an instrument
596 async fn subscribe_trades(&mut self, instrument_id: &InstrumentId) -> Result<()>;
597
598 /// Unsubscribe from orderbook updates
599 async fn unsubscribe_orderbook(&mut self, instrument_id: &InstrumentId) -> Result<()>;
600
601 /// Unsubscribe from trade updates
602 async fn unsubscribe_trades(&mut self, instrument_id: &InstrumentId) -> Result<()>;
603
604 /// Check if connected
605 fn is_connected(&self) -> bool;
606
607 /// Get connection statistics
608 fn connection_stats(&self) -> ConnectionStats;
609
610 /// Run the provider with a data handler
611 async fn run_with_handler(&mut self, handler: Box<dyn DataHandler + Send + Sync>)
612 -> Result<()>;
613
614 /// Fetch available instruments
615 async fn fetch_instruments(&self) -> Result<Vec<Box<dyn Instrument>>>;
616}
617
618// Note: Since the new provider system doesn't follow the old Provider trait,
619// we need manual implementations for each provider type.
620// This is actually better for the refactoring - we can have explicit control.
621
622/// Unified provider interface for the monitor system
623pub enum UnifiedProvider {
624 /// Binance futures market provider
625 BinanceFutures(Box<BinanceFuturesProvider>),
626 // Add other providers as they're migrated to the new system
627}
628
629impl UnifiedProvider {
630 /// Create a new provider based on exchange name
631 pub fn new(exchange_name: &str) -> Result<Self> {
632 match exchange_name.to_lowercase().as_str() {
633 "binance" | "binance_futures" => {
634 use crate::ConnectionConfig;
635 let provider = BinanceFuturesProvider::new(
636 ConnectionConfig::default(),
637 BinanceFuturesMarketType::UsdM,
638 );
639 Ok(Self::BinanceFutures(Box::new(provider)))
640 }
641 _ => Err(anyhow::anyhow!("Unsupported exchange: {}", exchange_name)),
642 }
643 }
644
645 /// Get the exchange name
646 pub fn name(&self) -> &str {
647 match self {
648 Self::BinanceFutures(p) => p.name(),
649 }
650 }
651}
652
653#[async_trait]
654impl ProviderExt for UnifiedProvider {
655 async fn connect(&mut self) -> Result<()> {
656 match self {
657 Self::BinanceFutures(p) => p.connect().await,
658 }
659 }
660
661 async fn disconnect(&mut self) -> Result<()> {
662 match self {
663 Self::BinanceFutures(p) => p.disconnect().await,
664 }
665 }
666
667 async fn subscribe_orderbook(
668 &mut self,
669 instrument_id: &InstrumentId,
670 depth: usize,
671 ) -> Result<()> {
672 match self {
673 Self::BinanceFutures(p) => p.subscribe_orderbook(instrument_id, depth).await,
674 }
675 }
676
677 async fn subscribe_trades(&mut self, instrument_id: &InstrumentId) -> Result<()> {
678 match self {
679 Self::BinanceFutures(p) => p.subscribe_trades(instrument_id).await,
680 }
681 }
682
683 async fn unsubscribe_orderbook(&mut self, instrument_id: &InstrumentId) -> Result<()> {
684 match self {
685 Self::BinanceFutures(p) => p.unsubscribe_orderbook(instrument_id).await,
686 }
687 }
688
689 async fn unsubscribe_trades(&mut self, instrument_id: &InstrumentId) -> Result<()> {
690 match self {
691 Self::BinanceFutures(p) => p.unsubscribe_trades(instrument_id).await,
692 }
693 }
694
695 fn is_connected(&self) -> bool {
696 match self {
697 Self::BinanceFutures(p) => p.is_connected(),
698 }
699 }
700
701 fn connection_stats(&self) -> ConnectionStats {
702 match self {
703 Self::BinanceFutures(p) => p.connection_stats(),
704 }
705 }
706
707 async fn run_with_handler(
708 &mut self,
709 handler: Box<dyn DataHandler + Send + Sync>,
710 ) -> Result<()> {
711 match self {
712 Self::BinanceFutures(p) => p.run_with_handler(handler).await,
713 }
714 }
715
716 async fn fetch_instruments(&self) -> Result<Vec<Box<dyn Instrument>>> {
717 match self {
718 Self::BinanceFutures(p) => p.fetch_instruments().await,
719 }
720 }
721}
722
723#[cfg(test)]
724mod tests {
725 use super::*;
726 use crate::provider::connection::ConnectionState;
727 use anyhow::Result;
728 use async_trait::async_trait;
729 use parking_lot::RwLock;
730 use quanta::Clock;
731 use rust_decimal::Decimal;
732 use rustc_hash::FxHashMap;
733 use rusty_model::venues::Venue;
734 use std::sync::Arc;
735 use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
736
737 /// Test implementation of MonitorExt for unit testing
738 struct TestMonitor {
739 /// Simulated metrics data for testing
740 instrument_metrics: Arc<RwLock<FxHashMap<InstrumentId, InstrumentMetrics>>>,
741 /// Track enabled alerts
742 alerts: Arc<RwLock<FxHashMap<AlertHandle, AlertConfig>>>,
743 /// Next alert handle ID
744 next_alert_id: AtomicU64,
745 /// Track registered metrics collectors
746 collectors: Arc<RwLock<Vec<Box<dyn MetricsCollector>>>>,
747 /// Track if anomaly detection is active
748 anomaly_detection_active: AtomicBool,
749 /// Track anomaly config
750 anomaly_config: Arc<RwLock<Option<AnomalyConfig>>>,
751 /// Health state for testing
752 health_state: Arc<RwLock<HealthState>>,
753 /// Connection errors for testing
754 connection_errors: AtomicU64,
755 /// Clock for consistent time
756 clock: Clock,
757 /// Simulated stats that can be reset
758 stats: Arc<RwLock<TestStats>>,
759 }
760
/// Counters behind `TestMonitor`'s simulated statistics; `Default` zeroes all of them.
#[derive(Default)]
struct TestStats {
    /// Number of messages received.
    messages_received: u64,
    /// Number of messages dropped.
    messages_dropped: u64,
    /// Sum of recorded latencies, in nanoseconds.
    total_latency_ns: u64,
    /// Number of latency samples contributing to `total_latency_ns`.
    latency_samples: u64,
}
768
769 impl TestMonitor {
770 fn new() -> Self {
771 let clock = Clock::new();
772
773 // Create default metrics for testing
774 let mut metrics_map = FxHashMap::default();
775
776 // Add sample BTC-USDT metrics
777 let btc_usdt = InstrumentId::new("BTC-USDT", Venue::Binance);
778 metrics_map.insert(
779 btc_usdt.clone(),
780 Self::create_sample_metrics(btc_usdt, &clock),
781 );
782
783 // Add sample ETH-USDT metrics
784 let eth_usdt = InstrumentId::new("ETH-USDT", Venue::Binance);
785 metrics_map.insert(
786 eth_usdt.clone(),
787 Self::create_sample_metrics(eth_usdt, &clock),
788 );
789
790 Self {
791 instrument_metrics: Arc::new(RwLock::new(metrics_map)),
792 alerts: Arc::new(RwLock::new(FxHashMap::default())),
793 next_alert_id: AtomicU64::new(1),
794 collectors: Arc::new(RwLock::new(Vec::new())),
795 anomaly_detection_active: AtomicBool::new(false),
796 anomaly_config: Arc::new(RwLock::new(None)),
797 health_state: Arc::new(RwLock::new(HealthState::Healthy)),
798 connection_errors: AtomicU64::new(0),
799 clock,
800 stats: Arc::new(RwLock::new(TestStats::default())),
801 }
802 }
803
804 fn create_sample_metrics(instrument_id: InstrumentId, clock: &Clock) -> InstrumentMetrics {
805 use crate::feeder::FeedStats;
806
807 InstrumentMetrics {
808 instrument_id,
809 feed_stats: {
810 let mut stats = FeedStats::default();
811 stats.messages_processed = 10000;
812 stats.messages_per_second = 150.0;
813 stats.avg_process_latency_ns = 500_000; // 0.5ms
814 stats.max_process_latency_ns = 2_000_000; // 2ms
815 stats.p99_process_latency_ns = 1_500_000; // 1.5ms
816 stats.dropped_messages = 5;
817 stats.last_update_time = clock.raw();
818 stats.memory_usage_bytes = 1024 * 1024; // 1MB
819 stats
820 },
821 connection_stats: ConnectionStats {
822 messages_received: 10000,
823 messages_sent: 0,
824 reconnections: 0,
825 bytes_received: 1_000_000,
826 bytes_sent: 0,
827 last_message_time: clock.raw() - 100_000_000, // 100ms ago
828 last_ping_time: clock.raw() - 30_000_000_000, // 30s ago
829 last_pong_time: clock.raw() - 30_000_000_000, // 30s ago
830 avg_latency_ns: 1_000_000, // 1ms
831 connected_time: clock.raw() - 3_600_000_000_000, // 1 hour ago
832 connection_state: ConnectionState::Connected,
833 errors: 0,
834 },
835 order_flow_metrics: OrderFlowMetrics {
836 volume_imbalance: 0.15,
837 book_pressure: 1.2,
838 trade_intensity: 25.5,
839 avg_trade_size: Decimal::new(1500, 2), // 15.00
840 price_volatility: 0.02,
841 avg_spread_bps: 2.5,
842 max_spread_bps: 10.0,
843 market_impact_bps: 0.5,
844 },
845 latency_metrics: LatencyMetrics {
846 network_latency_ns: 800_000, // 0.8ms
847 parsing_latency_ns: 100_000, // 0.1ms
848 processing_latency_ns: 100_000, // 0.1ms
849 total_latency_ns: 1_000_000, // 1ms
850 latency_jitter_ns: 200_000, // 0.2ms
851 },
852 anomaly_score: 0.05,
853 health_score: 0.95,
854 last_updated: clock.raw(),
855 }
856 }
857
858 fn simulate_degraded_health(&self) {
859 *self.health_state.write() = HealthState::Degraded;
860 self.connection_errors.store(100, Ordering::SeqCst);
861 }
862
863 fn simulate_high_latency(&self, instrument_id: &InstrumentId) {
864 let mut metrics = self.instrument_metrics.write();
865 if let Some(m) = metrics.get_mut(instrument_id) {
866 m.latency_metrics.total_latency_ns = 10_000_000; // 10ms
867 m.latency_metrics.network_latency_ns = 8_000_000; // 8ms
868 m.health_score = 0.7;
869 m.anomaly_score = 0.8;
870 }
871 }
872 }
873
874 #[async_trait]
875 impl MonitorExt for TestMonitor {
876 async fn get_instrument_metrics(
877 &self,
878 instrument_id: &InstrumentId,
879 ) -> Result<InstrumentMetrics> {
880 self.instrument_metrics
881 .read()
882 .get(instrument_id)
883 .cloned()
884 .ok_or_else(|| anyhow::anyhow!("Instrument {} not found", instrument_id))
885 }
886
887 async fn get_aggregated_metrics(&self) -> Result<AggregatedMetrics> {
888 let metrics = self.instrument_metrics.read();
889 let stats = self.stats.read();
890
891 let total_messages: u64 = metrics
892 .values()
893 .map(|m| m.feed_stats.messages_processed)
894 .sum();
895
896 let total_dropped: u64 = metrics
897 .values()
898 .map(|m| m.feed_stats.dropped_messages)
899 .sum();
900
901 let avg_latency = if stats.latency_samples > 0 {
902 stats.total_latency_ns / stats.latency_samples
903 } else {
904 metrics
905 .values()
906 .map(|m| m.latency_metrics.total_latency_ns)
907 .sum::<u64>()
908 / metrics.len().max(1) as u64
909 };
910
911 let max_latency = metrics
912 .values()
913 .map(|m| m.latency_metrics.total_latency_ns)
914 .max()
915 .unwrap_or(0);
916
917 let p99_latency = metrics
918 .values()
919 .map(|m| m.feed_stats.p99_process_latency_ns)
920 .max()
921 .unwrap_or(0);
922
923 let overall_health =
924 metrics.values().map(|m| m.health_score).sum::<f64>() / metrics.len().max(1) as f64;
925
926 Ok(AggregatedMetrics {
927 total_instruments: metrics.len(),
928 total_messages_processed: total_messages,
929 total_dropped_messages: total_dropped,
930 avg_latency_ns: avg_latency,
931 max_latency_ns: max_latency,
932 p99_latency_ns: p99_latency,
933 total_memory_usage_bytes: metrics
934 .values()
935 .map(|m| m.feed_stats.memory_usage_bytes)
936 .sum(),
937 total_errors: self.connection_errors.load(Ordering::SeqCst) as u32,
938 overall_health_score: overall_health,
939 active_alerts: self.alerts.read().len() as u32,
940 timestamp: self.clock.raw(),
941 })
942 }
943
944 async fn register_metrics_collector(
945 &mut self,
946 collector: Box<dyn MetricsCollector>,
947 ) -> Result<()> {
948 let mut collectors = self.collectors.write();
949
950 // Check for duplicate names
951 let new_name = collector.name();
952 if collectors.iter().any(|c| c.name() == new_name) {
953 return Err(anyhow::anyhow!(
954 "Collector '{}' already registered",
955 new_name
956 ));
957 }
958
959 collectors.push(collector);
960 Ok(())
961 }
962
963 async fn enable_alert(&mut self, alert_config: AlertConfig) -> Result<AlertHandle> {
964 if !alert_config.enabled {
965 return Err(anyhow::anyhow!(
966 "Cannot enable a disabled alert configuration"
967 ));
968 }
969
970 if alert_config.threshold < 0.0
971 && !matches!(
972 alert_config.condition,
973 AlertCondition::ConnectionLoss { .. }
974 )
975 {
976 return Err(anyhow::anyhow!(
977 "Invalid threshold: {} (must be non-negative)",
978 alert_config.threshold
979 ));
980 }
981
982 let handle = AlertHandle(self.next_alert_id.fetch_add(1, Ordering::SeqCst));
983 self.alerts.write().insert(handle, alert_config);
984 Ok(handle)
985 }
986
987 async fn disable_alert(&mut self, handle: AlertHandle) -> Result<()> {
988 self.alerts
989 .write()
990 .remove(&handle)
991 .ok_or_else(|| anyhow::anyhow!("Alert handle {:?} not found", handle))
992 .map(|_| ())
993 }
994
995 async fn get_health_status(&self) -> Result<HealthStatus> {
996 let health_state = self.health_state.read().clone();
997 let metrics = self.instrument_metrics.read();
998 let errors = self.connection_errors.load(Ordering::SeqCst);
999
1000 let mut component_health = vec![];
1001 let mut active_issues = vec![];
1002
1003 // Check WebSocket health
1004 let ws_health = if errors == 0 {
1005 HealthState::Healthy
1006 } else if errors < 10 {
1007 HealthState::Degraded
1008 } else {
1009 HealthState::Critical
1010 };
1011
1012 component_health.push(ComponentHealth {
1013 component_name: "WebSocket Connection".to_string(),
1014 status: ws_health.clone(),
1015 last_heartbeat: self.clock.raw() - 1_000_000_000, // 1 second ago
1016 error_rate: errors as f64 / 1000.0,
1017 latency_p99_ns: 2_000_000, // 2ms
1018 });
1019
1020 // Add data parser health
1021 component_health.push(ComponentHealth {
1022 component_name: "Data Parser".to_string(),
1023 status: HealthState::Healthy,
1024 last_heartbeat: self.clock.raw() - 500_000_000, // 0.5 seconds ago
1025 error_rate: 0.001,
1026 latency_p99_ns: 100_000, // 0.1ms
1027 });
1028
1029 // Check for issues
1030 if matches!(ws_health, HealthState::Critical) {
1031 active_issues.push(HealthIssue {
1032 severity: IssueSeverity::Critical,
1033 component: "WebSocket Connection".to_string(),
1034 description: "High error rate detected".to_string(),
1035 first_seen: self.clock.raw() - 300_000_000_000, // 5 minutes ago
1036 last_seen: self.clock.raw(),
1037 count: errors as u32,
1038 });
1039 }
1040
1041 // Check for high latency issues
1042 for (instrument_id, metrics) in metrics.iter() {
1043 if metrics.latency_metrics.total_latency_ns > 5_000_000 {
1044 // 5ms
1045 active_issues.push(HealthIssue {
1046 severity: IssueSeverity::Warning,
1047 component: format!("{instrument_id} Feed"),
1048 description: format!(
1049 "High latency: {}ms",
1050 metrics.latency_metrics.total_latency_ns / 1_000_000
1051 ),
1052 first_seen: self.clock.raw() - 60_000_000_000, // 1 minute ago
1053 last_seen: self.clock.raw(),
1054 count: 1,
1055 });
1056 }
1057 }
1058
1059 let performance_grade = match health_state {
1060 HealthState::Healthy => 'A',
1061 HealthState::Degraded => 'B',
1062 HealthState::Critical => 'D',
1063 HealthState::Offline => 'F',
1064 };
1065
1066 Ok(HealthStatus {
1067 overall_status: health_state,
1068 component_health,
1069 active_issues,
1070 performance_grade,
1071 uptime_seconds: 3600, // 1 hour for testing
1072 last_check_time: self.clock.raw(),
1073 })
1074 }
1075
1076 async fn start_anomaly_detection(&mut self, config: AnomalyConfig) -> Result<()> {
1077 if self.anomaly_detection_active.load(Ordering::SeqCst) {
1078 return Err(anyhow::anyhow!("Anomaly detection is already running"));
1079 }
1080
1081 if config.instruments.is_empty() {
1082 return Err(anyhow::anyhow!(
1083 "No instruments specified for anomaly detection"
1084 ));
1085 }
1086
1087 if config.sensitivity < 0.0 || config.sensitivity > 1.0 {
1088 return Err(anyhow::anyhow!("Sensitivity must be between 0.0 and 1.0"));
1089 }
1090
1091 *self.anomaly_config.write() = Some(config);
1092 self.anomaly_detection_active.store(true, Ordering::SeqCst);
1093 Ok(())
1094 }
1095
1096 async fn stop_anomaly_detection(&mut self) -> Result<()> {
1097 if !self.anomaly_detection_active.load(Ordering::SeqCst) {
1098 return Err(anyhow::anyhow!("Anomaly detection is not running"));
1099 }
1100
1101 self.anomaly_detection_active.store(false, Ordering::SeqCst);
1102 *self.anomaly_config.write() = None;
1103 Ok(())
1104 }
1105
1106 async fn get_latency_distribution(
1107 &self,
1108 instrument_id: &InstrumentId,
1109 ) -> Result<LatencyDistribution> {
1110 let metrics = self.instrument_metrics.read();
1111 let instrument_metrics = metrics
1112 .get(instrument_id)
1113 .ok_or_else(|| anyhow::anyhow!("Instrument {} not found", instrument_id))?;
1114
1115 let base_latency = instrument_metrics.latency_metrics.total_latency_ns;
1116
1117 // Create realistic distribution based on base latency
1118 let histogram = vec![
1119 LatencyBucket {
1120 min_ns: 0,
1121 max_ns: base_latency / 2,
1122 count: 100,
1123 },
1124 LatencyBucket {
1125 min_ns: base_latency / 2,
1126 max_ns: base_latency,
1127 count: 500,
1128 },
1129 LatencyBucket {
1130 min_ns: base_latency,
1131 max_ns: base_latency * 2,
1132 count: 300,
1133 },
1134 LatencyBucket {
1135 min_ns: base_latency * 2,
1136 max_ns: base_latency * 5,
1137 count: 90,
1138 },
1139 LatencyBucket {
1140 min_ns: base_latency * 5,
1141 max_ns: base_latency * 10,
1142 count: 10,
1143 },
1144 ];
1145
1146 Ok(LatencyDistribution {
1147 p50_ns: base_latency,
1148 p90_ns: base_latency * 2,
1149 p95_ns: (base_latency as f64 * 2.5) as u64,
1150 p99_ns: base_latency * 5,
1151 p99_9_ns: base_latency * 8,
1152 max_ns: base_latency * 10,
1153 histogram,
1154 })
1155 }
1156
1157 async fn export_prometheus_metrics(&self) -> Result<String> {
1158 let aggregated = self.get_aggregated_metrics().await?;
1159 let metrics = self.instrument_metrics.read();
1160 let alerts = self.alerts.read();
1161
1162 let mut output = String::new();
1163
1164 // System-wide metrics
1165 output.push_str("# HELP hft_instruments_total Total number of monitored instruments\n");
1166 output.push_str("# TYPE hft_instruments_total gauge\n");
1167 output.push_str(&format!(
1168 "hft_instruments_total {}\n\n",
1169 aggregated.total_instruments
1170 ));
1171
1172 output.push_str("# HELP hft_messages_processed_total Total messages processed\n");
1173 output.push_str("# TYPE hft_messages_processed_total counter\n");
1174 output.push_str(&format!(
1175 "hft_messages_processed_total {}\n\n",
1176 aggregated.total_messages_processed
1177 ));
1178
1179 output.push_str("# HELP hft_messages_dropped_total Total messages dropped\n");
1180 output.push_str("# TYPE hft_messages_dropped_total counter\n");
1181 output.push_str(&format!(
1182 "hft_messages_dropped_total {}\n\n",
1183 aggregated.total_dropped_messages
1184 ));
1185
1186 output.push_str("# HELP hft_latency_nanoseconds End-to-end latency in nanoseconds\n");
1187 output.push_str("# TYPE hft_latency_nanoseconds histogram\n");
1188
1189 // Per-instrument metrics
1190 for (instrument_id, m) in metrics.iter() {
1191 let labels = format!(
1192 "exchange=\"{}\",instrument=\"{}\"",
1193 instrument_id.venue.to_str(),
1194 instrument_id.symbol
1195 );
1196
1197 // Latency histogram
1198 output.push_str(&format!(
1199 "hft_latency_nanoseconds_bucket{{{}le=\"1000000\"}} {}\n",
1200 labels,
1201 if m.latency_metrics.total_latency_ns <= 1_000_000 {
1202 1000
1203 } else {
1204 0
1205 }
1206 ));
1207 output.push_str(&format!(
1208 "hft_latency_nanoseconds_bucket{{{}le=\"5000000\"}} {}\n",
1209 labels,
1210 if m.latency_metrics.total_latency_ns <= 5_000_000 {
1211 1000
1212 } else {
1213 500
1214 }
1215 ));
1216 output.push_str(&format!(
1217 "hft_latency_nanoseconds_bucket{{{labels}le=\"+Inf\"}} 1000\n"
1218 ));
1219 output.push_str(&format!(
1220 "hft_latency_nanoseconds_sum{{{}}} {}\n",
1221 labels,
1222 m.latency_metrics.total_latency_ns * 1000 // Total across samples
1223 ));
1224 output.push_str(&format!(
1225 "hft_latency_nanoseconds_count{{{labels}}} 1000\n\n"
1226 ));
1227
1228 // Health score
1229 output.push_str(&format!(
1230 "hft_health_score{{{}}} {}\n",
1231 labels, m.health_score
1232 ));
1233
1234 // Anomaly score
1235 output.push_str(&format!(
1236 "hft_anomaly_score{{{}}} {}\n",
1237 labels, m.anomaly_score
1238 ));
1239 }
1240
1241 // Active alerts
1242 output.push_str("\n# HELP hft_alerts_active Number of active alerts\n");
1243 output.push_str("# TYPE hft_alerts_active gauge\n");
1244 output.push_str(&format!("hft_alerts_active {}\n", alerts.len()));
1245
1246 Ok(output)
1247 }
1248
1249 async fn reset_statistics(&mut self) -> Result<()> {
1250 // Reset stats
1251 *self.stats.write() = TestStats::default();
1252
1253 // Reset error counter
1254 self.connection_errors.store(0, Ordering::SeqCst);
1255
1256 // Reset metrics to baseline
1257 let mut metrics = self.instrument_metrics.write();
1258 for (instrument_id, metric) in metrics.iter_mut() {
1259 *metric = Self::create_sample_metrics(instrument_id.clone(), &self.clock);
1260 }
1261
1262 Ok(())
1263 }
1264
1265 async fn get_trading_metrics(
1266 &self,
1267 instrument_id: &InstrumentId,
1268 ) -> Result<TradingMetrics> {
1269 let metrics = self.instrument_metrics.read();
1270 let _ = metrics
1271 .get(instrument_id)
1272 .ok_or_else(|| anyhow::anyhow!("Instrument {} not found", instrument_id))?;
1273
1274 Ok(TradingMetrics {
1275 order_flow_imbalance: 0.25,
1276 vwap: Decimal::new(4523450, 2), // 45234.50
1277 twap: Decimal::new(4520000, 2), // 45200.00
1278 microstructure_signals: MicrostructureSignals {
1279 book_imbalance: 0.15,
1280 trade_sign_accuracy: 0.72,
1281 alpha_signal: 0.08,
1282 momentum: 0.35,
1283 mean_reversion: 0.20,
1284 },
1285 execution_quality: ExecutionQualityMetrics {
1286 slippage_bps: 1.5,
1287 market_impact_bps: 0.8,
1288 fill_rate: 0.95,
1289 avg_fill_time_ns: 2_500_000, // 2.5ms
1290 implementation_shortfall_bps: 2.3,
1291 },
1292 })
1293 }
1294 }
1295
1296 // Custom test metrics collector
1297 struct TestMetricsCollector {
1298 name: String,
1299 metrics_to_return: Vec<CustomMetric>,
1300 }
1301
1302 #[async_trait]
1303 impl MetricsCollector for TestMetricsCollector {
1304 async fn collect(&self) -> Result<Vec<CustomMetric>> {
1305 Ok(self.metrics_to_return.clone())
1306 }
1307
1308 fn name(&self) -> &str {
1309 &self.name
1310 }
1311 }
1312
1313 // Tests start here
1314
1315 #[tokio::test]
1316 async fn test_get_instrument_metrics_success() {
1317 let monitor = TestMonitor::new();
1318 let btc_usdt = InstrumentId::new("BTC-USDT", Venue::Binance);
1319
1320 let metrics = monitor.get_instrument_metrics(&btc_usdt).await.unwrap();
1321
1322 assert_eq!(metrics.instrument_id, btc_usdt);
1323 assert_eq!(metrics.feed_stats.messages_processed, 10000);
1324 assert_eq!(metrics.latency_metrics.total_latency_ns, 1_000_000);
1325 assert_eq!(metrics.order_flow_metrics.avg_spread_bps, 2.5);
1326 assert_eq!(metrics.health_score, 0.95);
1327 }
1328
1329 #[tokio::test]
1330 async fn test_get_instrument_metrics_not_found() {
1331 let monitor = TestMonitor::new();
1332 let unknown = InstrumentId::new("UNKNOWN-USDT", Venue::Binance);
1333
1334 let result = monitor.get_instrument_metrics(&unknown).await;
1335
1336 assert!(result.is_err());
1337 assert!(result.unwrap_err().to_string().contains("not found"));
1338 }
1339
1340 #[tokio::test]
1341 async fn test_export_prometheus_metrics() {
1342 let monitor = TestMonitor::new();
1343
1344 let prometheus_output = monitor.export_prometheus_metrics().await.unwrap();
1345
1346 // Check for required metric types
1347 assert!(prometheus_output.contains("# HELP hft_instruments_total"));
1348 assert!(prometheus_output.contains("# TYPE hft_instruments_total gauge"));
1349 assert!(prometheus_output.contains("hft_instruments_total 2"));
1350
1351 assert!(prometheus_output.contains("# HELP hft_messages_processed_total"));
1352 assert!(prometheus_output.contains("# TYPE hft_messages_processed_total counter"));
1353
1354 assert!(prometheus_output.contains("# HELP hft_latency_nanoseconds"));
1355 assert!(prometheus_output.contains("# TYPE hft_latency_nanoseconds histogram"));
1356
1357 // Check for instrument-specific metrics - fixed to match actual format
1358 assert!(prometheus_output.contains("exchange=\"Binance\",instrument=\"BTC-USDT\""));
1359 assert!(prometheus_output.contains("hft_latency_nanoseconds_bucket"));
1360 assert!(prometheus_output.contains("hft_health_score"));
1361 assert!(prometheus_output.contains("hft_anomaly_score"));
1362
1363 // Check for alerts metric
1364 assert!(prometheus_output.contains("# HELP hft_alerts_active"));
1365 assert!(prometheus_output.contains("hft_alerts_active 0"));
1366 }
1367
1368 #[tokio::test]
1369 async fn test_get_health_status_healthy() {
1370 let monitor = TestMonitor::new();
1371
1372 let health = monitor.get_health_status().await.unwrap();
1373
1374 assert_eq!(health.overall_status, HealthState::Healthy);
1375 assert_eq!(health.performance_grade, 'A');
1376 assert!(health.active_issues.is_empty());
1377 assert_eq!(health.component_health.len(), 2);
1378
1379 let ws_health = &health.component_health[0];
1380 assert_eq!(ws_health.component_name, "WebSocket Connection");
1381 assert_eq!(ws_health.status, HealthState::Healthy);
1382 assert_eq!(ws_health.error_rate, 0.0);
1383 }
1384
1385 #[tokio::test]
1386 async fn test_get_health_status_degraded() {
1387 let monitor = TestMonitor::new();
1388 monitor.simulate_degraded_health();
1389
1390 let health = monitor.get_health_status().await.unwrap();
1391
1392 assert_eq!(health.overall_status, HealthState::Degraded);
1393 assert_eq!(health.performance_grade, 'B');
1394 assert!(!health.active_issues.is_empty());
1395
1396 let issue = &health.active_issues[0];
1397 assert_eq!(issue.severity, IssueSeverity::Critical);
1398 assert!(issue.description.contains("High error rate"));
1399 }
1400
1401 #[tokio::test]
1402 async fn test_get_health_status_with_high_latency() {
1403 let monitor = TestMonitor::new();
1404 let btc_usdt = InstrumentId::new("BTC-USDT", Venue::Binance);
1405 monitor.simulate_high_latency(&btc_usdt);
1406
1407 let health = monitor.get_health_status().await.unwrap();
1408
1409 // Should have a latency warning
1410 let latency_issues: Vec<_> = health
1411 .active_issues
1412 .iter()
1413 .filter(|i| i.description.contains("High latency"))
1414 .collect();
1415
1416 assert_eq!(latency_issues.len(), 1);
1417 assert_eq!(latency_issues[0].severity, IssueSeverity::Warning);
1418 assert!(latency_issues[0].component.contains("BTC-USDT"));
1419 }
1420
1421 #[tokio::test]
1422 async fn test_enable_alert_success() {
1423 let mut monitor = TestMonitor::new();
1424
1425 let alert_config = AlertConfig {
1426 name: "Test Alert".to_string(),
1427 condition: AlertCondition::LatencyThreshold {
1428 instrument_id: None,
1429 },
1430 threshold: 5_000_000.0, // 5ms
1431 duration_ms: 1000,
1432 severity: IssueSeverity::Warning,
1433 enabled: true,
1434 };
1435
1436 let handle = monitor.enable_alert(alert_config.clone()).await.unwrap();
1437
1438 assert_eq!(handle.0, 1);
1439 assert_eq!(monitor.alerts.read().len(), 1);
1440 assert_eq!(
1441 monitor.alerts.read().get(&handle).unwrap().name,
1442 "Test Alert"
1443 );
1444 }
1445}