1use parking_lot::RwLock;
20use rust_decimal::Decimal;
21use rusty_common::collections::FxHashMap;
22use rusty_common::decimal_utils::decimal_to_f64_or_nan;
23use smallvec::SmallVec;
24use std::sync::Arc;
25
/// A feature that can be computed in two ways: incrementally from a stream of
/// [`MarketUpdate`]s, or in one pass over a slice of [`MarketData`] snapshots.
///
/// `N` and `T` are forwarded to the [`MarketUpdate`] type accepted by
/// [`calculate_incremental`](Self::calculate_incremental). Implementors must be
/// `Send + Sync`.
pub trait DualModeFeature<const N: usize = 32, const T: usize = 16>: Send + Sync {
    /// Consumes one update (possibly mutating internal state) and returns the
    /// feature value after that update.
    fn calculate_incremental(&mut self, update: &MarketUpdate<N, T>) -> FeatureValue;

    /// Computes feature values for `data` without mutating `self`.
    ///
    /// Every implementation in this file returns one value per element of
    /// `data`, in the same order.
    fn calculate_batch(&self, data: &[MarketData]) -> Vec<FeatureValue>;

    /// Returns a static name identifying the feature.
    fn name(&self) -> &'static str;

    /// Discards any state accumulated by incremental calculation.
    fn reset(&mut self);
}
129
/// One market update, the input of [`DualModeFeature::calculate_incremental`].
///
/// Book levels are held in `SmallVec`s with inline capacity `N` per field and
/// trades in a `SmallVec` with inline capacity `T`.
#[derive(Debug, Clone)]
pub struct MarketUpdate<const N: usize = 32, const T: usize = 16> {
    /// Timestamp of the update (the field name suggests nanoseconds).
    pub timestamp_ns: u64,
    /// Instrument symbol. Not read by any feature in this file.
    pub symbol: String,
    /// Bid prices per level. Features in this file treat index 0 as the level
    /// used for the mid price.
    pub bid_prices: SmallVec<[Decimal; N]>,
    /// Bid quantities per level, read at the same index as `bid_prices`.
    pub bid_quantities: SmallVec<[Decimal; N]>,
    /// Ask prices per level. Features in this file treat index 0 as the level
    /// used for the mid price.
    pub ask_prices: SmallVec<[Decimal; N]>,
    /// Ask quantities per level, read at the same index as `ask_prices`.
    pub ask_quantities: SmallVec<[Decimal; N]>,
    /// Trades carried by this update.
    pub trades: SmallVec<[TradeUpdate; T]>,
}
194
/// A single trade attached to a [`MarketUpdate`] or [`MarketData`] snapshot.
#[derive(Debug, Clone)]
pub struct TradeUpdate {
    /// Trade price. Not read by any feature in this file.
    pub price: Decimal,
    /// Trade quantity; VPIN and Kyle's lambda use it as the trade volume.
    pub quantity: Decimal,
    /// Whether the trade counts as buy or sell volume.
    pub side: TradeSide,
    /// Timestamp of the trade (the field name suggests nanoseconds).
    pub timestamp_ns: u64,
}
208
/// Side of a trade as used by the volume-based features in this file.
#[derive(Debug, Clone, Copy)]
pub enum TradeSide {
    /// Counted as buy volume (positive signed order flow).
    Buy,
    /// Counted as sell volume (negative signed order flow).
    Sell,
}
217
/// One historical snapshot, the element type consumed by
/// [`DualModeFeature::calculate_batch`].
///
/// Carries the same fields as [`MarketUpdate`] but stores them in heap `Vec`s.
#[derive(Debug, Clone)]
pub struct MarketData {
    /// Timestamp of the snapshot (the field name suggests nanoseconds).
    pub timestamp_ns: u64,
    /// Instrument symbol. Not read by any feature in this file.
    pub symbol: String,
    /// Bid prices per level; index 0 is used for the mid price.
    pub bid_prices: Vec<Decimal>,
    /// Bid quantities per level, read at the same index as `bid_prices`.
    pub bid_quantities: Vec<Decimal>,
    /// Ask prices per level; index 0 is used for the mid price.
    pub ask_prices: Vec<Decimal>,
    /// Ask quantities per level, read at the same index as `ask_prices`.
    pub ask_quantities: Vec<Decimal>,
    /// Trades attached to this snapshot.
    pub trades: Vec<TradeUpdate>,
}
237
/// Value produced by a [`DualModeFeature`].
///
/// Every feature defined in this file produces [`FeatureValue::Scalar`].
#[derive(Debug, Clone)]
pub enum FeatureValue {
    /// A single number.
    Scalar(f64),
    /// A one-dimensional list of numbers.
    Vector(Vec<f64>),
    /// A list of rows of numbers.
    Matrix(Vec<Vec<f64>>),
}
249
/// Order-flow-imbalance (OFI) feature computed from changes in resting
/// bid/ask quantities between consecutive updates.
///
/// `N` is the inline capacity of the per-side quantity buffers and of the
/// [`MarketUpdate`] type this feature accepts.
pub struct DualModeOFI<const N: usize = 32> {
    /// Number of slots in `ofi_buffer`.
    window_size: usize,
    /// Bid quantities of the previous incremental update (empty before the first).
    prev_bid_quantities: SmallVec<[Decimal; N]>,
    /// Ask quantities of the previous incremental update (empty before the first).
    prev_ask_quantities: SmallVec<[Decimal; N]>,
    /// Ring buffer of recent per-update OFI values, zero-filled at creation.
    ofi_buffer: Vec<f64>,
    /// Index of the `ofi_buffer` slot written by the next incremental update.
    buffer_idx: usize,
}
264
265impl<const N: usize> DualModeOFI<N> {
266 #[must_use]
268 pub fn new(window_size: usize) -> Self {
269 Self {
270 window_size,
271 prev_bid_quantities: SmallVec::new(),
272 prev_ask_quantities: SmallVec::new(),
273 ofi_buffer: vec![0.0; window_size],
274 buffer_idx: 0,
275 }
276 }
277}
278
279impl<const N: usize> DualModeFeature<N, 16> for DualModeOFI<N> {
280 fn calculate_incremental(&mut self, update: &MarketUpdate<N, 16>) -> FeatureValue {
281 let mut ofi = 0.0;
283
284 if !self.prev_bid_quantities.is_empty() && !self.prev_ask_quantities.is_empty() {
285 let levels = update.bid_quantities.len().min(update.ask_quantities.len());
287
288 for i in 0..levels {
289 if i < self.prev_bid_quantities.len() && i < self.prev_ask_quantities.len() {
290 let bid_delta = update.bid_quantities[i] - self.prev_bid_quantities[i];
291 let ask_delta = update.ask_quantities[i] - self.prev_ask_quantities[i];
292 let delta_value =
293 rusty_common::decimal_utils::decimal_to_f64_or_nan(bid_delta - ask_delta);
294
295 if !delta_value.is_nan() {
297 ofi += delta_value;
298 }
299 }
300 }
301 }
302
303 self.ofi_buffer[self.buffer_idx] = ofi;
305 self.buffer_idx = (self.buffer_idx + 1) % self.window_size;
306
307 self.prev_bid_quantities = update.bid_quantities.clone();
309 self.prev_ask_quantities = update.ask_quantities.clone();
310
311 let valid_values: Vec<f64> = self
313 .ofi_buffer
314 .iter()
315 .filter(|&&v| !v.is_nan())
316 .copied()
317 .collect();
318
319 let avg_ofi = if valid_values.is_empty() {
320 0.0 } else {
322 valid_values.iter().sum::<f64>() / valid_values.len() as f64
323 };
324
325 FeatureValue::Scalar(avg_ofi)
326 }
327
328 fn calculate_batch(&self, data: &[MarketData]) -> Vec<FeatureValue> {
329 let mut results = Vec::with_capacity(data.len());
330 let mut prev_bid_quantities: Option<&Vec<Decimal>> = None;
331 let mut prev_ask_quantities: Option<&Vec<Decimal>> = None;
332
333 for market_data in data {
334 let mut ofi = 0.0;
335
336 if let (Some(prev_bids), Some(prev_asks)) = (prev_bid_quantities, prev_ask_quantities) {
337 let levels = market_data
338 .bid_quantities
339 .len()
340 .min(market_data.ask_quantities.len());
341
342 for i in 0..levels {
343 if i < prev_bids.len() && i < prev_asks.len() {
344 let bid_delta = market_data.bid_quantities[i] - prev_bids[i];
345 let ask_delta = market_data.ask_quantities[i] - prev_asks[i];
346 let delta_value = rusty_common::decimal_utils::decimal_to_f64_or_nan(
347 bid_delta - ask_delta,
348 );
349
350 if !delta_value.is_nan() {
352 ofi += delta_value;
353 }
354 }
355 }
356 }
357
358 results.push(FeatureValue::Scalar(ofi));
359
360 prev_bid_quantities = Some(&market_data.bid_quantities);
361 prev_ask_quantities = Some(&market_data.ask_quantities);
362 }
363
364 results
365 }
366
367 fn name(&self) -> &'static str {
368 "OFI"
369 }
370
371 fn reset(&mut self) {
372 self.prev_bid_quantities.clear();
373 self.prev_ask_quantities.clear();
374 self.ofi_buffer.fill(0.0);
375 self.buffer_idx = 0;
376 }
377}
378
/// VPIN-style feature: mean absolute buy/sell volume imbalance over recently
/// completed volume buckets.
pub struct DualModeVPIN {
    /// Accumulated trade volume at which the open bucket is closed.
    bucket_size: usize,
    /// Completed buckets as `(buy_volume, sell_volume)`, oldest first.
    volume_buckets: Vec<(f64, f64)>,
    /// Total volume accumulated in the bucket currently being filled.
    current_bucket_volume: f64,
    /// Buy volume accumulated in the bucket currently being filled.
    current_buy_volume: f64,
    /// Sell volume accumulated in the bucket currently being filled.
    current_sell_volume: f64,
}
392
393impl DualModeVPIN {
394 #[must_use]
396 pub const fn new(bucket_size: usize) -> Self {
397 Self {
398 bucket_size,
399 volume_buckets: Vec::new(),
400 current_bucket_volume: 0.0,
401 current_buy_volume: 0.0,
402 current_sell_volume: 0.0,
403 }
404 }
405}
406
407impl DualModeFeature for DualModeVPIN {
408 fn calculate_incremental(&mut self, update: &MarketUpdate) -> FeatureValue {
409 for trade in &update.trades {
411 let volume = rusty_common::decimal_utils::decimal_to_f64_or_nan(trade.quantity);
412
413 match trade.side {
414 TradeSide::Buy => self.current_buy_volume += volume,
415 TradeSide::Sell => self.current_sell_volume += volume,
416 }
417
418 self.current_bucket_volume += volume;
419
420 if self.current_bucket_volume >= self.bucket_size as f64 {
422 self.volume_buckets
423 .push((self.current_buy_volume, self.current_sell_volume));
424
425 if self.volume_buckets.len() > 50 {
427 self.volume_buckets.remove(0);
428 }
429
430 self.current_bucket_volume = 0.0;
432 self.current_buy_volume = 0.0;
433 self.current_sell_volume = 0.0;
434 }
435 }
436
437 if self.volume_buckets.is_empty() {
439 return FeatureValue::Scalar(0.0);
440 }
441
442 let mut vpin_sum = 0.0;
443 for (buy_vol, sell_vol) in &self.volume_buckets {
444 let total = buy_vol + sell_vol;
445 if total > 0.0 {
446 vpin_sum += ((buy_vol - sell_vol) / total).abs();
447 }
448 }
449
450 let vpin = vpin_sum / self.volume_buckets.len() as f64;
451 FeatureValue::Scalar(vpin)
452 }
453
454 fn calculate_batch(&self, data: &[MarketData]) -> Vec<FeatureValue> {
455 let mut results = Vec::with_capacity(data.len());
456 let mut volume_buckets = Vec::new();
457 let mut current_bucket_volume = 0.0;
458 let mut current_buy_volume = 0.0;
459 let mut current_sell_volume = 0.0;
460
461 for market_data in data {
462 for trade in &market_data.trades {
464 let volume = rusty_common::decimal_utils::decimal_to_f64_or_nan(trade.quantity);
465
466 match trade.side {
467 TradeSide::Buy => current_buy_volume += volume,
468 TradeSide::Sell => current_sell_volume += volume,
469 }
470
471 current_bucket_volume += volume;
472
473 if current_bucket_volume >= self.bucket_size as f64 {
474 volume_buckets.push((current_buy_volume, current_sell_volume));
475 current_bucket_volume = 0.0;
476 current_buy_volume = 0.0;
477 current_sell_volume = 0.0;
478 }
479 }
480
481 let vpin = if volume_buckets.is_empty() {
483 0.0
484 } else {
485 let mut vpin_sum = 0.0;
486 let start_idx = volume_buckets.len().saturating_sub(50);
487
488 for (buy_vol, sell_vol) in volume_buckets.iter().skip(start_idx) {
489 let (buy_vol, sell_vol) = (*buy_vol, *sell_vol);
490 let total = buy_vol + sell_vol;
491 if total > 0.0 {
492 vpin_sum += ((buy_vol - sell_vol) / total).abs();
493 }
494 }
495
496 vpin_sum / (volume_buckets.len() - start_idx) as f64
497 };
498
499 results.push(FeatureValue::Scalar(vpin));
500 }
501
502 results
503 }
504
505 fn name(&self) -> &'static str {
506 "VPIN"
507 }
508
509 fn reset(&mut self) {
510 self.volume_buckets.clear();
511 self.current_bucket_volume = 0.0;
512 self.current_buy_volume = 0.0;
513 self.current_sell_volume = 0.0;
514 }
515}
516
517pub struct DualModeKyleLambda {
520 window_size: usize,
522 price_changes: Vec<f64>,
524 order_flows: Vec<f64>,
526}
527
528impl DualModeKyleLambda {
529 #[must_use]
531 pub fn new(window_size: usize) -> Self {
532 Self {
533 window_size,
534 price_changes: Vec::with_capacity(window_size),
535 order_flows: Vec::with_capacity(window_size),
536 }
537 }
538
539 fn calculate_lambda(price_changes: &[f64], order_flows: &[f64]) -> Option<f64> {
540 if price_changes.len() != order_flows.len() || price_changes.len() < 2 {
541 return None;
542 }
543
544 let n = price_changes.len() as f64;
545 let sum_flows: f64 = order_flows.iter().sum();
546 let sum_prices: f64 = price_changes.iter().sum();
547 let sum_cross_product: f64 = order_flows
548 .iter()
549 .zip(price_changes.iter())
550 .map(|(x, y)| x * y)
551 .sum();
552 let sum_flows_squared: f64 = order_flows.iter().map(|x| x * x).sum();
553
554 let denominator = n.mul_add(sum_flows_squared, -(sum_flows * sum_flows));
555 if denominator.abs() < 1e-10 {
556 None
557 } else {
558 Some(n.mul_add(sum_cross_product, -(sum_flows * sum_prices)) / denominator)
559 }
560 }
561}
562
563impl DualModeFeature for DualModeKyleLambda {
564 fn calculate_incremental(&mut self, update: &MarketUpdate) -> FeatureValue {
565 if !update.bid_prices.is_empty() && !update.ask_prices.is_empty() {
567 let mid_price = (update.bid_prices[0] + update.ask_prices[0]) / Decimal::from(2);
568
569 let mut order_flow = 0.0;
571 for trade in &update.trades {
572 let signed_volume = match trade.side {
573 TradeSide::Buy => {
574 rusty_common::decimal_utils::decimal_to_f64_or_nan(trade.quantity)
575 }
576 TradeSide::Sell => {
577 -rusty_common::decimal_utils::decimal_to_f64_or_nan(trade.quantity)
578 }
579 };
580 order_flow += signed_volume;
581 }
582
583 if !self.price_changes.is_empty() || !self.order_flows.is_empty() {
585 let prev_mid = if let Some(last_flow) = self.order_flows.last() {
586 (*last_flow).mul_add(
588 -0.0001,
589 rusty_common::decimal_utils::decimal_to_f64_or_nan(mid_price),
590 )
591 } else {
592 rusty_common::decimal_utils::decimal_to_f64_or_nan(mid_price)
593 };
594
595 let price_change =
596 rusty_common::decimal_utils::decimal_to_f64_or_nan(mid_price) - prev_mid;
597
598 self.price_changes.push(price_change);
599 self.order_flows.push(order_flow);
600
601 if self.price_changes.len() > self.window_size {
603 self.price_changes.remove(0);
604 self.order_flows.remove(0);
605 }
606 } else if order_flow != 0.0 {
607 self.order_flows.push(order_flow);
609 }
610 }
611
612 let lambda = Self::calculate_lambda(&self.price_changes, &self.order_flows).unwrap_or(0.0);
614
615 FeatureValue::Scalar(lambda)
616 }
617
618 fn calculate_batch(&self, data: &[MarketData]) -> Vec<FeatureValue> {
619 let mut results = Vec::with_capacity(data.len());
620 let mut price_changes = Vec::new();
621 let mut order_flows = Vec::new();
622 let mut prev_mid_price = None;
623
624 for market_data in data {
625 let mid_price =
627 if !market_data.bid_prices.is_empty() && !market_data.ask_prices.is_empty() {
628 (market_data.bid_prices[0] + market_data.ask_prices[0]) / Decimal::from(2)
629 } else {
630 Decimal::ZERO
631 };
632
633 let mut order_flow = 0.0;
635 for trade in &market_data.trades {
636 let signed_volume = match trade.side {
637 TradeSide::Buy => {
638 rusty_common::decimal_utils::decimal_to_f64_or_nan(trade.quantity)
639 }
640 TradeSide::Sell => {
641 -rusty_common::decimal_utils::decimal_to_f64_or_nan(trade.quantity)
642 }
643 };
644 order_flow += signed_volume;
645 }
646
647 if let Some(prev_mid) = prev_mid_price {
649 let price_diff: Decimal = mid_price - prev_mid;
650 let price_change = rusty_common::decimal_utils::decimal_to_f64_or_nan(price_diff);
651 price_changes.push(price_change);
652 order_flows.push(order_flow);
653
654 if price_changes.len() > self.window_size {
656 price_changes.remove(0);
657 order_flows.remove(0);
658 }
659 }
660
661 prev_mid_price = Some(mid_price);
662
663 let lambda = Self::calculate_lambda(&price_changes, &order_flows).unwrap_or(0.0);
665
666 results.push(FeatureValue::Scalar(lambda));
667 }
668
669 results
670 }
671
672 fn name(&self) -> &'static str {
673 "KyleLambda"
674 }
675
676 fn reset(&mut self) {
677 self.price_changes.clear();
678 self.order_flows.clear();
679 }
680}
681
/// Stateless book-shape feature computed from the top levels of each side.
pub struct DualModeBookSlope {
    /// Maximum number of levels per side that enter the calculation.
    depth: usize,
}
688
689impl DualModeBookSlope {
690 #[must_use]
691 pub const fn new(depth: usize) -> Self {
693 Self { depth }
694 }
695
696 fn calculate_slope(
697 bid_prices: &[Decimal],
698 bid_quantities: &[Decimal],
699 ask_prices: &[Decimal],
700 ask_quantities: &[Decimal],
701 depth: usize,
702 ) -> f64 {
703 let depth = depth.min(bid_prices.len()).min(ask_prices.len());
704 if depth == 0 {
705 return 0.0;
706 }
707
708 let mut bid_weighted_sum = 0.0;
710 let mut bid_qty_sum = 0.0;
711 let mut ask_weighted_sum = 0.0;
712 let mut ask_qty_sum = 0.0;
713
714 for i in 0..depth {
715 let bid_price = decimal_to_f64_or_nan(bid_prices[i]);
716 let bid_qty = decimal_to_f64_or_nan(bid_quantities[i]);
717 let ask_price = decimal_to_f64_or_nan(ask_prices[i]);
718 let ask_qty = decimal_to_f64_or_nan(ask_quantities[i]);
719
720 bid_weighted_sum += bid_price * bid_qty;
721 bid_qty_sum += bid_qty;
722 ask_weighted_sum += ask_price * ask_qty;
723 ask_qty_sum += ask_qty;
724 }
725
726 if bid_qty_sum == 0.0 || ask_qty_sum == 0.0 {
727 return 0.0;
728 }
729
730 let bid_weighted_avg = bid_weighted_sum / bid_qty_sum;
731 let ask_weighted_avg = ask_weighted_sum / ask_qty_sum;
732
733 (ask_weighted_avg - bid_weighted_avg) / depth as f64
734 }
735}
736
737impl DualModeFeature for DualModeBookSlope {
738 fn calculate_incremental(&mut self, update: &MarketUpdate) -> FeatureValue {
739 let slope = Self::calculate_slope(
740 &update.bid_prices,
741 &update.bid_quantities,
742 &update.ask_prices,
743 &update.ask_quantities,
744 self.depth,
745 );
746
747 FeatureValue::Scalar(slope)
748 }
749
750 fn calculate_batch(&self, data: &[MarketData]) -> Vec<FeatureValue> {
751 data.iter()
752 .map(|market_data| {
753 let slope = Self::calculate_slope(
754 &market_data.bid_prices,
755 &market_data.bid_quantities,
756 &market_data.ask_prices,
757 &market_data.ask_quantities,
758 self.depth,
759 );
760 FeatureValue::Scalar(slope)
761 })
762 .collect()
763 }
764
765 fn name(&self) -> &'static str {
766 "BookSlope"
767 }
768
769 fn reset(&mut self) {
770 }
772}
773
/// Bytes charged per `SmallVec` on top of its inline storage in the size model.
const SMALLVEC_OVERHEAD_BYTES: usize = 24;

