1use super::{TradeSide, TradeTick, decimal_to_f64_or_nan};
7use smallvec::SmallVec;
8
9#[derive(Debug, Clone)]
15pub struct KylesLambdaCalculator {
16 window_size: usize,
18 price_changes: Vec<f64>,
20 volume_imbalances: Vec<f64>,
22 buffer_position: usize,
24 samples_count: usize,
26 previous_price: Option<f64>,
28 interval_buy_volume: f64,
30 interval_sell_volume: f64,
32 last_interval_time: u64,
34 interval_duration_ns: u64,
36}
37
38impl KylesLambdaCalculator {
39 #[must_use]
45 pub fn new(window_size: usize, interval_duration_ms: u64) -> Self {
46 Self {
47 window_size,
48 price_changes: vec![0.0; window_size],
49 volume_imbalances: vec![0.0; window_size],
50 buffer_position: 0,
51 samples_count: 0,
52 previous_price: None,
53 interval_buy_volume: 0.0,
54 interval_sell_volume: 0.0,
55 last_interval_time: 0,
56 interval_duration_ns: interval_duration_ms * 1_000_000,
57 }
58 }
59
60 pub fn update_trade(&mut self, trade: &TradeTick) -> Option<f64> {
62 let timestamp = trade.timestamp_ns;
63 let price = decimal_to_f64_or_nan(trade.price);
64 let quantity = decimal_to_f64_or_nan(trade.quantity);
65
66 if self.last_interval_time == 0 {
68 self.last_interval_time = timestamp;
69 self.previous_price = Some(price);
70 }
71
72 if trade.side == TradeSide::Buy {
74 self.interval_buy_volume += quantity;
75 } else {
76 self.interval_sell_volume += quantity;
77 }
78
79 if timestamp >= self.last_interval_time + self.interval_duration_ns
81 && let Some(prev_price) = self.previous_price
82 {
83 let price_change = price - prev_price;
85
86 let volume_imbalance = self.interval_buy_volume - self.interval_sell_volume;
88
89 self.price_changes[self.buffer_position] = price_change;
91 self.volume_imbalances[self.buffer_position] = volume_imbalance;
92
93 self.buffer_position = (self.buffer_position + 1) % self.window_size;
95 self.samples_count = self.samples_count.saturating_add(1).min(self.window_size);
96
97 self.interval_buy_volume = 0.0;
99 self.interval_sell_volume = 0.0;
100 self.last_interval_time = timestamp;
101 self.previous_price = Some(price);
102
103 return self.calculate_lambda();
105 }
106
107 None
108 }
109 pub fn add_trade(&mut self, trade: &TradeTick) {
111 self.update_trade(trade);
112 }
113
114 pub fn calculate(&self) -> f64 {
116 self.calculate_lambda().unwrap_or(0.0)
117 }
118
119 fn calculate_lambda(&self) -> Option<f64> {
124 if self.samples_count < 10 {
125 return None; }
127
128 let n = self.samples_count;
129
130 let mut sum_x = 0.0;
132 let mut sum_y = 0.0;
133
134 for i in 0..n {
135 sum_x += self.volume_imbalances[i];
136 sum_y += self.price_changes[i];
137 }
138
139 let mean_x = sum_x / n as f64;
140 let mean_y = sum_y / n as f64;
141
142 let mut numerator = 0.0;
144 let mut denominator = 0.0;
145
146 for i in 0..n {
147 let x_diff = self.volume_imbalances[i] - mean_x;
148 let y_diff = self.price_changes[i] - mean_y;
149 numerator += x_diff * y_diff;
150 denominator += x_diff * x_diff;
151 }
152
153 if denominator.abs() < 1e-10 {
154 return None; }
156
157 Some(numerator / denominator)
158 }
159
160 pub fn get_lambda(&self) -> Option<f64> {
162 self.calculate_lambda()
163 }
164
165 pub fn reset(&mut self) {
167 self.price_changes.fill(0.0);
168 self.volume_imbalances.fill(0.0);
169 self.buffer_position = 0;
170 self.samples_count = 0;
171 self.previous_price = None;
172 self.interval_buy_volume = 0.0;
173 self.interval_sell_volume = 0.0;
174 self.last_interval_time = 0;
175 }
176}
177
178#[derive(Debug, Clone)]
180pub struct MarketImpactAnalyzer {
181 kyles_lambda: KylesLambdaCalculator,
183 temp_impact_halflife: f64,
185 permanent_impact_ratio: f64,
187 recent_impacts: SmallVec<[(u64, f64, f64); 32]>, }
190
191impl MarketImpactAnalyzer {
192 #[must_use]
194 pub fn new(window_size: usize, interval_duration_ms: u64) -> Self {
195 Self {
196 kyles_lambda: KylesLambdaCalculator::new(window_size, interval_duration_ms),
197 temp_impact_halflife: 300_000.0, permanent_impact_ratio: 0.3, recent_impacts: SmallVec::new(),
200 }
201 }
202
203 pub fn update(&mut self, trade: &TradeTick) -> MarketImpactMetrics {
205 let kyles_lambda = self.kyles_lambda.update_trade(trade);
207
208 let instantaneous_impact = if let Some(lambda) = kyles_lambda {
210 let signed_volume = if trade.side == TradeSide::Buy {
211 decimal_to_f64_or_nan(trade.quantity)
212 } else {
213 -decimal_to_f64_or_nan(trade.quantity)
214 };
215 lambda * signed_volume
216 } else {
217 0.0
218 };
219
220 if instantaneous_impact.abs() > 1e-10 {
222 self.recent_impacts.push((
223 trade.timestamp_ns,
224 instantaneous_impact,
225 decimal_to_f64_or_nan(trade.quantity),
226 ));
227
228 let cutoff_time = trade.timestamp_ns.saturating_sub(3_600_000_000_000);
230 self.recent_impacts.retain(|(ts, _, _)| *ts > cutoff_time);
231 }
232
233 let cumulative_temp_impact = self.calculate_cumulative_impact(trade.timestamp_ns);
235
236 MarketImpactMetrics {
237 kyles_lambda,
238 instantaneous_impact,
239 cumulative_temp_impact,
240 permanent_impact: instantaneous_impact * self.permanent_impact_ratio,
241 }
242 }
243
244 fn calculate_cumulative_impact(&self, current_time: u64) -> f64 {
246 let mut total_impact = 0.0;
247 let decay_rate = 0.693 / self.temp_impact_halflife; for &(timestamp, impact, _) in &self.recent_impacts {
250 let time_elapsed = (current_time - timestamp) as f64 / 1_000_000.0; let decay_factor = (-decay_rate * time_elapsed).exp();
252 total_impact += impact * decay_factor * (1.0 - self.permanent_impact_ratio);
253 }
254
255 total_impact
256 }
257
258 pub fn reset(&mut self) {
260 self.kyles_lambda.reset();
261 self.recent_impacts.clear();
262 }
263}
264
265#[derive(Debug, Clone, Copy)]
267pub struct MarketImpactMetrics {
268 pub kyles_lambda: Option<f64>,
270 pub instantaneous_impact: f64,
272 pub cumulative_temp_impact: f64,
274 pub permanent_impact: f64,
276}
277
278pub mod simd {
280 use wide::f64x4;
281
282 pub fn calculate_kyles_lambda_batch(
294 price_changes: &[&[f64]],
295 volume_imbalances: &[&[f64]],
296 ) -> Vec<f64> {
297 let n_symbols = price_changes.len();
298 let mut results = vec![0.0; n_symbols];
299
300 let chunks = n_symbols / 4;
302
303 for chunk in 0..chunks {
304 let base_idx = chunk * 4;
305
306 if base_idx + 3 >= n_symbols {
308 break;
309 }
310
311 let n_samples = price_changes[base_idx].len();
313 if n_samples == 0 {
314 continue;
315 }
316
317 let mut sum_x = f64x4::ZERO;
319 let mut sum_y = f64x4::ZERO;
320
321 for i in 0..n_samples {
322 let x_vec = f64x4::from([
324 volume_imbalances[base_idx][i],
325 volume_imbalances[base_idx + 1][i],
326 volume_imbalances[base_idx + 2][i],
327 volume_imbalances[base_idx + 3][i],
328 ]);
329
330 let y_vec = f64x4::from([
332 price_changes[base_idx][i],
333 price_changes[base_idx + 1][i],
334 price_changes[base_idx + 2][i],
335 price_changes[base_idx + 3][i],
336 ]);
337
338 sum_x += x_vec;
340 sum_y += y_vec;
341 }
342
343 let n_vec = f64x4::splat(n_samples as f64);
345 let mean_x = sum_x / n_vec;
346 let mean_y = sum_y / n_vec;
347
348 let mut numerator = f64x4::ZERO;
350 let mut denominator = f64x4::ZERO;
351
352 for i in 0..n_samples {
353 let x_vec = f64x4::from([
355 volume_imbalances[base_idx][i],
356 volume_imbalances[base_idx + 1][i],
357 volume_imbalances[base_idx + 2][i],
358 volume_imbalances[base_idx + 3][i],
359 ]);
360
361 let y_vec = f64x4::from([
362 price_changes[base_idx][i],
363 price_changes[base_idx + 1][i],
364 price_changes[base_idx + 2][i],
365 price_changes[base_idx + 3][i],
366 ]);
367
368 let x_diff = x_vec - mean_x;
370 let y_diff = y_vec - mean_y;
371
372 let xy_prod = x_diff * y_diff;
374 let xx_prod = x_diff * x_diff;
375
376 numerator += xy_prod;
377 denominator += xx_prod;
378 }
379
380 let lambda = numerator / denominator;
382
383 let lambda_array = lambda.as_array_ref();
385 results[base_idx] = lambda_array[0];
386 results[base_idx + 1] = lambda_array[1];
387 results[base_idx + 2] = lambda_array[2];
388 results[base_idx + 3] = lambda_array[3];
389
390 for j in 0..4 {
392 if results[base_idx + j].is_nan() || results[base_idx + j].is_infinite() {
393 results[base_idx + j] = 0.0;
394 }
395 }
396 }
397
398 for i in (chunks * 4)..n_symbols {
400 results[i] = calculate_single_lambda(price_changes[i], volume_imbalances[i]);
401 }
402
403 results
404 }
405
406 fn calculate_single_lambda(price_changes: &[f64], volume_imbalances: &[f64]) -> f64 {
411 if price_changes.is_empty() || volume_imbalances.is_empty() {
412 return 0.0;
413 }
414
415 if price_changes.len() != volume_imbalances.len() {
416 return 0.0;
417 }
418
419 let _n = price_changes.len() as f64;
420
421 let sum_x: f64 = volume_imbalances.iter().filter(|x| x.is_finite()).sum();
423 let sum_y: f64 = price_changes.iter().filter(|y| y.is_finite()).sum();
424 let valid_count = price_changes
425 .iter()
426 .zip(volume_imbalances.iter())
427 .filter(|(y, x)| y.is_finite() && x.is_finite())
428 .count() as f64;
429
430 if valid_count < 2.0 {
431 return 0.0; }
433
434 let mean_x = sum_x / valid_count;
435 let mean_y = sum_y / valid_count;
436
437 let mut numerator = 0.0;
438 let mut denominator = 0.0;
439
440 for i in 0..price_changes.len() {
442 let x = volume_imbalances[i];
443 let y = price_changes[i];
444
445 if x.is_finite() && y.is_finite() {
446 let x_diff = x - mean_x;
447 let y_diff = y - mean_y;
448 numerator += x_diff * y_diff;
449 denominator += x_diff * x_diff;
450 }
451 }
452
453 if denominator.abs() < 1e-10 || !denominator.is_finite() {
454 0.0
455 } else {
456 let result = numerator / denominator;
457 if result.is_finite() { result } else { 0.0 }
458 }
459 }
460}
461
462#[cfg(test)]
463mod tests {
464 use super::*;
465 use rust_decimal::Decimal;
466 use rust_decimal_macros::dec;
467
468 #[test]
469 fn test_kyles_lambda_basic() {
470 let mut calculator = KylesLambdaCalculator::new(20, 100); let base_time = 1_000_000_000u64;
474 let mut last_lambda = None;
475
476 for interval in 0..15 {
478 let interval_time = base_time + (interval as u64) * 200_000_000; let price_base = dec!(100.0) + Decimal::from(interval) / Decimal::from(10); let (side, volume) = if interval % 2 == 0 {
483 (TradeSide::Buy, dec!(100.0)) } else {
485 (TradeSide::Sell, dec!(50.0)) };
487
488 for j in 0..3 {
490 let trade = TradeTick {
491 symbol: "TEST".into(),
492 timestamp_ns: interval_time + j * 20_000_000, side,
494 price: price_base,
495 quantity: volume / Decimal::from(3),
496 };
497 let lambda = calculator.update_trade(&trade);
498 if lambda.is_some() {
499 last_lambda = lambda;
500 }
501 }
502 }
503
504 assert!(
506 last_lambda.is_some(),
507 "Expected Kyle's Lambda to be calculated after 15 intervals"
508 );
509 }
510
511 #[test]
512 fn test_market_impact_analyzer() {
513 let mut analyzer = MarketImpactAnalyzer::new(20, 1000);
514
515 let trade = TradeTick {
516 symbol: "TEST".into(),
517 timestamp_ns: 1_000_000_000,
518 side: TradeSide::Buy,
519 price: dec!(100.0),
520 quantity: dec!(50.0),
521 };
522
523 let metrics = analyzer.update(&trade);
524
525 assert!(metrics.kyles_lambda.is_none());
527 assert_eq!(metrics.instantaneous_impact, 0.0);
528 }
529
530 #[test]
531 fn test_simd_batch_calculation() {
532 use crate::features::market_impact::simd::calculate_kyles_lambda_batch;
533
534 let price_changes = [
536 vec![0.1, 0.2, -0.1, 0.3, -0.2],
537 vec![0.15, 0.25, -0.05, 0.35, -0.15],
538 vec![0.05, 0.15, -0.15, 0.25, -0.25],
539 vec![0.2, 0.3, -0.2, 0.4, -0.1],
540 ];
541
542 let volume_imbalances = [
543 vec![100.0, 200.0, -100.0, 300.0, -200.0],
544 vec![150.0, 250.0, -50.0, 350.0, -150.0],
545 vec![50.0, 150.0, -150.0, 250.0, -250.0],
546 vec![200.0, 300.0, -200.0, 400.0, -100.0],
547 ];
548
549 let price_refs: Vec<&[f64]> = price_changes.iter().map(|v| v.as_slice()).collect();
550 let volume_refs: Vec<&[f64]> = volume_imbalances.iter().map(|v| v.as_slice()).collect();
551
552 let results = calculate_kyles_lambda_batch(&price_refs, &volume_refs);
554
555 assert_eq!(results.len(), 4);
556 for lambda in &results {
558 assert!(*lambda > 0.0, "Lambda should be positive, got: {lambda}");
559 }
560 }
561
562 #[test]
563 fn test_simd_nan_safety() {
564 use crate::features::market_impact::simd::calculate_kyles_lambda_batch;
565
566 let price_changes = [
568 vec![0.1, f64::NAN, -0.1, 0.3, -0.2],
569 vec![0.15, 0.25, -0.05, f64::INFINITY, -0.15],
570 vec![f64::NEG_INFINITY, 0.15, -0.15, 0.25, -0.25],
571 vec![0.2, 0.3, -0.2, 0.4, f64::NAN],
572 ];
573
574 let volume_imbalances = [
575 vec![100.0, 200.0, f64::NAN, 300.0, -200.0],
576 vec![150.0, f64::INFINITY, -50.0, 350.0, -150.0],
577 vec![50.0, 150.0, -150.0, f64::NEG_INFINITY, -250.0],
578 vec![f64::NAN, 300.0, -200.0, 400.0, -100.0],
579 ];
580
581 let price_refs: Vec<&[f64]> = price_changes.iter().map(|v| v.as_slice()).collect();
582 let volume_refs: Vec<&[f64]> = volume_imbalances.iter().map(|v| v.as_slice()).collect();
583
584 let results = calculate_kyles_lambda_batch(&price_refs, &volume_refs);
586
587 assert_eq!(results.len(), 4);
588 for (i, lambda) in results.iter().enumerate() {
590 assert!(
591 lambda.is_finite(),
592 "Lambda[{i}] should be finite, got: {lambda}"
593 );
594 }
595 }
596}