Trait MonitorExt

Source
pub trait MonitorExt {
    // Required methods
    fn get_instrument_metrics<'life0, 'life1, 'async_trait>(
        &'life0 self,
        instrument_id: &'life1 InstrumentId,
    ) -> Pin<Box<dyn Future<Output = Result<InstrumentMetrics>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait,
             'life1: 'async_trait;
    fn get_aggregated_metrics<'life0, 'async_trait>(
        &'life0 self,
    ) -> Pin<Box<dyn Future<Output = Result<AggregatedMetrics>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait;
    fn register_metrics_collector<'life0, 'async_trait>(
        &'life0 mut self,
        collector: Box<dyn MetricsCollector>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait;
    fn enable_alert<'life0, 'async_trait>(
        &'life0 mut self,
        alert_config: AlertConfig,
    ) -> Pin<Box<dyn Future<Output = Result<AlertHandle>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait;
    fn disable_alert<'life0, 'async_trait>(
        &'life0 mut self,
        handle: AlertHandle,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait;
    fn get_health_status<'life0, 'async_trait>(
        &'life0 self,
    ) -> Pin<Box<dyn Future<Output = Result<HealthStatus>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait;
    fn start_anomaly_detection<'life0, 'async_trait>(
        &'life0 mut self,
        config: AnomalyConfig,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait;
    fn stop_anomaly_detection<'life0, 'async_trait>(
        &'life0 mut self,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait;
    fn get_latency_distribution<'life0, 'life1, 'async_trait>(
        &'life0 self,
        instrument_id: &'life1 InstrumentId,
    ) -> Pin<Box<dyn Future<Output = Result<LatencyDistribution>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait,
             'life1: 'async_trait;
    fn export_prometheus_metrics<'life0, 'async_trait>(
        &'life0 self,
    ) -> Pin<Box<dyn Future<Output = Result<String>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait;
    fn reset_statistics<'life0, 'async_trait>(
        &'life0 mut self,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait;
    fn get_trading_metrics<'life0, 'life1, 'async_trait>(
        &'life0 self,
        instrument_id: &'life1 InstrumentId,
    ) -> Pin<Box<dyn Future<Output = Result<TradingMetrics>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait,
             'life1: 'async_trait;
}
Expand description

Advanced monitoring capabilities for High-Frequency Trading (HFT) systems

The MonitorExt trait provides comprehensive monitoring, alerting, and observability features specifically designed for HFT systems that require nanosecond precision and real-time performance tracking. This trait extends basic monitoring with advanced analytics including:

  • Real-time Performance Metrics: Track latency distributions, throughput, and system health
  • Trading Analytics: Monitor order flow, market impact, and microstructure signals
  • Anomaly Detection: Identify unusual patterns in trading data and system behavior
  • Alerting System: Configure alerts for critical conditions with customizable thresholds
  • Health Monitoring: Component-level health tracking with degradation detection
  • Custom Metrics: Extensible framework for domain-specific metric collection

§Design Philosophy

The trait is designed with HFT requirements in mind:

  • All timestamps use nanosecond precision
  • Zero-allocation operations where possible
  • Lock-free metrics collection for minimal overhead
  • Configurable aggregation intervals to balance accuracy vs performance

§Usage Example

use rusty_feeder::provider::ext::{MonitorExt, AlertConfig, AlertCondition, IssueSeverity};
use rusty_model::instruments::InstrumentId;
use rusty_model::venues::Venue;

async fn monitor_trading_system(monitor: &impl MonitorExt) -> anyhow::Result<()> {
    // Configure latency alert
    let alert_config = AlertConfig {
        name: "High Latency Alert".to_string(),
        condition: AlertCondition::LatencyThreshold { instrument_id: None },
        threshold: 1_000_000.0, // 1ms in nanoseconds
        duration_ms: 5000,      // Trigger after 5 seconds
        severity: IssueSeverity::Warning,
        enabled: true,
    };

    let alert_handle = monitor.enable_alert(alert_config).await?;

    // Get real-time metrics
    let btc_usdt = InstrumentId::new("BTC-USDT", Venue::Binance);
    let metrics = monitor.get_instrument_metrics(&btc_usdt).await?;
    println!("Current latency: {}ns", metrics.latency_metrics.total_latency_ns);

    // Export for Prometheus
    let prometheus_data = monitor.export_prometheus_metrics().await?;

    Ok(())
}

Required Methods§

Source

fn get_instrument_metrics<'life0, 'life1, 'async_trait>( &'life0 self, instrument_id: &'life1 InstrumentId, ) -> Pin<Box<dyn Future<Output = Result<InstrumentMetrics>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Retrieves comprehensive real-time metrics for a specific trading instrument

This method provides a complete snapshot of all monitoring data for a single instrument, including connection health, data feed statistics, order flow metrics, and latency measurements. All metrics use nanosecond precision timestamps.

§Arguments
  • instrument_id - The unique identifier of the instrument to query
§Returns
  • Ok(InstrumentMetrics) - Complete metrics snapshot for the instrument
  • Err - If the instrument is not found or monitoring is not available
§Example
let eth_usdt = InstrumentId::new("ETH-USDT", Venue::Binance);
let metrics = monitor.get_instrument_metrics(&eth_usdt).await?;

// Check order flow imbalance
if metrics.order_flow_metrics.volume_imbalance.abs() > 0.7 {
    println!("High volume imbalance detected: {}", metrics.order_flow_metrics.volume_imbalance);
}

// Monitor latency
println!("Network latency: {}µs", metrics.latency_metrics.network_latency_ns / 1000);
println!("Total latency: {}µs", metrics.latency_metrics.total_latency_ns / 1000);
Source

fn get_aggregated_metrics<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = Result<AggregatedMetrics>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Retrieves aggregated metrics across all actively monitored instruments

This method provides system-wide performance metrics, useful for monitoring overall system health and identifying cross-instrument issues. Aggregated metrics include total message throughput, average latencies, memory usage, and health scores.

§Returns
  • Ok(AggregatedMetrics) - System-wide aggregated metrics
  • Err - If monitoring subsystem is unavailable
§Example
let system_metrics = monitor.get_aggregated_metrics().await?;

// Check system health
if system_metrics.overall_health_score < 0.8 {
    println!("System health degraded: {:.2}", system_metrics.overall_health_score);
    println!("Active alerts: {}", system_metrics.active_alerts);
}

// Monitor performance
println!("Processing {} messages/sec",
    system_metrics.total_messages_processed / (system_metrics.timestamp / 1_000_000_000));
println!("P99 latency: {}µs", system_metrics.p99_latency_ns / 1000);
Source

fn register_metrics_collector<'life0, 'async_trait>( &'life0 mut self, collector: Box<dyn MetricsCollector>, ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Registers a custom metrics collector for domain-specific monitoring

This method allows extending the monitoring system with custom metrics collectors that implement the MetricsCollector trait. Collectors are invoked periodically and their metrics are included in aggregated reports and exports.

§Arguments
  • collector - A boxed implementation of the MetricsCollector trait
§Returns
  • Ok(()) - If the collector was successfully registered
  • Err - If registration failed (e.g., duplicate name)
§Example

struct SpreadMonitor;

#[async_trait]
impl MetricsCollector for SpreadMonitor {
    async fn collect(&self) -> Result<Vec<CustomMetric>> {
        // Calculate custom spread metrics
        let mut tags = FxHashMap::default();
        tags.insert("exchange".to_string(), "binance".to_string());

        Ok(vec![
            CustomMetric {
                name: "custom_spread_bps".to_string(),
                value: 2.5,
                tags,
                timestamp: quanta::Instant::now().as_nanos() as u64,
            }
        ])
    }

    fn name(&self) -> &str {
        "spread_monitor"
    }
}

monitor.register_metrics_collector(Box::new(SpreadMonitor)).await?;
Source

fn enable_alert<'life0, 'async_trait>( &'life0 mut self, alert_config: AlertConfig, ) -> Pin<Box<dyn Future<Output = Result<AlertHandle>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Enables a real-time alert for specific monitoring conditions

This method configures and activates an alert that will trigger when the specified condition is met for the configured duration. Alerts can monitor various conditions including latency thresholds, error rates, connection status, and custom metrics.

§Arguments
  • alert_config - Configuration specifying the alert condition, threshold, and behavior
§Returns
  • Ok(AlertHandle) - A handle for managing the active alert
  • Err - If the alert configuration is invalid
§Example
// Alert for high latency on specific instrument
let btc_instrument = InstrumentId::new("BTC-USDT", Venue::Binance);
let latency_alert = AlertConfig {
    name: "BTC-USDT High Latency".to_string(),
    condition: AlertCondition::LatencyThreshold {
        instrument_id: Some(btc_instrument),
    },
    threshold: 5_000_000.0,  // 5ms in nanoseconds
    duration_ms: 10_000,     // Sustained for 10 seconds
    severity: IssueSeverity::Error,
    enabled: true,
};

let alert_handle = monitor.enable_alert(latency_alert).await?;

// Alert for connection loss
let connection_alert = AlertConfig {
    name: "Binance Connection Lost".to_string(),
    condition: AlertCondition::ConnectionLoss {
        exchange: "binance".to_string(),
    },
    threshold: 1.0,          // Binary condition
    duration_ms: 0,          // Immediate
    severity: IssueSeverity::Critical,
    enabled: true,
};

let conn_handle = monitor.enable_alert(connection_alert).await?;
Source

fn disable_alert<'life0, 'async_trait>( &'life0 mut self, handle: AlertHandle, ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Disables an active alert using its handle

This method deactivates a previously configured alert. The alert will stop evaluating its condition and no new notifications will be generated.

§Arguments
  • handle - The handle returned when the alert was enabled
§Returns
  • Ok(()) - If the alert was successfully disabled
  • Err - If the handle is invalid or the alert was already disabled
§Example
// Disable an alert after market close
monitor.disable_alert(alert_handle).await?;
Source

fn get_health_status<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = Result<HealthStatus>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Retrieves the current health status with detailed component diagnostics

This method provides a comprehensive health check of the monitoring system and all monitored components. It includes overall system status, individual component health, active issues, performance grading, and uptime information.

§Returns
  • Ok(HealthStatus) - Detailed health information
  • Err - If health check fails
§Example
let health = monitor.get_health_status().await?;

match health.overall_status {
    HealthState::Healthy => println!("System healthy - Grade: {}", health.performance_grade),
    HealthState::Degraded => {
        println!("System degraded - Active issues: {}", health.active_issues.len());
        for issue in &health.active_issues {
            println!("  - {}: {}", issue.component, issue.description);
        }
    }
    HealthState::Critical => println!("CRITICAL: System requires immediate attention"),
    HealthState::Offline => println!("System offline"),
}

// Check component health
for component in &health.component_health {
    if component.error_rate > 0.01 {
        println!("{} has high error rate: {:.2}%",
            component.component_name, component.error_rate * 100.0);
    }
}
Source

fn start_anomaly_detection<'life0, 'async_trait>( &'life0 mut self, config: AnomalyConfig, ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Starts anomaly detection for trading patterns and system behavior

This method activates anomaly detection algorithms that monitor for unusual patterns in trading data, latency spikes, volume anomalies, and other deviations from normal behavior. Multiple algorithms can run concurrently.

§Arguments
  • config - Configuration specifying instruments, algorithms, and sensitivity
§Returns
  • Ok(()) - If anomaly detection was successfully started
  • Err - If configuration is invalid or detection is already running
§Example
let config = AnomalyConfig {
    instruments: vec![
        InstrumentId::new("BTC-USDT", Venue::Binance),
        InstrumentId::new("ETH-USDT", Venue::Binance),
    ],
    sensitivity: 0.8,  // High sensitivity
    window_size_ms: 60_000,  // 1 minute windows
    algorithms: vec![
        AnomalyAlgorithm::StatisticalOutlier {
            threshold_std_dev: 3.0,
        },
        AnomalyAlgorithm::VolumeAnomaly {
            volume_threshold: 5.0,  // 5x normal volume
        },
        AnomalyAlgorithm::LatencyAnomaly {
            latency_threshold_ns: 10_000_000,  // 10ms
        },
    ],
};

monitor.start_anomaly_detection(config).await?;
Source

fn stop_anomaly_detection<'life0, 'async_trait>( &'life0 mut self, ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Stops all active anomaly detection algorithms

This method deactivates all running anomaly detection algorithms and clears any accumulated detection state.

§Returns
  • Ok(()) - If anomaly detection was successfully stopped
  • Err - If no detection was running
§Example
// Stop detection during maintenance window
monitor.stop_anomaly_detection().await?;
Source

fn get_latency_distribution<'life0, 'life1, 'async_trait>( &'life0 self, instrument_id: &'life1 InstrumentId, ) -> Pin<Box<dyn Future<Output = Result<LatencyDistribution>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Retrieves detailed latency distribution statistics for an instrument

This method provides a comprehensive breakdown of latency measurements including percentiles (p50, p90, p95, p99, p99.9), maximum values, and histogram buckets. Useful for identifying latency patterns and optimizing performance.

§Arguments
  • instrument_id - The instrument to analyze
§Returns
  • Ok(LatencyDistribution) - Detailed latency statistics
  • Err - If instrument is not found or has insufficient data
§Example
let btc_usdt = InstrumentId::new("BTC-USDT", Venue::Binance);
let latency = monitor.get_latency_distribution(&btc_usdt).await?;

println!("Latency percentiles for BTC-USDT:");
println!("  P50: {}µs", latency.p50_ns / 1000);
println!("  P90: {}µs", latency.p90_ns / 1000);
println!("  P95: {}µs", latency.p95_ns / 1000);
println!("  P99: {}µs", latency.p99_ns / 1000);
println!("  P99.9: {}µs", latency.p99_9_ns / 1000);
println!("  Max: {}µs", latency.max_ns / 1000);

// Analyze histogram
for bucket in &latency.histogram {
    println!("  {}µs-{}µs: {} samples",
        bucket.min_ns / 1000, bucket.max_ns / 1000, bucket.count);
}
Source

fn export_prometheus_metrics<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = Result<String>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Exports all metrics in Prometheus exposition format

This method generates a text representation of all metrics following the Prometheus exposition format. The output can be scraped by Prometheus or compatible monitoring systems. Includes all standard metrics, custom metrics, and alert states.

§Returns
  • Ok(String) - Prometheus-formatted metrics text
  • Err - If export fails
§Output Format
# HELP hft_latency_nanoseconds End-to-end latency in nanoseconds
# TYPE hft_latency_nanoseconds histogram
hft_latency_nanoseconds_bucket{exchange="binance",instrument="BTC-USDT",le="1000"} 245
hft_latency_nanoseconds_bucket{exchange="binance",instrument="BTC-USDT",le="5000"} 1823
hft_latency_nanoseconds_sum{exchange="binance",instrument="BTC-USDT"} 4523000
hft_latency_nanoseconds_count{exchange="binance",instrument="BTC-USDT"} 2068

# HELP hft_messages_total Total messages processed
# TYPE hft_messages_total counter
hft_messages_total{exchange="binance",type="trade"} 150234
hft_messages_total{exchange="binance",type="orderbook"} 892341
§Example
let prometheus_data = monitor.export_prometheus_metrics().await?;

// Write to file for Prometheus to scrape
std::fs::write("/var/lib/prometheus/hft_metrics.prom", prometheus_data)?;

// Or serve via HTTP endpoint
// warp::reply::with_header(prometheus_data, "Content-Type", "text/plain; version=0.0.4")
Source

fn reset_statistics<'life0, 'async_trait>( &'life0 mut self, ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Resets all monitoring statistics to their initial state

This method clears all accumulated metrics, counters, and statistics while preserving configuration (alerts, collectors, etc.). Useful for starting fresh measurements or after system maintenance.

§Returns
  • Ok(()) - If statistics were successfully reset
  • Err - If reset fails
§Example
// Reset stats at market open
monitor.reset_statistics().await?;
println!("Statistics reset for new trading session");
Source

fn get_trading_metrics<'life0, 'life1, 'async_trait>( &'life0 self, instrument_id: &'life1 InstrumentId, ) -> Pin<Box<dyn Future<Output = Result<TradingMetrics>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Retrieves advanced trading-specific metrics for an instrument

This method provides sophisticated trading analytics including order flow imbalance, VWAP/TWAP calculations, microstructure signals, and execution quality metrics. These metrics are essential for HFT strategy development and performance analysis.

§Arguments
  • instrument_id - The instrument to analyze
§Returns
  • Ok(TradingMetrics) - Comprehensive trading analytics
  • Err - If instrument is not found or has insufficient data
§Example
let eth_usdt = InstrumentId::new("ETH-USDT", Venue::Binance);
let trading_metrics = monitor.get_trading_metrics(&eth_usdt).await?;

// Check microstructure signals
let signals = &trading_metrics.microstructure_signals;
if signals.book_imbalance > 0.7 && signals.momentum > 0.5 {
    println!("Strong bullish signal detected");
    println!("  Book imbalance: {:.3}", signals.book_imbalance);
    println!("  Momentum: {:.3}", signals.momentum);
    println!("  Alpha signal: {:.3}", signals.alpha_signal);
}

// Analyze execution quality
let exec_quality = &trading_metrics.execution_quality;
println!("Execution metrics:");
println!("  Slippage: {:.2} bps", exec_quality.slippage_bps);
println!("  Market impact: {:.2} bps", exec_quality.market_impact_bps);
println!("  Fill rate: {:.1}%", exec_quality.fill_rate * 100.0);

// Price analysis
println!("VWAP: {}", trading_metrics.vwap);
println!("TWAP: {}", trading_metrics.twap);

Implementors§

Source§

impl MonitorExt for BinanceFuturesProvider

MonitorExt implementation for BinanceFuturesProvider

Provides advanced monitoring capabilities for HFT trading systems