/// Bytes charged per `Decimal` element in the size model.
const DECIMAL_SIZE_BYTES: usize = 16;

/// Modelled size at or above which an OFI calculator is expected to be boxed.
const BOXING_THRESHOLD_BYTES: usize = 1024;

/// Estimates the size in bytes of a `DualModeOFI<N>`.
///
/// This is a hand-written model, not `size_of`: a fixed part of
/// `8 + 24 + 8` bytes (presumably the two `usize` fields and the `Vec` header
/// on a 64-bit target) plus, for each of the two quantity buffers, the
/// `SmallVec` overhead and `N` inline `Decimal`s.
const fn calculate_ofi_size<const N: usize>() -> usize {
    let fixed_fields = 8 + 24 + 8;
    let smallvec_headers = 2 * SMALLVEC_OVERHEAD_BYTES;
    let inline_decimals = 2 * N * DECIMAL_SIZE_BYTES;

    fixed_fields + smallvec_headers + inline_decimals
}
817
818const _: () = {
825 assert!(
827 calculate_ofi_size::<5>() < 512,
828 "OFI<5> size calculation seems wrong"
829 );
830 assert!(
831 calculate_ofi_size::<128>() > 2048,
832 "OFI<128> size calculation seems wrong"
833 );
834
835 assert!(
837 calculate_ofi_size::<5>() < BOXING_THRESHOLD_BYTES,
838 "Cap5 should not be boxed"
839 );
840 assert!(
841 calculate_ofi_size::<8>() < BOXING_THRESHOLD_BYTES,
842 "Cap8 should not be boxed"
843 );
844 assert!(
845 calculate_ofi_size::<16>() < BOXING_THRESHOLD_BYTES,
846 "Cap16 should not be boxed"
847 );
848 assert!(
849 calculate_ofi_size::<32>() >= BOXING_THRESHOLD_BYTES,
850 "Cap32 should be boxed"
851 );
852 assert!(
853 calculate_ofi_size::<64>() >= BOXING_THRESHOLD_BYTES,
854 "Cap64 should be boxed"
855 );
856 assert!(
857 calculate_ofi_size::<128>() >= BOXING_THRESHOLD_BYTES,
858 "Cap128 should be boxed"
859 );
860};
861
/// Runtime-selected OFI calculator: one variant per supported inline capacity.
///
/// The three smallest capacities are stored inline; capacities 32 and above
/// are boxed.
enum DualModeOFIWrapper {
    /// Calculator with inline capacity 5, stored inline.
    Cap5(DualModeOFI<5>),

