rusty_feeder/provider/
ext.rs

//! Extension traits and a unified wrapper for market-data providers.
//!
//! This module defines the `DataHandler`, `MonitorExt`, and `ProviderExt` traits together with
//! the `UnifiedProvider` enum, which adds convenience methods on top of the concrete provider
//! implementations so that they are easier to use in a unified way.
5
6use anyhow::Result;
7use async_trait::async_trait;
8use rusty_model::{
9    data::{book_snapshot::OrderBookSnapshot, market_trade::MarketTrade},
10    instruments::{Instrument, InstrumentId},
11};
12
13use super::connection::ConnectionStats;
14use crate::exchange::binance::futures::provider::{
15    BinanceFuturesMarketType, BinanceFuturesProvider,
16};
17
// Public sub-modules
19pub mod alerts;
20pub mod health;
21pub mod metrics;
22
23// Re-export all types from sub-modules for convenience
24pub use alerts::{AlertCondition, AlertConfig, AlertHandle, AnomalyAlgorithm, AnomalyConfig};
25pub use health::{ComponentHealth, HealthIssue, HealthState, HealthStatus, IssueSeverity};
26pub use metrics::{
27    AggregatedMetrics, CustomMetric, ExecutionQualityMetrics, InstrumentMetrics, LatencyBucket,
28    LatencyDistribution, LatencyMetrics, MetricsCollector, MicrostructureSignals, OrderFlowMetrics,
29    TradingMetrics,
30};
31
32/// Data handler trait for processing market data
33#[async_trait]
34pub trait DataHandler: Send + Sync {
35    /// Handle incoming market trade
36    async fn handle_market_trade(&self, trade: MarketTrade) -> Result<()>;
37
38    /// Handle incoming orderbook snapshot
39    async fn handle_orderbook_snapshot(&self, snapshot: OrderBookSnapshot) -> Result<()>;
40}
41
42/// Advanced monitoring capabilities for High-Frequency Trading (HFT) systems
43///
44/// The `MonitorExt` trait provides comprehensive monitoring, alerting, and observability features
45/// specifically designed for HFT systems that require nanosecond precision and real-time performance
46/// tracking. This trait extends basic monitoring with advanced analytics including:
47///
48/// - **Real-time Performance Metrics**: Track latency distributions, throughput, and system health
49/// - **Trading Analytics**: Monitor order flow, market impact, and microstructure signals
50/// - **Anomaly Detection**: Identify unusual patterns in trading data and system behavior
51/// - **Alerting System**: Configure alerts for critical conditions with customizable thresholds
52/// - **Health Monitoring**: Component-level health tracking with degradation detection
53/// - **Custom Metrics**: Extensible framework for domain-specific metric collection
54///
55/// # Design Philosophy
56///
57/// The trait is designed with HFT requirements in mind:
58/// - All timestamps use nanosecond precision
59/// - Zero-allocation operations where possible
60/// - Lock-free metrics collection for minimal overhead
61/// - Configurable aggregation intervals to balance accuracy vs performance
62///
63/// # Usage Example
64///
65/// ```rust,no_run
66/// use rusty_feeder::provider::ext::{MonitorExt, AlertConfig, AlertCondition, IssueSeverity};
67/// use rusty_model::instruments::InstrumentId;
68/// use rusty_model::venues::Venue;
69///
70/// async fn monitor_trading_system(monitor: &impl MonitorExt) -> anyhow::Result<()> {
71///     // Configure latency alert
72///     let alert_config = AlertConfig {
73///         name: "High Latency Alert".to_string(),
74///         condition: AlertCondition::LatencyThreshold { instrument_id: None },
75///         threshold: 1_000_000.0, // 1ms in nanoseconds
76///         duration_ms: 5000,      // Trigger after 5 seconds
77///         severity: IssueSeverity::Warning,
78///         enabled: true,
79///     };
80///
81///     let alert_handle = monitor.enable_alert(alert_config).await?;
82///
83///     // Get real-time metrics
84///     let btc_usdt = InstrumentId::new("BTC-USDT", Venue::Binance);
85///     let metrics = monitor.get_instrument_metrics(&btc_usdt).await?;
86///     println!("Current latency: {}ns", metrics.latency_metrics.total_latency_ns);
87///
88///     // Export for Prometheus
89///     let prometheus_data = monitor.export_prometheus_metrics().await?;
90///
91///     Ok(())
92/// }
93/// ```
94#[async_trait]
95pub trait MonitorExt {
96    /// Retrieves comprehensive real-time metrics for a specific trading instrument
97    ///
98    /// This method provides a complete snapshot of all monitoring data for a single instrument,
99    /// including connection health, data feed statistics, order flow metrics, and latency
100    /// measurements. All metrics use nanosecond precision timestamps.
101    ///
102    /// # Arguments
103    ///
104    /// * `instrument_id` - The unique identifier of the instrument to query
105    ///
106    /// # Returns
107    ///
108    /// * `Ok(InstrumentMetrics)` - Complete metrics snapshot for the instrument
109    /// * `Err` - If the instrument is not found or monitoring is not available
110    ///
111    /// # Example
112    ///
113    /// ```rust,no_run
114    /// # use rusty_model::instruments::InstrumentId;
115    /// # use rusty_model::venues::Venue;
116    /// # async fn example(monitor: &impl rusty_feeder::provider::ext::MonitorExt) -> anyhow::Result<()> {
117    /// let eth_usdt = InstrumentId::new("ETH-USDT", Venue::Binance);
118    /// let metrics = monitor.get_instrument_metrics(&eth_usdt).await?;
119    ///
120    /// // Check order flow imbalance
121    /// if metrics.order_flow_metrics.volume_imbalance.abs() > 0.7 {
122    ///     println!("High volume imbalance detected: {}", metrics.order_flow_metrics.volume_imbalance);
123    /// }
124    ///
125    /// // Monitor latency
126    /// println!("Network latency: {}µs", metrics.latency_metrics.network_latency_ns / 1000);
127    /// println!("Total latency: {}µs", metrics.latency_metrics.total_latency_ns / 1000);
128    /// # Ok(())
129    /// # }
130    /// ```
131    async fn get_instrument_metrics(
132        &self,
133        instrument_id: &InstrumentId,
134    ) -> Result<InstrumentMetrics>;
135
136    /// Retrieves aggregated metrics across all actively monitored instruments
137    ///
138    /// This method provides system-wide performance metrics, useful for monitoring overall
139    /// system health and identifying cross-instrument issues. Aggregated metrics include
140    /// total message throughput, average latencies, memory usage, and health scores.
141    ///
142    /// # Returns
143    ///
144    /// * `Ok(AggregatedMetrics)` - System-wide aggregated metrics
145    /// * `Err` - If monitoring subsystem is unavailable
146    ///
147    /// # Example
148    ///
149    /// ```rust,no_run
150    /// # async fn example(monitor: &impl rusty_feeder::provider::ext::MonitorExt) -> anyhow::Result<()> {
151    /// let system_metrics = monitor.get_aggregated_metrics().await?;
152    ///
153    /// // Check system health
154    /// if system_metrics.overall_health_score < 0.8 {
155    ///     println!("System health degraded: {:.2}", system_metrics.overall_health_score);
156    ///     println!("Active alerts: {}", system_metrics.active_alerts);
157    /// }
158    ///
159    /// // Monitor performance
160    /// println!("Processing {} messages/sec",
161    ///     system_metrics.total_messages_processed / (system_metrics.timestamp / 1_000_000_000));
162    /// println!("P99 latency: {}µs", system_metrics.p99_latency_ns / 1000);
163    /// # Ok(())
164    /// # }
165    /// ```
166    async fn get_aggregated_metrics(&self) -> Result<AggregatedMetrics>;
167
168    /// Registers a custom metrics collector for domain-specific monitoring
169    ///
170    /// This method allows extending the monitoring system with custom metrics collectors
171    /// that implement the `MetricsCollector` trait. Collectors are invoked periodically
172    /// and their metrics are included in aggregated reports and exports.
173    ///
174    /// # Arguments
175    ///
176    /// * `collector` - A boxed implementation of the `MetricsCollector` trait
177    ///
178    /// # Returns
179    ///
180    /// * `Ok(())` - If the collector was successfully registered
181    /// * `Err` - If registration failed (e.g., duplicate name)
182    ///
183    /// # Example
184    ///
185    /// ```rust,no_run
186    /// # use rusty_feeder::provider::ext::{MetricsCollector, CustomMetric};
187    /// # use async_trait::async_trait;
188    /// # use anyhow::Result;
189    /// # use rustc_hash::FxHashMap;
190    ///
191    /// struct SpreadMonitor;
192    ///
193    /// #[async_trait]
194    /// impl MetricsCollector for SpreadMonitor {
195    ///     async fn collect(&self) -> Result<Vec<CustomMetric>> {
196    ///         // Calculate custom spread metrics
197    ///         let mut tags = FxHashMap::default();
198    ///         tags.insert("exchange".to_string(), "binance".to_string());
199    ///
200    ///         Ok(vec![
201    ///             CustomMetric {
202    ///                 name: "custom_spread_bps".to_string(),
203    ///                 value: 2.5,
204    ///                 tags,
205    ///                 timestamp: quanta::Instant::now().as_nanos() as u64,
206    ///             }
207    ///         ])
208    ///     }
209    ///
210    ///     fn name(&self) -> &str {
211    ///         "spread_monitor"
212    ///     }
213    /// }
214    ///
215    /// # async fn example(monitor: &mut impl rusty_feeder::provider::ext::MonitorExt) -> Result<()> {
216    /// monitor.register_metrics_collector(Box::new(SpreadMonitor)).await?;
217    /// # Ok(())
218    /// # }
219    /// ```
220    async fn register_metrics_collector(
221        &mut self,
222        collector: Box<dyn MetricsCollector>,
223    ) -> Result<()>;
224
225    /// Enables a real-time alert for specific monitoring conditions
226    ///
227    /// This method configures and activates an alert that will trigger when the specified
228    /// condition is met for the configured duration. Alerts can monitor various conditions
229    /// including latency thresholds, error rates, connection status, and custom metrics.
230    ///
231    /// # Arguments
232    ///
233    /// * `alert_config` - Configuration specifying the alert condition, threshold, and behavior
234    ///
235    /// # Returns
236    ///
237    /// * `Ok(AlertHandle)` - A handle for managing the active alert
238    /// * `Err` - If the alert configuration is invalid
239    ///
240    /// # Example
241    ///
242    /// ```rust,no_run
243    /// # use rusty_feeder::provider::ext::{AlertConfig, AlertCondition, IssueSeverity};
244    /// # use rusty_model::instruments::InstrumentId;
245    /// # use rusty_model::venues::Venue;
246    /// # async fn example(monitor: &mut impl rusty_feeder::provider::ext::MonitorExt) -> anyhow::Result<()> {
247    /// // Alert for high latency on specific instrument
248    /// let btc_instrument = InstrumentId::new("BTC-USDT", Venue::Binance);
249    /// let latency_alert = AlertConfig {
250    ///     name: "BTC-USDT High Latency".to_string(),
251    ///     condition: AlertCondition::LatencyThreshold {
252    ///         instrument_id: Some(btc_instrument),
253    ///     },
254    ///     threshold: 5_000_000.0,  // 5ms in nanoseconds
255    ///     duration_ms: 10_000,     // Sustained for 10 seconds
256    ///     severity: IssueSeverity::Error,
257    ///     enabled: true,
258    /// };
259    ///
260    /// let alert_handle = monitor.enable_alert(latency_alert).await?;
261    ///
262    /// // Alert for connection loss
263    /// let connection_alert = AlertConfig {
264    ///     name: "Binance Connection Lost".to_string(),
265    ///     condition: AlertCondition::ConnectionLoss {
266    ///         exchange: "binance".to_string(),
267    ///     },
268    ///     threshold: 1.0,          // Binary condition
269    ///     duration_ms: 0,          // Immediate
270    ///     severity: IssueSeverity::Critical,
271    ///     enabled: true,
272    /// };
273    ///
274    /// let conn_handle = monitor.enable_alert(connection_alert).await?;
275    /// # Ok(())
276    /// # }
277    /// ```
278    async fn enable_alert(&mut self, alert_config: AlertConfig) -> Result<AlertHandle>;
279
280    /// Disables an active alert using its handle
281    ///
282    /// This method deactivates a previously configured alert. The alert will stop
283    /// evaluating its condition and no new notifications will be generated.
284    ///
285    /// # Arguments
286    ///
287    /// * `handle` - The handle returned when the alert was enabled
288    ///
289    /// # Returns
290    ///
291    /// * `Ok(())` - If the alert was successfully disabled
292    /// * `Err` - If the handle is invalid or the alert was already disabled
293    ///
294    /// # Example
295    ///
296    /// ```rust,no_run
297    /// # use rusty_feeder::provider::ext::AlertHandle;
298    /// # async fn example(monitor: &mut impl rusty_feeder::provider::ext::MonitorExt, alert_handle: AlertHandle) -> anyhow::Result<()> {
299    /// // Disable an alert after market close
300    /// monitor.disable_alert(alert_handle).await?;
301    /// # Ok(())
302    /// # }
303    /// ```
304    async fn disable_alert(&mut self, handle: AlertHandle) -> Result<()>;
305
306    /// Retrieves the current health status with detailed component diagnostics
307    ///
308    /// This method provides a comprehensive health check of the monitoring system and
309    /// all monitored components. It includes overall system status, individual component
310    /// health, active issues, performance grading, and uptime information.
311    ///
312    /// # Returns
313    ///
314    /// * `Ok(HealthStatus)` - Detailed health information
315    /// * `Err` - If health check fails
316    ///
317    /// # Example
318    ///
319    /// ```rust,no_run
320    /// # use rusty_feeder::provider::ext::HealthState;
321    /// # async fn example(monitor: &impl rusty_feeder::provider::ext::MonitorExt) -> anyhow::Result<()> {
322    /// let health = monitor.get_health_status().await?;
323    ///
324    /// match health.overall_status {
325    ///     HealthState::Healthy => println!("System healthy - Grade: {}", health.performance_grade),
326    ///     HealthState::Degraded => {
327    ///         println!("System degraded - Active issues: {}", health.active_issues.len());
328    ///         for issue in &health.active_issues {
329    ///             println!("  - {}: {}", issue.component, issue.description);
330    ///         }
331    ///     }
332    ///     HealthState::Critical => println!("CRITICAL: System requires immediate attention"),
333    ///     HealthState::Offline => println!("System offline"),
334    /// }
335    ///
336    /// // Check component health
337    /// for component in &health.component_health {
338    ///     if component.error_rate > 0.01 {
339    ///         println!("{} has high error rate: {:.2}%",
340    ///             component.component_name, component.error_rate * 100.0);
341    ///     }
342    /// }
343    /// # Ok(())
344    /// # }
345    /// ```
346    async fn get_health_status(&self) -> Result<HealthStatus>;
347
348    /// Starts anomaly detection for trading patterns and system behavior
349    ///
350    /// This method activates anomaly detection algorithms that monitor for unusual
351    /// patterns in trading data, latency spikes, volume anomalies, and other
352    /// deviations from normal behavior. Multiple algorithms can run concurrently.
353    ///
354    /// # Arguments
355    ///
356    /// * `config` - Configuration specifying instruments, algorithms, and sensitivity
357    ///
358    /// # Returns
359    ///
360    /// * `Ok(())` - If anomaly detection was successfully started
361    /// * `Err` - If configuration is invalid or detection is already running
362    ///
363    /// # Example
364    ///
365    /// ```rust,no_run
366    /// # use rusty_feeder::provider::ext::{AnomalyConfig, AnomalyAlgorithm};
367    /// # use rusty_model::instruments::InstrumentId;
368    /// # use rusty_model::venues::Venue;
369    /// # async fn example(monitor: &mut impl rusty_feeder::provider::ext::MonitorExt) -> anyhow::Result<()> {
370    /// let config = AnomalyConfig {
371    ///     instruments: vec![
372    ///         InstrumentId::new("BTC-USDT", Venue::Binance),
373    ///         InstrumentId::new("ETH-USDT", Venue::Binance),
374    ///     ],
375    ///     sensitivity: 0.8,  // High sensitivity
376    ///     window_size_ms: 60_000,  // 1 minute windows
377    ///     algorithms: vec![
378    ///         AnomalyAlgorithm::StatisticalOutlier {
379    ///             threshold_std_dev: 3.0,
380    ///         },
381    ///         AnomalyAlgorithm::VolumeAnomaly {
382    ///             volume_threshold: 5.0,  // 5x normal volume
383    ///         },
384    ///         AnomalyAlgorithm::LatencyAnomaly {
385    ///             latency_threshold_ns: 10_000_000,  // 10ms
386    ///         },
387    ///     ],
388    /// };
389    ///
390    /// monitor.start_anomaly_detection(config).await?;
391    /// # Ok(())
392    /// # }
393    /// ```
394    async fn start_anomaly_detection(&mut self, config: AnomalyConfig) -> Result<()>;
395
396    /// Stops all active anomaly detection algorithms
397    ///
398    /// This method deactivates all running anomaly detection algorithms and clears
399    /// any accumulated detection state.
400    ///
401    /// # Returns
402    ///
403    /// * `Ok(())` - If anomaly detection was successfully stopped
404    /// * `Err` - If no detection was running
405    ///
406    /// # Example
407    ///
408    /// ```rust,no_run
409    /// # async fn example(monitor: &mut impl rusty_feeder::provider::ext::MonitorExt) -> anyhow::Result<()> {
410    /// // Stop detection during maintenance window
411    /// monitor.stop_anomaly_detection().await?;
412    /// # Ok(())
413    /// # }
414    /// ```
415    async fn stop_anomaly_detection(&mut self) -> Result<()>;
416
417    /// Retrieves detailed latency distribution statistics for an instrument
418    ///
419    /// This method provides a comprehensive breakdown of latency measurements including
420    /// percentiles (p50, p90, p95, p99, p99.9), maximum values, and histogram buckets.
421    /// Useful for identifying latency patterns and optimizing performance.
422    ///
423    /// # Arguments
424    ///
425    /// * `instrument_id` - The instrument to analyze
426    ///
427    /// # Returns
428    ///
429    /// * `Ok(LatencyDistribution)` - Detailed latency statistics
430    /// * `Err` - If instrument is not found or has insufficient data
431    ///
432    /// # Example
433    ///
434    /// ```rust,no_run
435    /// # use rusty_model::instruments::InstrumentId;
436    /// # use rusty_model::venues::Venue;
437    /// # async fn example(monitor: &impl rusty_feeder::provider::ext::MonitorExt) -> anyhow::Result<()> {
438    /// let btc_usdt = InstrumentId::new("BTC-USDT", Venue::Binance);
439    /// let latency = monitor.get_latency_distribution(&btc_usdt).await?;
440    ///
441    /// println!("Latency percentiles for BTC-USDT:");
442    /// println!("  P50: {}µs", latency.p50_ns / 1000);
443    /// println!("  P90: {}µs", latency.p90_ns / 1000);
444    /// println!("  P95: {}µs", latency.p95_ns / 1000);
445    /// println!("  P99: {}µs", latency.p99_ns / 1000);
446    /// println!("  P99.9: {}µs", latency.p99_9_ns / 1000);
447    /// println!("  Max: {}µs", latency.max_ns / 1000);
448    ///
449    /// // Analyze histogram
450    /// for bucket in &latency.histogram {
451    ///     println!("  {}µs-{}µs: {} samples",
452    ///         bucket.min_ns / 1000, bucket.max_ns / 1000, bucket.count);
453    /// }
454    /// # Ok(())
455    /// # }
456    /// ```
457    async fn get_latency_distribution(
458        &self,
459        instrument_id: &InstrumentId,
460    ) -> Result<LatencyDistribution>;
461
462    /// Exports all metrics in Prometheus exposition format
463    ///
464    /// This method generates a text representation of all metrics following the
465    /// Prometheus exposition format. The output can be scraped by Prometheus or
466    /// compatible monitoring systems. Includes all standard metrics, custom metrics,
467    /// and alert states.
468    ///
469    /// # Returns
470    ///
471    /// * `Ok(String)` - Prometheus-formatted metrics text
472    /// * `Err` - If export fails
473    ///
474    /// # Output Format
475    ///
476    /// ```text
477    /// # HELP hft_latency_nanoseconds End-to-end latency in nanoseconds
478    /// # TYPE hft_latency_nanoseconds histogram
479    /// hft_latency_nanoseconds_bucket{exchange="binance",instrument="BTC-USDT",le="1000"} 245
480    /// hft_latency_nanoseconds_bucket{exchange="binance",instrument="BTC-USDT",le="5000"} 1823
481    /// hft_latency_nanoseconds_sum{exchange="binance",instrument="BTC-USDT"} 4523000
482    /// hft_latency_nanoseconds_count{exchange="binance",instrument="BTC-USDT"} 2068
483    ///
484    /// # HELP hft_messages_total Total messages processed
485    /// # TYPE hft_messages_total counter
486    /// hft_messages_total{exchange="binance",type="trade"} 150234
487    /// hft_messages_total{exchange="binance",type="orderbook"} 892341
488    /// ```
489    ///
490    /// # Example
491    ///
492    /// ```rust,no_run
493    /// # async fn example(monitor: &impl rusty_feeder::provider::ext::MonitorExt) -> anyhow::Result<()> {
494    /// let prometheus_data = monitor.export_prometheus_metrics().await?;
495    ///
496    /// // Write to file for Prometheus to scrape
497    /// std::fs::write("/var/lib/prometheus/hft_metrics.prom", prometheus_data)?;
498    ///
499    /// // Or serve via HTTP endpoint
500    /// // warp::reply::with_header(prometheus_data, "Content-Type", "text/plain; version=0.0.4")
501    /// # Ok(())
502    /// # }
503    /// ```
504    async fn export_prometheus_metrics(&self) -> Result<String>;
505
506    /// Resets all monitoring statistics to their initial state
507    ///
508    /// This method clears all accumulated metrics, counters, and statistics while
509    /// preserving configuration (alerts, collectors, etc.). Useful for starting
510    /// fresh measurements or after system maintenance.
511    ///
512    /// # Returns
513    ///
514    /// * `Ok(())` - If statistics were successfully reset
515    /// * `Err` - If reset fails
516    ///
517    /// # Example
518    ///
519    /// ```rust,no_run
520    /// # async fn example(monitor: &mut impl rusty_feeder::provider::ext::MonitorExt) -> anyhow::Result<()> {
521    /// // Reset stats at market open
522    /// monitor.reset_statistics().await?;
523    /// println!("Statistics reset for new trading session");
524    /// # Ok(())
525    /// # }
526    /// ```
527    async fn reset_statistics(&mut self) -> Result<()>;
528
529    /// Retrieves advanced trading-specific metrics for an instrument
530    ///
531    /// This method provides sophisticated trading analytics including order flow
532    /// imbalance, VWAP/TWAP calculations, microstructure signals, and execution
533    /// quality metrics. These metrics are essential for HFT strategy development
534    /// and performance analysis.
535    ///
536    /// # Arguments
537    ///
538    /// * `instrument_id` - The instrument to analyze
539    ///
540    /// # Returns
541    ///
542    /// * `Ok(TradingMetrics)` - Comprehensive trading analytics
543    /// * `Err` - If instrument is not found or has insufficient data
544    ///
545    /// # Example
546    ///
547    /// ```rust,no_run
548    /// # use rusty_model::instruments::InstrumentId;
549    /// # use rusty_model::venues::Venue;
550    /// # async fn example(monitor: &impl rusty_feeder::provider::ext::MonitorExt) -> anyhow::Result<()> {
551    /// let eth_usdt = InstrumentId::new("ETH-USDT", Venue::Binance);
552    /// let trading_metrics = monitor.get_trading_metrics(&eth_usdt).await?;
553    ///
554    /// // Check microstructure signals
555    /// let signals = &trading_metrics.microstructure_signals;
556    /// if signals.book_imbalance > 0.7 && signals.momentum > 0.5 {
557    ///     println!("Strong bullish signal detected");
558    ///     println!("  Book imbalance: {:.3}", signals.book_imbalance);
559    ///     println!("  Momentum: {:.3}", signals.momentum);
560    ///     println!("  Alpha signal: {:.3}", signals.alpha_signal);
561    /// }
562    ///
563    /// // Analyze execution quality
564    /// let exec_quality = &trading_metrics.execution_quality;
565    /// println!("Execution metrics:");
566    /// println!("  Slippage: {:.2} bps", exec_quality.slippage_bps);
567    /// println!("  Market impact: {:.2} bps", exec_quality.market_impact_bps);
568    /// println!("  Fill rate: {:.1}%", exec_quality.fill_rate * 100.0);
569    ///
570    /// // Price analysis
571    /// println!("VWAP: {}", trading_metrics.vwap);
572    /// println!("TWAP: {}", trading_metrics.twap);
573    /// # Ok(())
574    /// # }
575    /// ```
576    async fn get_trading_metrics(&self, instrument_id: &InstrumentId) -> Result<TradingMetrics>;
577}
578
579/// Extension trait for new provider implementations
580#[async_trait]
581pub trait ProviderExt {
582    /// Connect to the exchange
583    async fn connect(&mut self) -> Result<()>;
584
585    /// Disconnect from the exchange
586    async fn disconnect(&mut self) -> Result<()>;
587
588    /// Subscribe to orderbook updates for an instrument
589    async fn subscribe_orderbook(
590        &mut self,
591        instrument_id: &InstrumentId,
592        depth: usize,
593    ) -> Result<()>;
594
595    /// Subscribe to trade updates for an instrument
596    async fn subscribe_trades(&mut self, instrument_id: &InstrumentId) -> Result<()>;
597
598    /// Unsubscribe from orderbook updates
599    async fn unsubscribe_orderbook(&mut self, instrument_id: &InstrumentId) -> Result<()>;
600
601    /// Unsubscribe from trade updates
602    async fn unsubscribe_trades(&mut self, instrument_id: &InstrumentId) -> Result<()>;
603
604    /// Check if connected
605    fn is_connected(&self) -> bool;
606
607    /// Get connection statistics
608    fn connection_stats(&self) -> ConnectionStats;
609
610    /// Run the provider with a data handler
611    async fn run_with_handler(&mut self, handler: Box<dyn DataHandler + Send + Sync>)
612    -> Result<()>;
613
614    /// Fetch available instruments
615    async fn fetch_instruments(&self) -> Result<Vec<Box<dyn Instrument>>>;
616}
617
618// Note: Since the new provider system doesn't follow the old Provider trait,
619// we need manual implementations for each provider type.
620// This is actually better for the refactoring - we can have explicit control.
621
622/// Unified provider interface for the monitor system
623pub enum UnifiedProvider {
624    /// Binance futures market provider
625    BinanceFutures(Box<BinanceFuturesProvider>),
626    // Add other providers as they're migrated to the new system
627}
628
629impl UnifiedProvider {
630    /// Create a new provider based on exchange name
631    pub fn new(exchange_name: &str) -> Result<Self> {
632        match exchange_name.to_lowercase().as_str() {
633            "binance" | "binance_futures" => {
634                use crate::ConnectionConfig;
635                let provider = BinanceFuturesProvider::new(
636                    ConnectionConfig::default(),
637                    BinanceFuturesMarketType::UsdM,
638                );
639                Ok(Self::BinanceFutures(Box::new(provider)))
640            }
641            _ => Err(anyhow::anyhow!("Unsupported exchange: {}", exchange_name)),
642        }
643    }
644
645    /// Get the exchange name
646    pub fn name(&self) -> &str {
647        match self {
648            Self::BinanceFutures(p) => p.name(),
649        }
650    }
651}
652
653#[async_trait]
654impl ProviderExt for UnifiedProvider {
655    async fn connect(&mut self) -> Result<()> {
656        match self {
657            Self::BinanceFutures(p) => p.connect().await,
658        }
659    }
660
661    async fn disconnect(&mut self) -> Result<()> {
662        match self {
663            Self::BinanceFutures(p) => p.disconnect().await,
664        }
665    }
666
667    async fn subscribe_orderbook(
668        &mut self,
669        instrument_id: &InstrumentId,
670        depth: usize,
671    ) -> Result<()> {
672        match self {
673            Self::BinanceFutures(p) => p.subscribe_orderbook(instrument_id, depth).await,
674        }
675    }
676
677    async fn subscribe_trades(&mut self, instrument_id: &InstrumentId) -> Result<()> {
678        match self {
679            Self::BinanceFutures(p) => p.subscribe_trades(instrument_id).await,
680        }
681    }
682
683    async fn unsubscribe_orderbook(&mut self, instrument_id: &InstrumentId) -> Result<()> {
684        match self {
685            Self::BinanceFutures(p) => p.unsubscribe_orderbook(instrument_id).await,
686        }
687    }
688
689    async fn unsubscribe_trades(&mut self, instrument_id: &InstrumentId) -> Result<()> {
690        match self {
691            Self::BinanceFutures(p) => p.unsubscribe_trades(instrument_id).await,
692        }
693    }
694
695    fn is_connected(&self) -> bool {
696        match self {
697            Self::BinanceFutures(p) => p.is_connected(),
698        }
699    }
700
701    fn connection_stats(&self) -> ConnectionStats {
702        match self {
703            Self::BinanceFutures(p) => p.connection_stats(),
704        }
705    }
706
707    async fn run_with_handler(
708        &mut self,
709        handler: Box<dyn DataHandler + Send + Sync>,
710    ) -> Result<()> {
711        match self {
712            Self::BinanceFutures(p) => p.run_with_handler(handler).await,
713        }
714    }
715
716    async fn fetch_instruments(&self) -> Result<Vec<Box<dyn Instrument>>> {
717        match self {
718            Self::BinanceFutures(p) => p.fetch_instruments().await,
719        }
720    }
721}
722
723#[cfg(test)]
724mod tests {
725    use super::*;
726    use crate::provider::connection::ConnectionState;
727    use anyhow::Result;
728    use async_trait::async_trait;
729    use parking_lot::RwLock;
730    use quanta::Clock;
731    use rust_decimal::Decimal;
732    use rustc_hash::FxHashMap;
733    use rusty_model::venues::Venue;
734    use std::sync::Arc;
735    use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
736
737    /// Test implementation of MonitorExt for unit testing
738    struct TestMonitor {
739        /// Simulated metrics data for testing
740        instrument_metrics: Arc<RwLock<FxHashMap<InstrumentId, InstrumentMetrics>>>,
741        /// Track enabled alerts
742        alerts: Arc<RwLock<FxHashMap<AlertHandle, AlertConfig>>>,
743        /// Next alert handle ID
744        next_alert_id: AtomicU64,
745        /// Track registered metrics collectors
746        collectors: Arc<RwLock<Vec<Box<dyn MetricsCollector>>>>,
747        /// Track if anomaly detection is active
748        anomaly_detection_active: AtomicBool,
749        /// Track anomaly config
750        anomaly_config: Arc<RwLock<Option<AnomalyConfig>>>,
751        /// Health state for testing
752        health_state: Arc<RwLock<HealthState>>,
753        /// Connection errors for testing
754        connection_errors: AtomicU64,
755        /// Clock for consistent time
756        clock: Clock,
757        /// Simulated stats that can be reset
758        stats: Arc<RwLock<TestStats>>,
759    }
760
761    #[derive(Default)]
762    struct TestStats {
763        messages_received: u64,
764        messages_dropped: u64,
765        total_latency_ns: u64,
766        latency_samples: u64,
767    }
768
769    impl TestMonitor {
770        fn new() -> Self {
771            let clock = Clock::new();
772
773            // Create default metrics for testing
774            let mut metrics_map = FxHashMap::default();
775
776            // Add sample BTC-USDT metrics
777            let btc_usdt = InstrumentId::new("BTC-USDT", Venue::Binance);
778            metrics_map.insert(
779                btc_usdt.clone(),
780                Self::create_sample_metrics(btc_usdt, &clock),
781            );
782
783            // Add sample ETH-USDT metrics
784            let eth_usdt = InstrumentId::new("ETH-USDT", Venue::Binance);
785            metrics_map.insert(
786                eth_usdt.clone(),
787                Self::create_sample_metrics(eth_usdt, &clock),
788            );
789
790            Self {
791                instrument_metrics: Arc::new(RwLock::new(metrics_map)),
792                alerts: Arc::new(RwLock::new(FxHashMap::default())),
793                next_alert_id: AtomicU64::new(1),
794                collectors: Arc::new(RwLock::new(Vec::new())),
795                anomaly_detection_active: AtomicBool::new(false),
796                anomaly_config: Arc::new(RwLock::new(None)),
797                health_state: Arc::new(RwLock::new(HealthState::Healthy)),
798                connection_errors: AtomicU64::new(0),
799                clock,
800                stats: Arc::new(RwLock::new(TestStats::default())),
801            }
802        }
803
804        fn create_sample_metrics(instrument_id: InstrumentId, clock: &Clock) -> InstrumentMetrics {
805            use crate::feeder::FeedStats;
806
807            InstrumentMetrics {
808                instrument_id,
809                feed_stats: {
810                    let mut stats = FeedStats::default();
811                    stats.messages_processed = 10000;
812                    stats.messages_per_second = 150.0;
813                    stats.avg_process_latency_ns = 500_000; // 0.5ms
814                    stats.max_process_latency_ns = 2_000_000; // 2ms
815                    stats.p99_process_latency_ns = 1_500_000; // 1.5ms
816                    stats.dropped_messages = 5;
817                    stats.last_update_time = clock.raw();
818                    stats.memory_usage_bytes = 1024 * 1024; // 1MB
819                    stats
820                },
821                connection_stats: ConnectionStats {
822                    messages_received: 10000,
823                    messages_sent: 0,
824                    reconnections: 0,
825                    bytes_received: 1_000_000,
826                    bytes_sent: 0,
827                    last_message_time: clock.raw() - 100_000_000, // 100ms ago
828                    last_ping_time: clock.raw() - 30_000_000_000, // 30s ago
829                    last_pong_time: clock.raw() - 30_000_000_000, // 30s ago
830                    avg_latency_ns: 1_000_000,                    // 1ms
831                    connected_time: clock.raw() - 3_600_000_000_000, // 1 hour ago
832                    connection_state: ConnectionState::Connected,
833                    errors: 0,
834                },
835                order_flow_metrics: OrderFlowMetrics {
836                    volume_imbalance: 0.15,
837                    book_pressure: 1.2,
838                    trade_intensity: 25.5,
839                    avg_trade_size: Decimal::new(1500, 2), // 15.00
840                    price_volatility: 0.02,
841                    avg_spread_bps: 2.5,
842                    max_spread_bps: 10.0,
843                    market_impact_bps: 0.5,
844                },
845                latency_metrics: LatencyMetrics {
846                    network_latency_ns: 800_000,    // 0.8ms
847                    parsing_latency_ns: 100_000,    // 0.1ms
848                    processing_latency_ns: 100_000, // 0.1ms
849                    total_latency_ns: 1_000_000,    // 1ms
850                    latency_jitter_ns: 200_000,     // 0.2ms
851                },
852                anomaly_score: 0.05,
853                health_score: 0.95,
854                last_updated: clock.raw(),
855            }
856        }
857
858        fn simulate_degraded_health(&self) {
859            *self.health_state.write() = HealthState::Degraded;
860            self.connection_errors.store(100, Ordering::SeqCst);
861        }
862
863        fn simulate_high_latency(&self, instrument_id: &InstrumentId) {
864            let mut metrics = self.instrument_metrics.write();
865            if let Some(m) = metrics.get_mut(instrument_id) {
866                m.latency_metrics.total_latency_ns = 10_000_000; // 10ms
867                m.latency_metrics.network_latency_ns = 8_000_000; // 8ms
868                m.health_score = 0.7;
869                m.anomaly_score = 0.8;
870            }
871        }
872    }
873
874    #[async_trait]
875    impl MonitorExt for TestMonitor {
876        async fn get_instrument_metrics(
877            &self,
878            instrument_id: &InstrumentId,
879        ) -> Result<InstrumentMetrics> {
880            self.instrument_metrics
881                .read()
882                .get(instrument_id)
883                .cloned()
884                .ok_or_else(|| anyhow::anyhow!("Instrument {} not found", instrument_id))
885        }
886
887        async fn get_aggregated_metrics(&self) -> Result<AggregatedMetrics> {
888            let metrics = self.instrument_metrics.read();
889            let stats = self.stats.read();
890
891            let total_messages: u64 = metrics
892                .values()
893                .map(|m| m.feed_stats.messages_processed)
894                .sum();
895
896            let total_dropped: u64 = metrics
897                .values()
898                .map(|m| m.feed_stats.dropped_messages)
899                .sum();
900
901            let avg_latency = if stats.latency_samples > 0 {
902                stats.total_latency_ns / stats.latency_samples
903            } else {
904                metrics
905                    .values()
906                    .map(|m| m.latency_metrics.total_latency_ns)
907                    .sum::<u64>()
908                    / metrics.len().max(1) as u64
909            };
910
911            let max_latency = metrics
912                .values()
913                .map(|m| m.latency_metrics.total_latency_ns)
914                .max()
915                .unwrap_or(0);
916
917            let p99_latency = metrics
918                .values()
919                .map(|m| m.feed_stats.p99_process_latency_ns)
920                .max()
921                .unwrap_or(0);
922
923            let overall_health =
924                metrics.values().map(|m| m.health_score).sum::<f64>() / metrics.len().max(1) as f64;
925
926            Ok(AggregatedMetrics {
927                total_instruments: metrics.len(),
928                total_messages_processed: total_messages,
929                total_dropped_messages: total_dropped,
930                avg_latency_ns: avg_latency,
931                max_latency_ns: max_latency,
932                p99_latency_ns: p99_latency,
933                total_memory_usage_bytes: metrics
934                    .values()
935                    .map(|m| m.feed_stats.memory_usage_bytes)
936                    .sum(),
937                total_errors: self.connection_errors.load(Ordering::SeqCst) as u32,
938                overall_health_score: overall_health,
939                active_alerts: self.alerts.read().len() as u32,
940                timestamp: self.clock.raw(),
941            })
942        }
943
944        async fn register_metrics_collector(
945            &mut self,
946            collector: Box<dyn MetricsCollector>,
947        ) -> Result<()> {
948            let mut collectors = self.collectors.write();
949
950            // Check for duplicate names
951            let new_name = collector.name();
952            if collectors.iter().any(|c| c.name() == new_name) {
953                return Err(anyhow::anyhow!(
954                    "Collector '{}' already registered",
955                    new_name
956                ));
957            }
958
959            collectors.push(collector);
960            Ok(())
961        }
962
963        async fn enable_alert(&mut self, alert_config: AlertConfig) -> Result<AlertHandle> {
964            if !alert_config.enabled {
965                return Err(anyhow::anyhow!(
966                    "Cannot enable a disabled alert configuration"
967                ));
968            }
969
970            if alert_config.threshold < 0.0
971                && !matches!(
972                    alert_config.condition,
973                    AlertCondition::ConnectionLoss { .. }
974                )
975            {
976                return Err(anyhow::anyhow!(
977                    "Invalid threshold: {} (must be non-negative)",
978                    alert_config.threshold
979                ));
980            }
981
982            let handle = AlertHandle(self.next_alert_id.fetch_add(1, Ordering::SeqCst));
983            self.alerts.write().insert(handle, alert_config);
984            Ok(handle)
985        }
986
987        async fn disable_alert(&mut self, handle: AlertHandle) -> Result<()> {
988            self.alerts
989                .write()
990                .remove(&handle)
991                .ok_or_else(|| anyhow::anyhow!("Alert handle {:?} not found", handle))
992                .map(|_| ())
993        }
994
995        async fn get_health_status(&self) -> Result<HealthStatus> {
996            let health_state = self.health_state.read().clone();
997            let metrics = self.instrument_metrics.read();
998            let errors = self.connection_errors.load(Ordering::SeqCst);
999
1000            let mut component_health = vec![];
1001            let mut active_issues = vec![];
1002
1003            // Check WebSocket health
1004            let ws_health = if errors == 0 {
1005                HealthState::Healthy
1006            } else if errors < 10 {
1007                HealthState::Degraded
1008            } else {
1009                HealthState::Critical
1010            };
1011
1012            component_health.push(ComponentHealth {
1013                component_name: "WebSocket Connection".to_string(),
1014                status: ws_health.clone(),
1015                last_heartbeat: self.clock.raw() - 1_000_000_000, // 1 second ago
1016                error_rate: errors as f64 / 1000.0,
1017                latency_p99_ns: 2_000_000, // 2ms
1018            });
1019
1020            // Add data parser health
1021            component_health.push(ComponentHealth {
1022                component_name: "Data Parser".to_string(),
1023                status: HealthState::Healthy,
1024                last_heartbeat: self.clock.raw() - 500_000_000, // 0.5 seconds ago
1025                error_rate: 0.001,
1026                latency_p99_ns: 100_000, // 0.1ms
1027            });
1028
1029            // Check for issues
1030            if matches!(ws_health, HealthState::Critical) {
1031                active_issues.push(HealthIssue {
1032                    severity: IssueSeverity::Critical,
1033                    component: "WebSocket Connection".to_string(),
1034                    description: "High error rate detected".to_string(),
1035                    first_seen: self.clock.raw() - 300_000_000_000, // 5 minutes ago
1036                    last_seen: self.clock.raw(),
1037                    count: errors as u32,
1038                });
1039            }
1040
1041            // Check for high latency issues
1042            for (instrument_id, metrics) in metrics.iter() {
1043                if metrics.latency_metrics.total_latency_ns > 5_000_000 {
1044                    // 5ms
1045                    active_issues.push(HealthIssue {
1046                        severity: IssueSeverity::Warning,
1047                        component: format!("{instrument_id} Feed"),
1048                        description: format!(
1049                            "High latency: {}ms",
1050                            metrics.latency_metrics.total_latency_ns / 1_000_000
1051                        ),
1052                        first_seen: self.clock.raw() - 60_000_000_000, // 1 minute ago
1053                        last_seen: self.clock.raw(),
1054                        count: 1,
1055                    });
1056                }
1057            }
1058
1059            let performance_grade = match health_state {
1060                HealthState::Healthy => 'A',
1061                HealthState::Degraded => 'B',
1062                HealthState::Critical => 'D',
1063                HealthState::Offline => 'F',
1064            };
1065
1066            Ok(HealthStatus {
1067                overall_status: health_state,
1068                component_health,
1069                active_issues,
1070                performance_grade,
1071                uptime_seconds: 3600, // 1 hour for testing
1072                last_check_time: self.clock.raw(),
1073            })
1074        }
1075
1076        async fn start_anomaly_detection(&mut self, config: AnomalyConfig) -> Result<()> {
1077            if self.anomaly_detection_active.load(Ordering::SeqCst) {
1078                return Err(anyhow::anyhow!("Anomaly detection is already running"));
1079            }
1080
1081            if config.instruments.is_empty() {
1082                return Err(anyhow::anyhow!(
1083                    "No instruments specified for anomaly detection"
1084                ));
1085            }
1086
1087            if config.sensitivity < 0.0 || config.sensitivity > 1.0 {
1088                return Err(anyhow::anyhow!("Sensitivity must be between 0.0 and 1.0"));
1089            }
1090
1091            *self.anomaly_config.write() = Some(config);
1092            self.anomaly_detection_active.store(true, Ordering::SeqCst);
1093            Ok(())
1094        }
1095
1096        async fn stop_anomaly_detection(&mut self) -> Result<()> {
1097            if !self.anomaly_detection_active.load(Ordering::SeqCst) {
1098                return Err(anyhow::anyhow!("Anomaly detection is not running"));
1099            }
1100
1101            self.anomaly_detection_active.store(false, Ordering::SeqCst);
1102            *self.anomaly_config.write() = None;
1103            Ok(())
1104        }
1105
1106        async fn get_latency_distribution(
1107            &self,
1108            instrument_id: &InstrumentId,
1109        ) -> Result<LatencyDistribution> {
1110            let metrics = self.instrument_metrics.read();
1111            let instrument_metrics = metrics
1112                .get(instrument_id)
1113                .ok_or_else(|| anyhow::anyhow!("Instrument {} not found", instrument_id))?;
1114
1115            let base_latency = instrument_metrics.latency_metrics.total_latency_ns;
1116
1117            // Create realistic distribution based on base latency
1118            let histogram = vec![
1119                LatencyBucket {
1120                    min_ns: 0,
1121                    max_ns: base_latency / 2,
1122                    count: 100,
1123                },
1124                LatencyBucket {
1125                    min_ns: base_latency / 2,
1126                    max_ns: base_latency,
1127                    count: 500,
1128                },
1129                LatencyBucket {
1130                    min_ns: base_latency,
1131                    max_ns: base_latency * 2,
1132                    count: 300,
1133                },
1134                LatencyBucket {
1135                    min_ns: base_latency * 2,
1136                    max_ns: base_latency * 5,
1137                    count: 90,
1138                },
1139                LatencyBucket {
1140                    min_ns: base_latency * 5,
1141                    max_ns: base_latency * 10,
1142                    count: 10,
1143                },
1144            ];
1145
1146            Ok(LatencyDistribution {
1147                p50_ns: base_latency,
1148                p90_ns: base_latency * 2,
1149                p95_ns: (base_latency as f64 * 2.5) as u64,
1150                p99_ns: base_latency * 5,
1151                p99_9_ns: base_latency * 8,
1152                max_ns: base_latency * 10,
1153                histogram,
1154            })
1155        }
1156
1157        async fn export_prometheus_metrics(&self) -> Result<String> {
1158            let aggregated = self.get_aggregated_metrics().await?;
1159            let metrics = self.instrument_metrics.read();
1160            let alerts = self.alerts.read();
1161
1162            let mut output = String::new();
1163
1164            // System-wide metrics
1165            output.push_str("# HELP hft_instruments_total Total number of monitored instruments\n");
1166            output.push_str("# TYPE hft_instruments_total gauge\n");
1167            output.push_str(&format!(
1168                "hft_instruments_total {}\n\n",
1169                aggregated.total_instruments
1170            ));
1171
1172            output.push_str("# HELP hft_messages_processed_total Total messages processed\n");
1173            output.push_str("# TYPE hft_messages_processed_total counter\n");
1174            output.push_str(&format!(
1175                "hft_messages_processed_total {}\n\n",
1176                aggregated.total_messages_processed
1177            ));
1178
1179            output.push_str("# HELP hft_messages_dropped_total Total messages dropped\n");
1180            output.push_str("# TYPE hft_messages_dropped_total counter\n");
1181            output.push_str(&format!(
1182                "hft_messages_dropped_total {}\n\n",
1183                aggregated.total_dropped_messages
1184            ));
1185
1186            output.push_str("# HELP hft_latency_nanoseconds End-to-end latency in nanoseconds\n");
1187            output.push_str("# TYPE hft_latency_nanoseconds histogram\n");
1188
1189            // Per-instrument metrics
1190            for (instrument_id, m) in metrics.iter() {
1191                let labels = format!(
1192                    "exchange=\"{}\",instrument=\"{}\"",
1193                    instrument_id.venue.to_str(),
1194                    instrument_id.symbol
1195                );
1196
1197                // Latency histogram
1198                output.push_str(&format!(
1199                    "hft_latency_nanoseconds_bucket{{{}le=\"1000000\"}} {}\n",
1200                    labels,
1201                    if m.latency_metrics.total_latency_ns <= 1_000_000 {
1202                        1000
1203                    } else {
1204                        0
1205                    }
1206                ));
1207                output.push_str(&format!(
1208                    "hft_latency_nanoseconds_bucket{{{}le=\"5000000\"}} {}\n",
1209                    labels,
1210                    if m.latency_metrics.total_latency_ns <= 5_000_000 {
1211                        1000
1212                    } else {
1213                        500
1214                    }
1215                ));
1216                output.push_str(&format!(
1217                    "hft_latency_nanoseconds_bucket{{{labels}le=\"+Inf\"}} 1000\n"
1218                ));
1219                output.push_str(&format!(
1220                    "hft_latency_nanoseconds_sum{{{}}} {}\n",
1221                    labels,
1222                    m.latency_metrics.total_latency_ns * 1000 // Total across samples
1223                ));
1224                output.push_str(&format!(
1225                    "hft_latency_nanoseconds_count{{{labels}}} 1000\n\n"
1226                ));
1227
1228                // Health score
1229                output.push_str(&format!(
1230                    "hft_health_score{{{}}} {}\n",
1231                    labels, m.health_score
1232                ));
1233
1234                // Anomaly score
1235                output.push_str(&format!(
1236                    "hft_anomaly_score{{{}}} {}\n",
1237                    labels, m.anomaly_score
1238                ));
1239            }
1240
1241            // Active alerts
1242            output.push_str("\n# HELP hft_alerts_active Number of active alerts\n");
1243            output.push_str("# TYPE hft_alerts_active gauge\n");
1244            output.push_str(&format!("hft_alerts_active {}\n", alerts.len()));
1245
1246            Ok(output)
1247        }
1248
1249        async fn reset_statistics(&mut self) -> Result<()> {
1250            // Reset stats
1251            *self.stats.write() = TestStats::default();
1252
1253            // Reset error counter
1254            self.connection_errors.store(0, Ordering::SeqCst);
1255
1256            // Reset metrics to baseline
1257            let mut metrics = self.instrument_metrics.write();
1258            for (instrument_id, metric) in metrics.iter_mut() {
1259                *metric = Self::create_sample_metrics(instrument_id.clone(), &self.clock);
1260            }
1261
1262            Ok(())
1263        }
1264
1265        async fn get_trading_metrics(
1266            &self,
1267            instrument_id: &InstrumentId,
1268        ) -> Result<TradingMetrics> {
1269            let metrics = self.instrument_metrics.read();
1270            let _ = metrics
1271                .get(instrument_id)
1272                .ok_or_else(|| anyhow::anyhow!("Instrument {} not found", instrument_id))?;
1273
1274            Ok(TradingMetrics {
1275                order_flow_imbalance: 0.25,
1276                vwap: Decimal::new(4523450, 2), // 45234.50
1277                twap: Decimal::new(4520000, 2), // 45200.00
1278                microstructure_signals: MicrostructureSignals {
1279                    book_imbalance: 0.15,
1280                    trade_sign_accuracy: 0.72,
1281                    alpha_signal: 0.08,
1282                    momentum: 0.35,
1283                    mean_reversion: 0.20,
1284                },
1285                execution_quality: ExecutionQualityMetrics {
1286                    slippage_bps: 1.5,
1287                    market_impact_bps: 0.8,
1288                    fill_rate: 0.95,
1289                    avg_fill_time_ns: 2_500_000, // 2.5ms
1290                    implementation_shortfall_bps: 2.3,
1291                },
1292            })
1293        }
1294    }
1295
    // Custom test metrics collector
    //
    // A canned `MetricsCollector`: it reports a fixed name and hands back a copy
    // of a preset list of metrics on every collection.
    struct TestMetricsCollector {
        // Name reported through `MetricsCollector::name`.
        name: String,
        // Metrics cloned and returned by every `collect` call.
        metrics_to_return: Vec<CustomMetric>,
    }
1301
1302    #[async_trait]
1303    impl MetricsCollector for TestMetricsCollector {
1304        async fn collect(&self) -> Result<Vec<CustomMetric>> {
1305            Ok(self.metrics_to_return.clone())
1306        }
1307
1308        fn name(&self) -> &str {
1309            &self.name
1310        }
1311    }
1312
1313    // Tests start here
1314
1315    #[tokio::test]
1316    async fn test_get_instrument_metrics_success() {
1317        let monitor = TestMonitor::new();
1318        let btc_usdt = InstrumentId::new("BTC-USDT", Venue::Binance);
1319
1320        let metrics = monitor.get_instrument_metrics(&btc_usdt).await.unwrap();
1321
1322        assert_eq!(metrics.instrument_id, btc_usdt);
1323        assert_eq!(metrics.feed_stats.messages_processed, 10000);
1324        assert_eq!(metrics.latency_metrics.total_latency_ns, 1_000_000);
1325        assert_eq!(metrics.order_flow_metrics.avg_spread_bps, 2.5);
1326        assert_eq!(metrics.health_score, 0.95);
1327    }
1328
1329    #[tokio::test]
1330    async fn test_get_instrument_metrics_not_found() {
1331        let monitor = TestMonitor::new();
1332        let unknown = InstrumentId::new("UNKNOWN-USDT", Venue::Binance);
1333
1334        let result = monitor.get_instrument_metrics(&unknown).await;
1335
1336        assert!(result.is_err());
1337        assert!(result.unwrap_err().to_string().contains("not found"));
1338    }
1339
1340    #[tokio::test]
1341    async fn test_export_prometheus_metrics() {
1342        let monitor = TestMonitor::new();
1343
1344        let prometheus_output = monitor.export_prometheus_metrics().await.unwrap();
1345
1346        // Check for required metric types
1347        assert!(prometheus_output.contains("# HELP hft_instruments_total"));
1348        assert!(prometheus_output.contains("# TYPE hft_instruments_total gauge"));
1349        assert!(prometheus_output.contains("hft_instruments_total 2"));
1350
1351        assert!(prometheus_output.contains("# HELP hft_messages_processed_total"));
1352        assert!(prometheus_output.contains("# TYPE hft_messages_processed_total counter"));
1353
1354        assert!(prometheus_output.contains("# HELP hft_latency_nanoseconds"));
1355        assert!(prometheus_output.contains("# TYPE hft_latency_nanoseconds histogram"));
1356
1357        // Check for instrument-specific metrics - fixed to match actual format
1358        assert!(prometheus_output.contains("exchange=\"Binance\",instrument=\"BTC-USDT\""));
1359        assert!(prometheus_output.contains("hft_latency_nanoseconds_bucket"));
1360        assert!(prometheus_output.contains("hft_health_score"));
1361        assert!(prometheus_output.contains("hft_anomaly_score"));
1362
1363        // Check for alerts metric
1364        assert!(prometheus_output.contains("# HELP hft_alerts_active"));
1365        assert!(prometheus_output.contains("hft_alerts_active 0"));
1366    }
1367
1368    #[tokio::test]
1369    async fn test_get_health_status_healthy() {
1370        let monitor = TestMonitor::new();
1371
1372        let health = monitor.get_health_status().await.unwrap();
1373
1374        assert_eq!(health.overall_status, HealthState::Healthy);
1375        assert_eq!(health.performance_grade, 'A');
1376        assert!(health.active_issues.is_empty());
1377        assert_eq!(health.component_health.len(), 2);
1378
1379        let ws_health = &health.component_health[0];
1380        assert_eq!(ws_health.component_name, "WebSocket Connection");
1381        assert_eq!(ws_health.status, HealthState::Healthy);
1382        assert_eq!(ws_health.error_rate, 0.0);
1383    }
1384
1385    #[tokio::test]
1386    async fn test_get_health_status_degraded() {
1387        let monitor = TestMonitor::new();
1388        monitor.simulate_degraded_health();
1389
1390        let health = monitor.get_health_status().await.unwrap();
1391
1392        assert_eq!(health.overall_status, HealthState::Degraded);
1393        assert_eq!(health.performance_grade, 'B');
1394        assert!(!health.active_issues.is_empty());
1395
1396        let issue = &health.active_issues[0];
1397        assert_eq!(issue.severity, IssueSeverity::Critical);
1398        assert!(issue.description.contains("High error rate"));
1399    }
1400
1401    #[tokio::test]
1402    async fn test_get_health_status_with_high_latency() {
1403        let monitor = TestMonitor::new();
1404        let btc_usdt = InstrumentId::new("BTC-USDT", Venue::Binance);
1405        monitor.simulate_high_latency(&btc_usdt);
1406
1407        let health = monitor.get_health_status().await.unwrap();
1408
1409        // Should have a latency warning
1410        let latency_issues: Vec<_> = health
1411            .active_issues
1412            .iter()
1413            .filter(|i| i.description.contains("High latency"))
1414            .collect();
1415
1416        assert_eq!(latency_issues.len(), 1);
1417        assert_eq!(latency_issues[0].severity, IssueSeverity::Warning);
1418        assert!(latency_issues[0].component.contains("BTC-USDT"));
1419    }
1420
1421    #[tokio::test]
1422    async fn test_enable_alert_success() {
1423        let mut monitor = TestMonitor::new();
1424
1425        let alert_config = AlertConfig {
1426            name: "Test Alert".to_string(),
1427            condition: AlertCondition::LatencyThreshold {
1428                instrument_id: None,
1429            },
1430            threshold: 5_000_000.0, // 5ms
1431            duration_ms: 1000,
1432            severity: IssueSeverity::Warning,
1433            enabled: true,
1434        };
1435
1436        let handle = monitor.enable_alert(alert_config.clone()).await.unwrap();
1437
1438        assert_eq!(handle.0, 1);
1439        assert_eq!(monitor.alerts.read().len(), 1);
1440        assert_eq!(
1441            monitor.alerts.read().get(&handle).unwrap().name,
1442            "Test Alert"
1443        );
1444    }
1445}