pub trait MonitorExt {
// Required methods
fn get_instrument_metrics<'life0, 'life1, 'async_trait>(
&'life0 self,
instrument_id: &'life1 InstrumentId,
) -> Pin<Box<dyn Future<Output = Result<InstrumentMetrics>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait;
fn get_aggregated_metrics<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<AggregatedMetrics>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait;
fn register_metrics_collector<'life0, 'async_trait>(
&'life0 mut self,
collector: Box<dyn MetricsCollector>,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait;
fn enable_alert<'life0, 'async_trait>(
&'life0 mut self,
alert_config: AlertConfig,
) -> Pin<Box<dyn Future<Output = Result<AlertHandle>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait;
fn disable_alert<'life0, 'async_trait>(
&'life0 mut self,
handle: AlertHandle,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait;
fn get_health_status<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<HealthStatus>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait;
fn start_anomaly_detection<'life0, 'async_trait>(
&'life0 mut self,
config: AnomalyConfig,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait;
fn stop_anomaly_detection<'life0, 'async_trait>(
&'life0 mut self,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait;
fn get_latency_distribution<'life0, 'life1, 'async_trait>(
&'life0 self,
instrument_id: &'life1 InstrumentId,
) -> Pin<Box<dyn Future<Output = Result<LatencyDistribution>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait;
fn export_prometheus_metrics<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<String>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait;
fn reset_statistics<'life0, 'async_trait>(
&'life0 mut self,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait;
fn get_trading_metrics<'life0, 'life1, 'async_trait>(
&'life0 self,
instrument_id: &'life1 InstrumentId,
) -> Pin<Box<dyn Future<Output = Result<TradingMetrics>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait;
}Expand description
Advanced monitoring capabilities for High-Frequency Trading (HFT) systems
The MonitorExt trait provides comprehensive monitoring, alerting, and observability features
specifically designed for HFT systems that require nanosecond precision and real-time performance
tracking. This trait extends basic monitoring with advanced analytics including:
- Real-time Performance Metrics: Track latency distributions, throughput, and system health
- Trading Analytics: Monitor order flow, market impact, and microstructure signals
- Anomaly Detection: Identify unusual patterns in trading data and system behavior
- Alerting System: Configure alerts for critical conditions with customizable thresholds
- Health Monitoring: Component-level health tracking with degradation detection
- Custom Metrics: Extensible framework for domain-specific metric collection
§Design Philosophy
The trait is designed with HFT requirements in mind:
- All timestamps use nanosecond precision
- Zero-allocation operations where possible
- Lock-free metrics collection for minimal overhead
- Configurable aggregation intervals to balance accuracy vs performance
§Usage Example
use rusty_feeder::provider::ext::{MonitorExt, AlertConfig, AlertCondition, IssueSeverity};
use rusty_model::instruments::InstrumentId;
use rusty_model::venues::Venue;
async fn monitor_trading_system(monitor: &impl MonitorExt) -> anyhow::Result<()> {
// Configure latency alert
let alert_config = AlertConfig {
name: "High Latency Alert".to_string(),
condition: AlertCondition::LatencyThreshold { instrument_id: None },
threshold: 1_000_000.0, // 1ms in nanoseconds
duration_ms: 5000, // Trigger after 5 seconds
severity: IssueSeverity::Warning,
enabled: true,
};
let alert_handle = monitor.enable_alert(alert_config).await?;
// Get real-time metrics
let btc_usdt = InstrumentId::new("BTC-USDT", Venue::Binance);
let metrics = monitor.get_instrument_metrics(&btc_usdt).await?;
println!("Current latency: {}ns", metrics.latency_metrics.total_latency_ns);
// Export for Prometheus
let prometheus_data = monitor.export_prometheus_metrics().await?;
Ok(())
}Required Methods§
Sourcefn get_instrument_metrics<'life0, 'life1, 'async_trait>(
&'life0 self,
instrument_id: &'life1 InstrumentId,
) -> Pin<Box<dyn Future<Output = Result<InstrumentMetrics>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn get_instrument_metrics<'life0, 'life1, 'async_trait>(
&'life0 self,
instrument_id: &'life1 InstrumentId,
) -> Pin<Box<dyn Future<Output = Result<InstrumentMetrics>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Retrieves comprehensive real-time metrics for a specific trading instrument
This method provides a complete snapshot of all monitoring data for a single instrument, including connection health, data feed statistics, order flow metrics, and latency measurements. All metrics use nanosecond precision timestamps.
§Arguments
instrument_id- The unique identifier of the instrument to query
§Returns
Ok(InstrumentMetrics)- Complete metrics snapshot for the instrumentErr- If the instrument is not found or monitoring is not available
§Example
let eth_usdt = InstrumentId::new("ETH-USDT", Venue::Binance);
let metrics = monitor.get_instrument_metrics(ð_usdt).await?;
// Check order flow imbalance
if metrics.order_flow_metrics.volume_imbalance.abs() > 0.7 {
println!("High volume imbalance detected: {}", metrics.order_flow_metrics.volume_imbalance);
}
// Monitor latency
println!("Network latency: {}µs", metrics.latency_metrics.network_latency_ns / 1000);
println!("Total latency: {}µs", metrics.latency_metrics.total_latency_ns / 1000);Sourcefn get_aggregated_metrics<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<AggregatedMetrics>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn get_aggregated_metrics<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<AggregatedMetrics>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Retrieves aggregated metrics across all actively monitored instruments
This method provides system-wide performance metrics, useful for monitoring overall system health and identifying cross-instrument issues. Aggregated metrics include total message throughput, average latencies, memory usage, and health scores.
§Returns
Ok(AggregatedMetrics)- System-wide aggregated metricsErr- If monitoring subsystem is unavailable
§Example
let system_metrics = monitor.get_aggregated_metrics().await?;
// Check system health
if system_metrics.overall_health_score < 0.8 {
println!("System health degraded: {:.2}", system_metrics.overall_health_score);
println!("Active alerts: {}", system_metrics.active_alerts);
}
// Monitor performance
println!("Processing {} messages/sec",
system_metrics.total_messages_processed / (system_metrics.timestamp / 1_000_000_000));
println!("P99 latency: {}µs", system_metrics.p99_latency_ns / 1000);Sourcefn register_metrics_collector<'life0, 'async_trait>(
&'life0 mut self,
collector: Box<dyn MetricsCollector>,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn register_metrics_collector<'life0, 'async_trait>(
&'life0 mut self,
collector: Box<dyn MetricsCollector>,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Registers a custom metrics collector for domain-specific monitoring
This method allows extending the monitoring system with custom metrics collectors
that implement the MetricsCollector trait. Collectors are invoked periodically
and their metrics are included in aggregated reports and exports.
§Arguments
collector- A boxed implementation of theMetricsCollectortrait
§Returns
Ok(())- If the collector was successfully registeredErr- If registration failed (e.g., duplicate name)
§Example
struct SpreadMonitor;
#[async_trait]
impl MetricsCollector for SpreadMonitor {
async fn collect(&self) -> Result<Vec<CustomMetric>> {
// Calculate custom spread metrics
let mut tags = FxHashMap::default();
tags.insert("exchange".to_string(), "binance".to_string());
Ok(vec![
CustomMetric {
name: "custom_spread_bps".to_string(),
value: 2.5,
tags,
timestamp: quanta::Instant::now().as_nanos() as u64,
}
])
}
fn name(&self) -> &str {
"spread_monitor"
}
}
monitor.register_metrics_collector(Box::new(SpreadMonitor)).await?;Sourcefn enable_alert<'life0, 'async_trait>(
&'life0 mut self,
alert_config: AlertConfig,
) -> Pin<Box<dyn Future<Output = Result<AlertHandle>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn enable_alert<'life0, 'async_trait>(
&'life0 mut self,
alert_config: AlertConfig,
) -> Pin<Box<dyn Future<Output = Result<AlertHandle>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Enables a real-time alert for specific monitoring conditions
This method configures and activates an alert that will trigger when the specified condition is met for the configured duration. Alerts can monitor various conditions including latency thresholds, error rates, connection status, and custom metrics.
§Arguments
alert_config- Configuration specifying the alert condition, threshold, and behavior
§Returns
Ok(AlertHandle)- A handle for managing the active alertErr- If the alert configuration is invalid
§Example
// Alert for high latency on specific instrument
let btc_instrument = InstrumentId::new("BTC-USDT", Venue::Binance);
let latency_alert = AlertConfig {
name: "BTC-USDT High Latency".to_string(),
condition: AlertCondition::LatencyThreshold {
instrument_id: Some(btc_instrument),
},
threshold: 5_000_000.0, // 5ms in nanoseconds
duration_ms: 10_000, // Sustained for 10 seconds
severity: IssueSeverity::Error,
enabled: true,
};
let alert_handle = monitor.enable_alert(latency_alert).await?;
// Alert for connection loss
let connection_alert = AlertConfig {
name: "Binance Connection Lost".to_string(),
condition: AlertCondition::ConnectionLoss {
exchange: "binance".to_string(),
},
threshold: 1.0, // Binary condition
duration_ms: 0, // Immediate
severity: IssueSeverity::Critical,
enabled: true,
};
let conn_handle = monitor.enable_alert(connection_alert).await?;Sourcefn disable_alert<'life0, 'async_trait>(
&'life0 mut self,
handle: AlertHandle,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn disable_alert<'life0, 'async_trait>(
&'life0 mut self,
handle: AlertHandle,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Disables an active alert using its handle
This method deactivates a previously configured alert. The alert will stop evaluating its condition and no new notifications will be generated.
§Arguments
handle- The handle returned when the alert was enabled
§Returns
Ok(())- If the alert was successfully disabledErr- If the handle is invalid or the alert was already disabled
§Example
// Disable an alert after market close
monitor.disable_alert(alert_handle).await?;Sourcefn get_health_status<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<HealthStatus>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn get_health_status<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<HealthStatus>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Retrieves the current health status with detailed component diagnostics
This method provides a comprehensive health check of the monitoring system and all monitored components. It includes overall system status, individual component health, active issues, performance grading, and uptime information.
§Returns
Ok(HealthStatus)- Detailed health informationErr- If health check fails
§Example
let health = monitor.get_health_status().await?;
match health.overall_status {
HealthState::Healthy => println!("System healthy - Grade: {}", health.performance_grade),
HealthState::Degraded => {
println!("System degraded - Active issues: {}", health.active_issues.len());
for issue in &health.active_issues {
println!(" - {}: {}", issue.component, issue.description);
}
}
HealthState::Critical => println!("CRITICAL: System requires immediate attention"),
HealthState::Offline => println!("System offline"),
}
// Check component health
for component in &health.component_health {
if component.error_rate > 0.01 {
println!("{} has high error rate: {:.2}%",
component.component_name, component.error_rate * 100.0);
}
}Sourcefn start_anomaly_detection<'life0, 'async_trait>(
&'life0 mut self,
config: AnomalyConfig,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn start_anomaly_detection<'life0, 'async_trait>(
&'life0 mut self,
config: AnomalyConfig,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Starts anomaly detection for trading patterns and system behavior
This method activates anomaly detection algorithms that monitor for unusual patterns in trading data, latency spikes, volume anomalies, and other deviations from normal behavior. Multiple algorithms can run concurrently.
§Arguments
config- Configuration specifying instruments, algorithms, and sensitivity
§Returns
Ok(())- If anomaly detection was successfully startedErr- If configuration is invalid or detection is already running
§Example
let config = AnomalyConfig {
instruments: vec![
InstrumentId::new("BTC-USDT", Venue::Binance),
InstrumentId::new("ETH-USDT", Venue::Binance),
],
sensitivity: 0.8, // High sensitivity
window_size_ms: 60_000, // 1 minute windows
algorithms: vec![
AnomalyAlgorithm::StatisticalOutlier {
threshold_std_dev: 3.0,
},
AnomalyAlgorithm::VolumeAnomaly {
volume_threshold: 5.0, // 5x normal volume
},
AnomalyAlgorithm::LatencyAnomaly {
latency_threshold_ns: 10_000_000, // 10ms
},
],
};
monitor.start_anomaly_detection(config).await?;Sourcefn stop_anomaly_detection<'life0, 'async_trait>(
&'life0 mut self,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn stop_anomaly_detection<'life0, 'async_trait>(
&'life0 mut self,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Stops all active anomaly detection algorithms
This method deactivates all running anomaly detection algorithms and clears any accumulated detection state.
§Returns
Ok(())- If anomaly detection was successfully stoppedErr- If no detection was running
§Example
// Stop detection during maintenance window
monitor.stop_anomaly_detection().await?;Sourcefn get_latency_distribution<'life0, 'life1, 'async_trait>(
&'life0 self,
instrument_id: &'life1 InstrumentId,
) -> Pin<Box<dyn Future<Output = Result<LatencyDistribution>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn get_latency_distribution<'life0, 'life1, 'async_trait>(
&'life0 self,
instrument_id: &'life1 InstrumentId,
) -> Pin<Box<dyn Future<Output = Result<LatencyDistribution>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Retrieves detailed latency distribution statistics for an instrument
This method provides a comprehensive breakdown of latency measurements including percentiles (p50, p90, p95, p99, p99.9), maximum values, and histogram buckets. Useful for identifying latency patterns and optimizing performance.
§Arguments
instrument_id- The instrument to analyze
§Returns
Ok(LatencyDistribution)- Detailed latency statisticsErr- If instrument is not found or has insufficient data
§Example
let btc_usdt = InstrumentId::new("BTC-USDT", Venue::Binance);
let latency = monitor.get_latency_distribution(&btc_usdt).await?;
println!("Latency percentiles for BTC-USDT:");
println!(" P50: {}µs", latency.p50_ns / 1000);
println!(" P90: {}µs", latency.p90_ns / 1000);
println!(" P95: {}µs", latency.p95_ns / 1000);
println!(" P99: {}µs", latency.p99_ns / 1000);
println!(" P99.9: {}µs", latency.p99_9_ns / 1000);
println!(" Max: {}µs", latency.max_ns / 1000);
// Analyze histogram
for bucket in &latency.histogram {
println!(" {}µs-{}µs: {} samples",
bucket.min_ns / 1000, bucket.max_ns / 1000, bucket.count);
}Sourcefn export_prometheus_metrics<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<String>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn export_prometheus_metrics<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<String>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Exports all metrics in Prometheus exposition format
This method generates a text representation of all metrics following the Prometheus exposition format. The output can be scraped by Prometheus or compatible monitoring systems. Includes all standard metrics, custom metrics, and alert states.
§Returns
Ok(String)- Prometheus-formatted metrics textErr- If export fails
§Output Format
# HELP hft_latency_nanoseconds End-to-end latency in nanoseconds
# TYPE hft_latency_nanoseconds histogram
hft_latency_nanoseconds_bucket{exchange="binance",instrument="BTC-USDT",le="1000"} 245
hft_latency_nanoseconds_bucket{exchange="binance",instrument="BTC-USDT",le="5000"} 1823
hft_latency_nanoseconds_sum{exchange="binance",instrument="BTC-USDT"} 4523000
hft_latency_nanoseconds_count{exchange="binance",instrument="BTC-USDT"} 2068
# HELP hft_messages_total Total messages processed
# TYPE hft_messages_total counter
hft_messages_total{exchange="binance",type="trade"} 150234
hft_messages_total{exchange="binance",type="orderbook"} 892341§Example
let prometheus_data = monitor.export_prometheus_metrics().await?;
// Write to file for Prometheus to scrape
std::fs::write("/var/lib/prometheus/hft_metrics.prom", prometheus_data)?;
// Or serve via HTTP endpoint
// warp::reply::with_header(prometheus_data, "Content-Type", "text/plain; version=0.0.4")Sourcefn reset_statistics<'life0, 'async_trait>(
&'life0 mut self,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn reset_statistics<'life0, 'async_trait>(
&'life0 mut self,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Resets all monitoring statistics to their initial state
This method clears all accumulated metrics, counters, and statistics while preserving configuration (alerts, collectors, etc.). Useful for starting fresh measurements or after system maintenance.
§Returns
Ok(())- If statistics were successfully resetErr- If reset fails
§Example
// Reset stats at market open
monitor.reset_statistics().await?;
println!("Statistics reset for new trading session");Sourcefn get_trading_metrics<'life0, 'life1, 'async_trait>(
&'life0 self,
instrument_id: &'life1 InstrumentId,
) -> Pin<Box<dyn Future<Output = Result<TradingMetrics>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn get_trading_metrics<'life0, 'life1, 'async_trait>(
&'life0 self,
instrument_id: &'life1 InstrumentId,
) -> Pin<Box<dyn Future<Output = Result<TradingMetrics>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Retrieves advanced trading-specific metrics for an instrument
This method provides sophisticated trading analytics including order flow imbalance, VWAP/TWAP calculations, microstructure signals, and execution quality metrics. These metrics are essential for HFT strategy development and performance analysis.
§Arguments
instrument_id- The instrument to analyze
§Returns
Ok(TradingMetrics)- Comprehensive trading analyticsErr- If instrument is not found or has insufficient data
§Example
let eth_usdt = InstrumentId::new("ETH-USDT", Venue::Binance);
let trading_metrics = monitor.get_trading_metrics(ð_usdt).await?;
// Check microstructure signals
let signals = &trading_metrics.microstructure_signals;
if signals.book_imbalance > 0.7 && signals.momentum > 0.5 {
println!("Strong bullish signal detected");
println!(" Book imbalance: {:.3}", signals.book_imbalance);
println!(" Momentum: {:.3}", signals.momentum);
println!(" Alpha signal: {:.3}", signals.alpha_signal);
}
// Analyze execution quality
let exec_quality = &trading_metrics.execution_quality;
println!("Execution metrics:");
println!(" Slippage: {:.2} bps", exec_quality.slippage_bps);
println!(" Market impact: {:.2} bps", exec_quality.market_impact_bps);
println!(" Fill rate: {:.1}%", exec_quality.fill_rate * 100.0);
// Price analysis
println!("VWAP: {}", trading_metrics.vwap);
println!("TWAP: {}", trading_metrics.twap);Implementors§
impl MonitorExt for BinanceFuturesProvider
MonitorExt implementation for BinanceFuturesProvider
Provides advanced monitoring capabilities for HFT trading systems