    /// Calculator with inline capacity 8, stored inline.
    Cap8(DualModeOFI<8>),

    /// Calculator with inline capacity 16, stored inline.
    Cap16(DualModeOFI<16>),

    /// Calculator with inline capacity 32, boxed.
    Cap32(Box<DualModeOFI<32>>),

    /// Calculator with inline capacity 64, boxed.
    Cap64(Box<DualModeOFI<64>>),

    /// Calculator with inline capacity 128, boxed.
    Cap128(Box<DualModeOFI<128>>),
}
911
912impl DualModeOFIWrapper {
913 pub fn new(capacity: usize, window_size: usize) -> Self {
914 match capacity {
915 5 => Self::Cap5(DualModeOFI::<5>::new(window_size)),
916 8 => Self::Cap8(DualModeOFI::<8>::new(window_size)),
917 16 => Self::Cap16(DualModeOFI::<16>::new(window_size)),
918 32 => Self::Cap32(Box::new(DualModeOFI::<32>::new(window_size))),
919 64 => Self::Cap64(Box::new(DualModeOFI::<64>::new(window_size))),
920 128 => Self::Cap128(Box::new(DualModeOFI::<128>::new(window_size))),
921 _ => {
922 if capacity <= 5 {
925 Self::Cap5(DualModeOFI::<5>::new(window_size))
926 } else if capacity <= 8 {
927 Self::Cap8(DualModeOFI::<8>::new(window_size))
928 } else if capacity <= 16 {
929 Self::Cap16(DualModeOFI::<16>::new(window_size))
930 } else if capacity <= 32 {
931 Self::Cap32(Box::new(DualModeOFI::<32>::new(window_size)))
932 } else if capacity <= 64 {
933 Self::Cap64(Box::new(DualModeOFI::<64>::new(window_size)))
934 } else {
935 Self::Cap128(Box::new(DualModeOFI::<128>::new(window_size)))
936 }
937 }
938 }
939 }
940}
941
942impl DualModeFeature for DualModeOFIWrapper {
943 fn calculate_incremental(&mut self, update: &MarketUpdate) -> FeatureValue {
944 match self {
945 Self::Cap5(ofi) => {
946 let converted_update = MarketUpdate::<5, 16> {
948 timestamp_ns: update.timestamp_ns,
949 symbol: update.symbol.clone(),
950 bid_prices: update.bid_prices.iter().take(5).copied().collect(),
951 bid_quantities: update.bid_quantities.iter().take(5).copied().collect(),
952 ask_prices: update.ask_prices.iter().take(5).copied().collect(),
953 ask_quantities: update.ask_quantities.iter().take(5).copied().collect(),
954 trades: update.trades.iter().take(16).cloned().collect(),
955 };
956 ofi.calculate_incremental(&converted_update)
957 }
958 Self::Cap8(ofi) => {
959 let converted_update = MarketUpdate::<8, 16> {
960 timestamp_ns: update.timestamp_ns,
961 symbol: update.symbol.clone(),
962 bid_prices: update.bid_prices.iter().take(8).copied().collect(),
963 bid_quantities: update.bid_quantities.iter().take(8).copied().collect(),
964 ask_prices: update.ask_prices.iter().take(8).copied().collect(),
965 ask_quantities: update.ask_quantities.iter().take(8).copied().collect(),
966 trades: update.trades.iter().take(16).cloned().collect(),
967 };
968 ofi.calculate_incremental(&converted_update)
969 }
970 Self::Cap16(ofi) => {
971 let converted_update = MarketUpdate::<16, 16> {
972 timestamp_ns: update.timestamp_ns,
973 symbol: update.symbol.clone(),
974 bid_prices: update.bid_prices.iter().take(16).copied().collect(),
975 bid_quantities: update.bid_quantities.iter().take(16).copied().collect(),
976 ask_prices: update.ask_prices.iter().take(16).copied().collect(),
977 ask_quantities: update.ask_quantities.iter().take(16).copied().collect(),
978 trades: update.trades.iter().take(16).cloned().collect(),
979 };
980 ofi.calculate_incremental(&converted_update)
981 }
982 Self::Cap32(ofi) => ofi.calculate_incremental(update),
983 Self::Cap64(ofi) => {
984 let converted_update = MarketUpdate::<64, 16> {
985 timestamp_ns: update.timestamp_ns,
986 symbol: update.symbol.clone(),
987 bid_prices: update.bid_prices.iter().copied().collect(),
988 bid_quantities: update.bid_quantities.iter().copied().collect(),
989 ask_prices: update.ask_prices.iter().copied().collect(),
990 ask_quantities: update.ask_quantities.iter().copied().collect(),
991 trades: update.trades.iter().take(16).cloned().collect(),
992 };
993 ofi.calculate_incremental(&converted_update)
994 }
995 Self::Cap128(ofi) => {
996 let converted_update = MarketUpdate::<128, 16> {
997 timestamp_ns: update.timestamp_ns,
998 symbol: update.symbol.clone(),
999 bid_prices: update.bid_prices.iter().copied().collect(),
1000 bid_quantities: update.bid_quantities.iter().copied().collect(),
1001 ask_prices: update.ask_prices.iter().copied().collect(),
1002 ask_quantities: update.ask_quantities.iter().copied().collect(),
1003 trades: update.trades.iter().take(16).cloned().collect(),
1004 };
1005 ofi.calculate_incremental(&converted_update)
1006 }
1007 }
1008 }
1009
1010 fn calculate_batch(&self, data: &[MarketData]) -> Vec<FeatureValue> {
1011 match self {
1012 Self::Cap5(ofi) => ofi.calculate_batch(data),
1013 Self::Cap8(ofi) => ofi.calculate_batch(data),
1014 Self::Cap16(ofi) => ofi.calculate_batch(data),
1015 Self::Cap32(ofi) => ofi.calculate_batch(data),
1016 Self::Cap64(ofi) => ofi.calculate_batch(data),
1017 Self::Cap128(ofi) => ofi.calculate_batch(data),
1018 }
1019 }
1020
1021 fn name(&self) -> &'static str {
1022 "OFI"
1023 }
1024
1025 fn reset(&mut self) {
1026 match self {
1027 Self::Cap5(ofi) => ofi.reset(),
1028 Self::Cap8(ofi) => ofi.reset(),
1029 Self::Cap16(ofi) => ofi.reset(),
1030 Self::Cap32(ofi) => ofi.reset(),
1031 Self::Cap64(ofi) => ofi.reset(),
1032 Self::Cap128(ofi) => ofi.reset(),
1033 }
1034 }
1035}
1036
/// Owns a set of named features and evaluates all of them on each update or
/// batch.
pub struct FeatureEngine {
    /// Registered features keyed by the name given to `add_feature`.
    features: FxHashMap<String, Box<dyn DualModeFeature>>,
    /// Mode set at construction or by `switch_mode`. No method in this file
    /// reads it to choose behaviour.
    mode: CalculationMode,
    /// Results of the most recent `process_batch` call.
    results_cache: Arc<RwLock<FxHashMap<String, Vec<FeatureValue>>>>,
}
1046
/// Calculation mode recorded by a [`FeatureEngine`].
#[derive(Debug, Clone, Copy)]
pub enum CalculationMode {
    /// Update-by-update processing (presumably paired with `process_update`).
    RealTime,
    /// Whole-slice processing (presumably paired with `process_batch`).
    Batch,
}
1055
1056impl FeatureEngine {
1057 #[must_use]
1058 pub fn new(mode: CalculationMode) -> Self {
1060 Self {
1061 features: FxHashMap::default(),
1062 mode,
1063 results_cache: Arc::new(RwLock::new(FxHashMap::default())),
1064 }
1065 }
1066
1067 pub fn add_feature(&mut self, name: String, feature: Box<dyn DualModeFeature>) {
1069 self.features.insert(name, feature);
1070 }
1071
1072 #[must_use]
1074 pub fn process_update(&mut self, update: &MarketUpdate) -> FxHashMap<String, FeatureValue> {
1075 let mut results = FxHashMap::default();
1076
1077 for (name, feature) in &mut self.features {
1078 let value = feature.calculate_incremental(update);
1079 results.insert(name.clone(), value);
1080 }
1081
1082 results
1083 }
1084
1085 #[must_use]
1087 pub fn process_batch(&self, data: &[MarketData]) -> FxHashMap<String, Vec<FeatureValue>> {
1088 let mut results = FxHashMap::default();
1089
1090 for (name, feature) in &self.features {
1091 let values = feature.calculate_batch(data);
1092 results.insert(name.clone(), values);
1093 }
1094
1095 *self.results_cache.write() = results.clone();
1097
1098 results
1099 }
1100
1101 pub fn switch_mode(&mut self, mode: CalculationMode) {
1103 self.mode = mode;
1104
1105 for feature in self.features.values_mut() {
1107 feature.reset();
1108 }
1109 }
1110
1111 #[must_use]
1113 pub fn get_cached_results(&self) -> FxHashMap<String, Vec<FeatureValue>> {
1114 self.results_cache.read().clone()
1115 }
1116}
1117
/// Builder that collects features and produces a [`FeatureEngine`].
pub struct FeatureEngineBuilder {
    /// Mode passed to the engine on `build`.
    mode: CalculationMode,
    /// Features in the order they were added, each with its registration name.
    features: Vec<(String, Box<dyn DualModeFeature>)>,
}
1125
1126impl FeatureEngineBuilder {
1127 #[must_use]
1128 pub fn new(mode: CalculationMode) -> Self {
1130 Self {
1131 mode,
1132 features: Vec::new(),
1133 }
1134 }
1135
1136 #[must_use]
1170 pub fn with_ofi(mut self, window_size: usize, capacity: usize) -> Self {
1171 self.features.push((
1172 "OFI".to_string(),
1173 Box::new(DualModeOFIWrapper::new(capacity, window_size)),
1174 ));
1175 self
1176 }
1177
1178 #[must_use]
1183 pub fn with_ofi_default(self, window_size: usize) -> Self {
1184 self.with_ofi(window_size, 32)
1185 }
1186
1187 #[must_use]
1189 pub fn with_ofi_5(self, window_size: usize) -> Self {
1190 self.with_ofi(window_size, 5)
1191 }
1192
1193 #[must_use]
1195 pub fn with_ofi_8(self, window_size: usize) -> Self {
1196 self.with_ofi(window_size, 8)
1197 }
1198
1199 #[must_use]
1201 pub fn with_ofi_16(self, window_size: usize) -> Self {
1202 self.with_ofi(window_size, 16)
1203 }
1204
1205 #[must_use]
1207 pub fn with_ofi_32(self, window_size: usize) -> Self {
1208 self.with_ofi(window_size, 32)
1209 }
1210
1211 #[must_use]
1213 pub fn with_ofi_64(self, window_size: usize) -> Self {
1214 self.with_ofi(window_size, 64)
1215 }
1216
1217 #[must_use]
1219 pub fn with_ofi_128(self, window_size: usize) -> Self {
1220 self.with_ofi(window_size, 128)
1221 }
1222
1223 #[must_use]
1225 pub fn with_vpin(mut self, bucket_size: usize) -> Self {
1226 self.features
1227 .push(("VPIN".to_string(), Box::new(DualModeVPIN::new(bucket_size))));
1228 self
1229 }
1230
1231 #[must_use]
1233 pub fn with_kyle_lambda(mut self, window_size: usize) -> Self {
1234 self.features.push((
1235 "KyleLambda".to_string(),
1236 Box::new(DualModeKyleLambda::new(window_size)),
1237 ));
1238 self
1239 }
1240
1241 #[must_use]
1243 pub fn with_book_slope(mut self, depth: usize) -> Self {
1244 self.features.push((
1245 "BookSlope".to_string(),
1246 Box::new(DualModeBookSlope::new(depth)),
1247 ));
1248 self
1249 }
1250
1251 #[must_use]
1253 pub fn build(self) -> FeatureEngine {
1254 let mut engine = FeatureEngine::new(self.mode);
1255
1256 for (name, feature) in self.features {
1257 engine.add_feature(name, feature);
1258 }
1259
1260 engine
1261 }
1262}
1263
/// [`MarketUpdate`] with 32 inline book levels and 16 inline trades.
pub type MarketUpdate32 = MarketUpdate<32, 16>;
/// [`MarketUpdate`] with 64 inline book levels and 32 inline trades.
pub type MarketUpdate64 = MarketUpdate<64, 32>;
/// [`MarketUpdate`] with 128 inline book levels and 64 inline trades.
pub type MarketUpdate128 = MarketUpdate<128, 64>;

