rusty_backtest/features/
market_impact.rs

1//! Market Impact Analysis
2//!
3//! Implements Kyle's Lambda and other market impact metrics for measuring
4//! the price impact of trades and order flow.
5
6use super::{TradeSide, TradeTick, decimal_to_f64_or_nan};
7use smallvec::SmallVec;
8
9/// Kyle's Lambda calculator for market impact coefficient
10///
11/// Kyle's Lambda (λ) measures the price impact per unit of net order flow.
12/// Formula: λ = Δp / Δq
13/// where Δp is the price change and Δq is the net volume imbalance
14#[derive(Debug, Clone)]
15pub struct KylesLambdaCalculator {
16    /// Window size for rolling calculations
17    window_size: usize,
18    /// Price changes buffer (circular)
19    price_changes: Vec<f64>,
20    /// Volume imbalances buffer (circular)
21    volume_imbalances: Vec<f64>,
22    /// Current position in circular buffer
23    buffer_position: usize,
24    /// Number of samples collected
25    samples_count: usize,
26    /// Previous price for calculating changes
27    previous_price: Option<f64>,
28    /// Accumulated buy volume in current interval
29    interval_buy_volume: f64,
30    /// Accumulated sell volume in current interval
31    interval_sell_volume: f64,
32    /// Time of last interval calculation (nanoseconds)
33    last_interval_time: u64,
34    /// Interval duration in nanoseconds
35    interval_duration_ns: u64,
36}
37
38impl KylesLambdaCalculator {
39    /// Create a new Kyle's Lambda calculator
40    ///
41    /// # Arguments
42    /// * `window_size` - Number of intervals to use for rolling calculation
43    /// * `interval_duration_ms` - Duration of each interval in milliseconds
44    #[must_use]
45    pub fn new(window_size: usize, interval_duration_ms: u64) -> Self {
46        Self {
47            window_size,
48            price_changes: vec![0.0; window_size],
49            volume_imbalances: vec![0.0; window_size],
50            buffer_position: 0,
51            samples_count: 0,
52            previous_price: None,
53            interval_buy_volume: 0.0,
54            interval_sell_volume: 0.0,
55            last_interval_time: 0,
56            interval_duration_ns: interval_duration_ms * 1_000_000,
57        }
58    }
59
60    /// Update with a new trade
61    pub fn update_trade(&mut self, trade: &TradeTick) -> Option<f64> {
62        let timestamp = trade.timestamp_ns;
63        let price = decimal_to_f64_or_nan(trade.price);
64        let quantity = decimal_to_f64_or_nan(trade.quantity);
65
66        // Initialize timing on first trade
67        if self.last_interval_time == 0 {
68            self.last_interval_time = timestamp;
69            self.previous_price = Some(price);
70        }
71
72        // Accumulate volume based on trade side
73        if trade.side == TradeSide::Buy {
74            self.interval_buy_volume += quantity;
75        } else {
76            self.interval_sell_volume += quantity;
77        }
78
79        // Check if interval has elapsed
80        if timestamp >= self.last_interval_time + self.interval_duration_ns
81            && let Some(prev_price) = self.previous_price
82        {
83            // Calculate price change
84            let price_change = price - prev_price;
85
86            // Calculate net volume imbalance
87            let volume_imbalance = self.interval_buy_volume - self.interval_sell_volume;
88
89            // Store in circular buffer
90            self.price_changes[self.buffer_position] = price_change;
91            self.volume_imbalances[self.buffer_position] = volume_imbalance;
92
93            // Update buffer position
94            self.buffer_position = (self.buffer_position + 1) % self.window_size;
95            self.samples_count = self.samples_count.saturating_add(1).min(self.window_size);
96
97            // Reset for next interval
98            self.interval_buy_volume = 0.0;
99            self.interval_sell_volume = 0.0;
100            self.last_interval_time = timestamp;
101            self.previous_price = Some(price);
102
103            // Calculate Kyle's Lambda if we have enough samples
104            return self.calculate_lambda();
105        }
106
107        None
108    }
109    /// Alias for update_trade() to maintain backward compatibility
110    pub fn add_trade(&mut self, trade: &TradeTick) {
111        self.update_trade(trade);
112    }
113
114    /// Alias for calculate_lambda() to maintain backward compatibility
115    pub fn calculate(&self) -> f64 {
116        self.calculate_lambda().unwrap_or(0.0)
117    }
118
119    /// Calculate Kyle's Lambda using linear regression
120    ///
121    /// Performs OLS regression: Δp = λ * Δq + ε
122    /// Returns the slope coefficient λ
123    fn calculate_lambda(&self) -> Option<f64> {
124        if self.samples_count < 10 {
125            return None; // Need minimum samples for reliable estimate
126        }
127
128        let n = self.samples_count;
129
130        // Calculate means
131        let mut sum_x = 0.0;
132        let mut sum_y = 0.0;
133
134        for i in 0..n {
135            sum_x += self.volume_imbalances[i];
136            sum_y += self.price_changes[i];
137        }
138
139        let mean_x = sum_x / n as f64;
140        let mean_y = sum_y / n as f64;
141
142        // Calculate slope using least squares
143        let mut numerator = 0.0;
144        let mut denominator = 0.0;
145
146        for i in 0..n {
147            let x_diff = self.volume_imbalances[i] - mean_x;
148            let y_diff = self.price_changes[i] - mean_y;
149            numerator += x_diff * y_diff;
150            denominator += x_diff * x_diff;
151        }
152
153        if denominator.abs() < 1e-10 {
154            return None; // Avoid division by zero
155        }
156
157        Some(numerator / denominator)
158    }
159
160    /// Get current Kyle's Lambda estimate
161    pub fn get_lambda(&self) -> Option<f64> {
162        self.calculate_lambda()
163    }
164
165    /// Reset the calculator
166    pub fn reset(&mut self) {
167        self.price_changes.fill(0.0);
168        self.volume_imbalances.fill(0.0);
169        self.buffer_position = 0;
170        self.samples_count = 0;
171        self.previous_price = None;
172        self.interval_buy_volume = 0.0;
173        self.interval_sell_volume = 0.0;
174        self.last_interval_time = 0;
175    }
176}
177
178/// Advanced market impact metrics
179#[derive(Debug, Clone)]
180pub struct MarketImpactAnalyzer {
181    /// Kyle's Lambda calculator
182    kyles_lambda: KylesLambdaCalculator,
183    /// Temporary impact decay estimator
184    temp_impact_halflife: f64,
185    /// Permanent impact ratio
186    permanent_impact_ratio: f64,
187    /// Recent impacts for decay modeling
188    recent_impacts: SmallVec<[(u64, f64, f64); 32]>, // (timestamp, impact, volume)
189}
190
191impl MarketImpactAnalyzer {
192    /// Create a new market impact analyzer
193    #[must_use]
194    pub fn new(window_size: usize, interval_duration_ms: u64) -> Self {
195        Self {
196            kyles_lambda: KylesLambdaCalculator::new(window_size, interval_duration_ms),
197            temp_impact_halflife: 300_000.0, // 5 minutes in milliseconds
198            permanent_impact_ratio: 0.3,     // 30% of impact is permanent
199            recent_impacts: SmallVec::new(),
200        }
201    }
202
203    /// Update with new trade and get current impact metrics
204    pub fn update(&mut self, trade: &TradeTick) -> MarketImpactMetrics {
205        // Update Kyle's Lambda
206        let kyles_lambda = self.kyles_lambda.update_trade(trade);
207
208        // Calculate instantaneous impact if we have lambda
209        let instantaneous_impact = if let Some(lambda) = kyles_lambda {
210            let signed_volume = if trade.side == TradeSide::Buy {
211                decimal_to_f64_or_nan(trade.quantity)
212            } else {
213                -decimal_to_f64_or_nan(trade.quantity)
214            };
215            lambda * signed_volume
216        } else {
217            0.0
218        };
219
220        // Add to recent impacts
221        if instantaneous_impact.abs() > 1e-10 {
222            self.recent_impacts.push((
223                trade.timestamp_ns,
224                instantaneous_impact,
225                decimal_to_f64_or_nan(trade.quantity),
226            ));
227
228            // Keep only recent impacts (last hour)
229            let cutoff_time = trade.timestamp_ns.saturating_sub(3_600_000_000_000);
230            self.recent_impacts.retain(|(ts, _, _)| *ts > cutoff_time);
231        }
232
233        // Calculate cumulative temporary impact with decay
234        let cumulative_temp_impact = self.calculate_cumulative_impact(trade.timestamp_ns);
235
236        MarketImpactMetrics {
237            kyles_lambda,
238            instantaneous_impact,
239            cumulative_temp_impact,
240            permanent_impact: instantaneous_impact * self.permanent_impact_ratio,
241        }
242    }
243
244    /// Calculate cumulative temporary impact with exponential decay
245    fn calculate_cumulative_impact(&self, current_time: u64) -> f64 {
246        let mut total_impact = 0.0;
247        let decay_rate = 0.693 / self.temp_impact_halflife; // ln(2) / halflife
248
249        for &(timestamp, impact, _) in &self.recent_impacts {
250            let time_elapsed = (current_time - timestamp) as f64 / 1_000_000.0; // to milliseconds
251            let decay_factor = (-decay_rate * time_elapsed).exp();
252            total_impact += impact * decay_factor * (1.0 - self.permanent_impact_ratio);
253        }
254
255        total_impact
256    }
257
258    /// Reset all calculators
259    pub fn reset(&mut self) {
260        self.kyles_lambda.reset();
261        self.recent_impacts.clear();
262    }
263}
264
265/// Market impact metrics
266#[derive(Debug, Clone, Copy)]
267pub struct MarketImpactMetrics {
268    /// Kyle's Lambda coefficient
269    pub kyles_lambda: Option<f64>,
270    /// Instantaneous price impact of the last trade
271    pub instantaneous_impact: f64,
272    /// Cumulative temporary impact (with decay)
273    pub cumulative_temp_impact: f64,
274    /// Permanent impact component
275    pub permanent_impact: f64,
276}
277
278/// SIMD-optimized batch Kyle's Lambda calculation using safe operations
279pub mod simd {
280    use wide::f64x4;
281
282    /// Calculate Kyle's Lambda for multiple symbols in parallel using safe SIMD
283    ///
284    /// This function processes 4 symbols at a time using portable `wide` operations.
285    /// Completely safe, NaN-aware, and works on all platforms.
286    ///
287    /// # Arguments
288    /// * `price_changes` - Slice of price change arrays, one per symbol
289    /// * `volume_imbalances` - Slice of volume imbalance arrays, one per symbol
290    ///
291    /// # Returns
292    /// Vector of Kyle's Lambda coefficients, one per symbol
293    pub fn calculate_kyles_lambda_batch(
294        price_changes: &[&[f64]],
295        volume_imbalances: &[&[f64]],
296    ) -> Vec<f64> {
297        let n_symbols = price_changes.len();
298        let mut results = vec![0.0; n_symbols];
299
300        // Process 4 symbols at a time using safe SIMD
301        let chunks = n_symbols / 4;
302
303        for chunk in 0..chunks {
304            let base_idx = chunk * 4;
305
306            // Ensure all 4 symbols have data
307            if base_idx + 3 >= n_symbols {
308                break;
309            }
310
311            // Get sample count (assume all symbols have same length)
312            let n_samples = price_changes[base_idx].len();
313            if n_samples == 0 {
314                continue;
315            }
316
317            // Calculate means for 4 symbols simultaneously
318            let mut sum_x = f64x4::ZERO;
319            let mut sum_y = f64x4::ZERO;
320
321            for i in 0..n_samples {
322                // Load volume imbalances for 4 symbols (NaN-safe)
323                let x_vec = f64x4::from([
324                    volume_imbalances[base_idx][i],
325                    volume_imbalances[base_idx + 1][i],
326                    volume_imbalances[base_idx + 2][i],
327                    volume_imbalances[base_idx + 3][i],
328                ]);
329
330                // Load price changes for 4 symbols (NaN-safe)
331                let y_vec = f64x4::from([
332                    price_changes[base_idx][i],
333                    price_changes[base_idx + 1][i],
334                    price_changes[base_idx + 2][i],
335                    price_changes[base_idx + 3][i],
336                ]);
337
338                // Accumulate sums (NaN-safe addition)
339                sum_x += x_vec;
340                sum_y += y_vec;
341            }
342
343            // Calculate means
344            let n_vec = f64x4::splat(n_samples as f64);
345            let mean_x = sum_x / n_vec;
346            let mean_y = sum_y / n_vec;
347
348            // Calculate regression coefficients
349            let mut numerator = f64x4::ZERO;
350            let mut denominator = f64x4::ZERO;
351
352            for i in 0..n_samples {
353                // Load data for 4 symbols (NaN-safe)
354                let x_vec = f64x4::from([
355                    volume_imbalances[base_idx][i],
356                    volume_imbalances[base_idx + 1][i],
357                    volume_imbalances[base_idx + 2][i],
358                    volume_imbalances[base_idx + 3][i],
359                ]);
360
361                let y_vec = f64x4::from([
362                    price_changes[base_idx][i],
363                    price_changes[base_idx + 1][i],
364                    price_changes[base_idx + 2][i],
365                    price_changes[base_idx + 3][i],
366                ]);
367
368                // Calculate differences from means (NaN-safe)
369                let x_diff = x_vec - mean_x;
370                let y_diff = y_vec - mean_y;
371
372                // Update numerator and denominator (NaN-safe)
373                let xy_prod = x_diff * y_diff;
374                let xx_prod = x_diff * x_diff;
375
376                numerator += xy_prod;
377                denominator += xx_prod;
378            }
379
380            // Calculate lambda = numerator / denominator (NaN-safe division)
381            let lambda = numerator / denominator;
382
383            // Extract results safely
384            let lambda_array = lambda.as_array_ref();
385            results[base_idx] = lambda_array[0];
386            results[base_idx + 1] = lambda_array[1];
387            results[base_idx + 2] = lambda_array[2];
388            results[base_idx + 3] = lambda_array[3];
389
390            // Handle NaN results (division by zero or invalid inputs)
391            for j in 0..4 {
392                if results[base_idx + j].is_nan() || results[base_idx + j].is_infinite() {
393                    results[base_idx + j] = 0.0;
394                }
395            }
396        }
397
398        // Handle remaining symbols (scalar fallback)
399        for i in (chunks * 4)..n_symbols {
400            results[i] = calculate_single_lambda(price_changes[i], volume_imbalances[i]);
401        }
402
403        results
404    }
405
406    /// Calculate Kyle's Lambda for a single symbol using scalar operations
407    ///
408    /// This is the fallback function for individual symbols or when SIMD processing
409    /// is not beneficial. Handles NaN values gracefully.
410    fn calculate_single_lambda(price_changes: &[f64], volume_imbalances: &[f64]) -> f64 {
411        if price_changes.is_empty() || volume_imbalances.is_empty() {
412            return 0.0;
413        }
414
415        if price_changes.len() != volume_imbalances.len() {
416            return 0.0;
417        }
418
419        let _n = price_changes.len() as f64;
420
421        // Calculate means with NaN handling
422        let sum_x: f64 = volume_imbalances.iter().filter(|x| x.is_finite()).sum();
423        let sum_y: f64 = price_changes.iter().filter(|y| y.is_finite()).sum();
424        let valid_count = price_changes
425            .iter()
426            .zip(volume_imbalances.iter())
427            .filter(|(y, x)| y.is_finite() && x.is_finite())
428            .count() as f64;
429
430        if valid_count < 2.0 {
431            return 0.0; // Need at least 2 valid samples
432        }
433
434        let mean_x = sum_x / valid_count;
435        let mean_y = sum_y / valid_count;
436
437        let mut numerator = 0.0;
438        let mut denominator = 0.0;
439
440        // Calculate regression coefficients with NaN handling
441        for i in 0..price_changes.len() {
442            let x = volume_imbalances[i];
443            let y = price_changes[i];
444
445            if x.is_finite() && y.is_finite() {
446                let x_diff = x - mean_x;
447                let y_diff = y - mean_y;
448                numerator += x_diff * y_diff;
449                denominator += x_diff * x_diff;
450            }
451        }
452
453        if denominator.abs() < 1e-10 || !denominator.is_finite() {
454            0.0
455        } else {
456            let result = numerator / denominator;
457            if result.is_finite() { result } else { 0.0 }
458        }
459    }
460}
461
462#[cfg(test)]
463mod tests {
464    use super::*;
465    use rust_decimal::Decimal;
466    use rust_decimal_macros::dec;
467
468    #[test]
469    fn test_kyles_lambda_basic() {
470        let mut calculator = KylesLambdaCalculator::new(20, 100); // 100ms intervals for faster testing
471
472        // Simulate trades with clear price impact pattern
473        let base_time = 1_000_000_000u64;
474        let mut last_lambda = None;
475
476        // Generate 15 intervals to ensure we have enough samples (need 10 minimum)
477        for interval in 0..15 {
478            let interval_time = base_time + (interval as u64) * 200_000_000; // 200ms between intervals
479            let price_base = dec!(100.0) + Decimal::from(interval) / Decimal::from(10); // Price increases gradually
480
481            // Alternate between net buy and net sell intervals
482            let (side, volume) = if interval % 2 == 0 {
483                (TradeSide::Buy, dec!(100.0)) // Net buy volume
484            } else {
485                (TradeSide::Sell, dec!(50.0)) // Net sell volume
486            };
487
488            // Create a few trades within each interval
489            for j in 0..3 {
490                let trade = TradeTick {
491                    symbol: "TEST".into(),
492                    timestamp_ns: interval_time + j * 20_000_000, // 20ms apart
493                    side,
494                    price: price_base,
495                    quantity: volume / Decimal::from(3),
496                };
497                let lambda = calculator.update_trade(&trade);
498                if lambda.is_some() {
499                    last_lambda = lambda;
500                }
501            }
502        }
503
504        // After 15 intervals, we should have a lambda estimate
505        assert!(
506            last_lambda.is_some(),
507            "Expected Kyle's Lambda to be calculated after 15 intervals"
508        );
509    }
510
511    #[test]
512    fn test_market_impact_analyzer() {
513        let mut analyzer = MarketImpactAnalyzer::new(20, 1000);
514
515        let trade = TradeTick {
516            symbol: "TEST".into(),
517            timestamp_ns: 1_000_000_000,
518            side: TradeSide::Buy,
519            price: dec!(100.0),
520            quantity: dec!(50.0),
521        };
522
523        let metrics = analyzer.update(&trade);
524
525        // Initially no lambda estimate
526        assert!(metrics.kyles_lambda.is_none());
527        assert_eq!(metrics.instantaneous_impact, 0.0);
528    }
529
530    #[test]
531    fn test_simd_batch_calculation() {
532        use crate::features::market_impact::simd::calculate_kyles_lambda_batch;
533
534        // Create test data for 4 symbols
535        let price_changes = [
536            vec![0.1, 0.2, -0.1, 0.3, -0.2],
537            vec![0.15, 0.25, -0.05, 0.35, -0.15],
538            vec![0.05, 0.15, -0.15, 0.25, -0.25],
539            vec![0.2, 0.3, -0.2, 0.4, -0.1],
540        ];
541
542        let volume_imbalances = [
543            vec![100.0, 200.0, -100.0, 300.0, -200.0],
544            vec![150.0, 250.0, -50.0, 350.0, -150.0],
545            vec![50.0, 150.0, -150.0, 250.0, -250.0],
546            vec![200.0, 300.0, -200.0, 400.0, -100.0],
547        ];
548
549        let price_refs: Vec<&[f64]> = price_changes.iter().map(|v| v.as_slice()).collect();
550        let volume_refs: Vec<&[f64]> = volume_imbalances.iter().map(|v| v.as_slice()).collect();
551
552        // Now safe - no unsafe block needed!
553        let results = calculate_kyles_lambda_batch(&price_refs, &volume_refs);
554
555        assert_eq!(results.len(), 4);
556        // All results should be positive given the positive correlation in test data
557        for lambda in &results {
558            assert!(*lambda > 0.0, "Lambda should be positive, got: {lambda}");
559        }
560    }
561
562    #[test]
563    fn test_simd_nan_safety() {
564        use crate::features::market_impact::simd::calculate_kyles_lambda_batch;
565
566        // Test with NaN values to ensure safe handling
567        let price_changes = [
568            vec![0.1, f64::NAN, -0.1, 0.3, -0.2],
569            vec![0.15, 0.25, -0.05, f64::INFINITY, -0.15],
570            vec![f64::NEG_INFINITY, 0.15, -0.15, 0.25, -0.25],
571            vec![0.2, 0.3, -0.2, 0.4, f64::NAN],
572        ];
573
574        let volume_imbalances = [
575            vec![100.0, 200.0, f64::NAN, 300.0, -200.0],
576            vec![150.0, f64::INFINITY, -50.0, 350.0, -150.0],
577            vec![50.0, 150.0, -150.0, f64::NEG_INFINITY, -250.0],
578            vec![f64::NAN, 300.0, -200.0, 400.0, -100.0],
579        ];
580
581        let price_refs: Vec<&[f64]> = price_changes.iter().map(|v| v.as_slice()).collect();
582        let volume_refs: Vec<&[f64]> = volume_imbalances.iter().map(|v| v.as_slice()).collect();
583
584        // Should handle NaN/Infinity gracefully without panicking
585        let results = calculate_kyles_lambda_batch(&price_refs, &volume_refs);
586
587        assert_eq!(results.len(), 4);
588        // All results should be finite (NaN/Inf converted to 0.0)
589        for (i, lambda) in results.iter().enumerate() {
590            assert!(
591                lambda.is_finite(),
592                "Lambda[{i}] should be finite, got: {lambda}"
593            );
594        }
595    }
596}