/// [`DualModeOFI`] with inline capacity 5.
pub type DualModeOFI5 = DualModeOFI<5>;
/// [`DualModeOFI`] with inline capacity 8.
pub type DualModeOFI8 = DualModeOFI<8>;
/// [`DualModeOFI`] with inline capacity 16.
pub type DualModeOFI16 = DualModeOFI<16>;
/// [`DualModeOFI`] with inline capacity 32.
pub type DualModeOFI32 = DualModeOFI<32>;
/// [`DualModeOFI`] with inline capacity 64.
pub type DualModeOFI64 = DualModeOFI<64>;
/// [`DualModeOFI`] with inline capacity 128.
pub type DualModeOFI128 = DualModeOFI<128>;

/// Default OFI calculator (capacity 32).
pub use DualModeOFI32 as DefaultDualModeOFI;
/// Default market update type (32 levels, 16 trades).
pub use MarketUpdate32 as DefaultMarketUpdate;
1291
#[cfg(test)]
mod tests {
    use super::*;
    use rust_decimal_macros::dec;
    use smallvec::smallvec;

    /// Tolerance used when comparing computed `f64` feature values.
    const EPS: f64 = 1e-12;

    /// Unwraps a scalar feature value, failing the test for any other variant.
    fn scalar(value: &FeatureValue) -> f64 {
        match value {
            FeatureValue::Scalar(v) => *v,
            other => panic!("Expected scalar value, got {:?}", other),
        }
    }

    /// Starts a builder in real-time mode.
    fn realtime() -> FeatureEngineBuilder {
        FeatureEngineBuilder::new(CalculationMode::RealTime)
    }

    /// Asserts that `engine` holds exactly one feature, registered as "OFI".
    fn assert_single_ofi(engine: &FeatureEngine) {
        assert_eq!(engine.features.len(), 1);
        assert!(engine.features.contains_key("OFI"));
    }

    #[test]
    fn test_dual_mode_ofi() {
        let mut ofi = DualModeOFI::<32>::new(5);

        let update = MarketUpdate::<32, 16> {
            timestamp_ns: 1000,
            symbol: "TEST".to_string(),
            bid_prices: smallvec![dec!(100.0), dec!(99.9), dec!(99.8)],
            bid_quantities: smallvec![dec!(10), dec!(20), dec!(30)],
            ask_prices: smallvec![dec!(100.1), dec!(100.2), dec!(100.3)],
            ask_quantities: smallvec![dec!(15), dec!(25), dec!(35)],
            trades: SmallVec::new(),
        };

        // The first update has no previous book, so its OFI is zero.
        let first = ofi.calculate_incremental(&update);
        assert!(scalar(&first).abs() < EPS);

        // Three more units on the best bid: OFI = 3, averaged over 5 slots.
        let mut next = update.clone();
        next.timestamp_ns = 2000;
        next.bid_quantities[0] = dec!(13);
        let second = ofi.calculate_incremental(&next);
        assert!((scalar(&second) - 0.6).abs() < EPS);

        // After a reset the next update is treated as the first one again.
        ofi.reset();
        let after_reset = ofi.calculate_incremental(&next);
        assert!(scalar(&after_reset).abs() < EPS);
    }

    #[test]
    fn test_feature_engine_builder() {
        let engine = realtime()
            .with_ofi(10, 32)
            .with_vpin(100)
            .with_kyle_lambda(20)
            .with_book_slope(5)
            .build();

        assert_eq!(engine.features.len(), 4);
        for name in ["OFI", "VPIN", "KyleLambda", "BookSlope"].iter() {
            assert!(engine.features.contains_key(*name), "missing {}", name);
        }
    }

    #[test]
    fn test_feature_engine_builder_multiple_features() {
        let engine = realtime()
            .with_ofi(10, 32)
            .with_vpin(100)
            .with_kyle_lambda(20)
            .build();

        assert_eq!(engine.features.len(), 3);
        assert!(!engine.features.contains_key("BookSlope"));
    }

    #[test]
    fn test_ofi_capacity_configuration() {
        let engines = [
            realtime().with_ofi_5(10).build(),
            realtime().with_ofi_16(10).build(),
            realtime().with_ofi(10, 32).build(),
            realtime().with_ofi_32(10).build(),
            realtime().with_ofi_128(10).build(),
            realtime().with_ofi_default(10).build(),
        ];

        for engine in engines.iter() {
            assert_single_ofi(engine);
        }
    }

    #[test]
    fn test_ofi_all_capacity_methods() {
        assert_single_ofi(&realtime().with_ofi_5(10).build());
        assert_single_ofi(&realtime().with_ofi_8(10).build());
        assert_single_ofi(&realtime().with_ofi_16(10).build());
        assert_single_ofi(&realtime().with_ofi_32(10).build());
        assert_single_ofi(&realtime().with_ofi_64(10).build());
        assert_single_ofi(&realtime().with_ofi_128(10).build());
    }

    #[test]
    fn test_ofi_unsupported_capacity_fallback() {
        // Capacities that are not one of the supported sizes are rounded up
        // (or clamped to 128) instead of being rejected.
        for capacity in [3_usize, 48, 200].iter() {
            assert_single_ofi(&realtime().with_ofi(10, *capacity).build());
        }
    }

    #[test]
    fn test_boxing_strategy_validation() {
        assert!(
            calculate_ofi_size::<5>() < 512,
            "OFI<5> size calculation appears incorrect"
        );
        assert!(
            calculate_ofi_size::<8>() < 512,
            "OFI<8> size calculation appears incorrect"
        );
        assert!(
            calculate_ofi_size::<16>() < 1024,
            "OFI<16> size calculation appears incorrect"
        );
        assert!(
            calculate_ofi_size::<32>() > 1024,
            "OFI<32> size calculation appears incorrect"
        );
        assert!(
            calculate_ofi_size::<64>() > 2048,
            "OFI<64> size calculation appears incorrect"
        );
        assert!(
            calculate_ofi_size::<128>() > 4096,
            "OFI<128> size calculation appears incorrect"
        );

        assert!(calculate_ofi_size::<5>() < BOXING_THRESHOLD_BYTES);
        assert!(calculate_ofi_size::<8>() < BOXING_THRESHOLD_BYTES);
        assert!(calculate_ofi_size::<16>() < BOXING_THRESHOLD_BYTES);
        assert!(calculate_ofi_size::<32>() >= BOXING_THRESHOLD_BYTES);
        assert!(calculate_ofi_size::<64>() >= BOXING_THRESHOLD_BYTES);
        assert!(calculate_ofi_size::<128>() >= BOXING_THRESHOLD_BYTES);

        let sizes = [
            ("OFI<5>", calculate_ofi_size::<5>()),
            ("OFI<8>", calculate_ofi_size::<8>()),
            ("OFI<16>", calculate_ofi_size::<16>()),
            ("OFI<32>", calculate_ofi_size::<32>()),
            ("OFI<64>", calculate_ofi_size::<64>()),
            ("OFI<128>", calculate_ofi_size::<128>()),
        ];

        println!("OFI size calculations:");
        for (label, size) in sizes.iter() {
            println!(
                "  {}: {} bytes (threshold: {})",
                label, size, BOXING_THRESHOLD_BYTES
            );
        }
    }

    #[test]
    fn test_mode_consistency() {
        let window_size = 5;
        let mut realtime_ofi = DualModeOFI::<32>::new(window_size);
        let batch_ofi = DualModeOFI::<32>::new(window_size);

        let market_data = vec![
            MarketData {
                timestamp_ns: 1000,
                symbol: "TEST".to_string(),
                bid_prices: vec![dec!(100.0)],
                bid_quantities: vec![dec!(10)],
                ask_prices: vec![dec!(100.1)],
                ask_quantities: vec![dec!(15)],
                trades: vec![],
            },
            MarketData {
                timestamp_ns: 2000,
                symbol: "TEST".to_string(),
                bid_prices: vec![dec!(100.0)],
                bid_quantities: vec![dec!(12)],
                ask_prices: vec![dec!(100.1)],
                ask_quantities: vec![dec!(13)],
                trades: vec![],
            },
        ];

        let batch_results = batch_ofi.calculate_batch(&market_data);

        let incremental_results: Vec<FeatureValue> = market_data
            .iter()
            .map(|data| {
                let update = MarketUpdate::<32, 16> {
                    timestamp_ns: data.timestamp_ns,
                    symbol: data.symbol.clone(),
                    bid_prices: data.bid_prices.iter().copied().collect(),
                    bid_quantities: data.bid_quantities.iter().copied().collect(),
                    ask_prices: data.ask_prices.iter().copied().collect(),
                    ask_quantities: data.ask_quantities.iter().copied().collect(),
                    trades: SmallVec::new(),
                };
                realtime_ofi.calculate_incremental(&update)
            })
            .collect();

        assert_eq!(batch_results.len(), incremental_results.len());

        // Raw OFI of the second snapshot: (12 - 10) - (13 - 15) = 4.
        assert!(scalar(&batch_results[0]).abs() < EPS);
        assert!((scalar(&batch_results[1]) - 4.0).abs() < EPS);

        // Incremental mode reports the mean over the 5-slot window, so the
        // same raw OFI of 4 shows up as 4 / 5.
        assert!(scalar(&incremental_results[0]).abs() < EPS);
        assert!((scalar(&incremental_results[1]) - 4.0 / window_size as f64).abs() < EPS);
    }
}