rusty_backtest/features/
order_flow.rs

1//! Order Flow Analysis - High-performance implementation of OFI and VPIN
2//!
3//! Based on patterns from Binance Tardis reference implementation with
4//! Rust-specific optimizations for HFT performance.
5
6use super::{OrderBookSnapshot, TradeSide, TradeTick, decimal_to_f64_or_nan};
7use rust_decimal::prelude::ToPrimitive;
8
9/// Calculate basic Order Flow Imbalance (OFI)
10///
11/// OFI measures the net order flow at the best bid/ask level between two snapshots.
12/// Positive values indicate buying pressure, negative values indicate selling pressure.
13#[inline(always)]
14pub fn calculate_ofi(prev_book: &OrderBookSnapshot, curr_book: &OrderBookSnapshot) -> f64 {
15    let mut ofi = 0.0;
16
17    // Best bid/ask analysis
18    if let (Some(prev_bid), Some(curr_bid)) = (prev_book.bids.first(), curr_book.bids.first())
19        && prev_bid.price == curr_bid.price
20    {
21        ofi += (curr_bid.quantity - prev_bid.quantity)
22            .to_f64()
23            .unwrap_or(0.0);
24    }
25
26    if let (Some(prev_ask), Some(curr_ask)) = (prev_book.asks.first(), curr_book.asks.first())
27        && prev_ask.price == curr_ask.price
28    {
29        ofi -= (curr_ask.quantity - prev_ask.quantity)
30            .to_f64()
31            .unwrap_or(0.0);
32    }
33
34    ofi
35}
36
37/// Calculate multi-level weighted OFI
38///
39/// Extends basic OFI to multiple levels with distance-based weighting.
40/// Levels closer to mid-price receive higher weights.
41#[inline(always)]
42pub fn calculate_weighted_ofi(
43    prev_book: &OrderBookSnapshot,
44    curr_book: &OrderBookSnapshot,
45    max_levels: usize,
46) -> f64 {
47    let mut weighted_ofi = 0.0;
48    let levels = max_levels
49        .min(prev_book.bids.len())
50        .min(curr_book.bids.len());
51
52    for i in 0..levels {
53        let weight = 1.0 / (i as f64 + 1.0); // Distance-based weighting
54
55        // Bid side
56        if let (Some(prev_bid), Some(curr_bid)) = (prev_book.bids.get(i), curr_book.bids.get(i))
57            && prev_bid.price == curr_bid.price
58        {
59            weighted_ofi += weight
60                * (curr_bid.quantity - prev_bid.quantity)
61                    .to_f64()
62                    .unwrap_or(0.0);
63        }
64
65        // Ask side
66        if let (Some(prev_ask), Some(curr_ask)) = (prev_book.asks.get(i), curr_book.asks.get(i))
67            && prev_ask.price == curr_ask.price
68        {
69            weighted_ofi -= weight
70                * (curr_ask.quantity - prev_ask.quantity)
71                    .to_f64()
72                    .unwrap_or(0.0);
73        }
74    }
75
76    weighted_ofi
77}
78
79/// Calculate Volume-Synchronized Probability of Informed Trading (VPIN)
80///
81/// VPIN measures the imbalance of trade volume between buy and sell orders
82/// over fixed volume buckets, providing early warning of informed trading.
83pub fn calculate_vpin(trades: &[TradeTick], bucket_size: usize) -> f64 {
84    if trades.len() < bucket_size {
85        return 0.0;
86    }
87
88    let recent_trades = &trades[trades.len() - bucket_size..];
89
90    let mut buy_volume = 0.0;
91    let mut sell_volume = 0.0;
92
93    for trade in recent_trades {
94        match trade.side {
95            TradeSide::Buy => buy_volume += decimal_to_f64_or_nan(trade.quantity),
96            TradeSide::Sell => sell_volume += decimal_to_f64_or_nan(trade.quantity),
97        }
98    }
99
100    let total_volume = buy_volume + sell_volume;
101    if total_volume > 0.0 {
102        (buy_volume - sell_volume).abs() / total_volume
103    } else {
104        0.0
105    }
106}
107
108/// High-performance VPIN calculation with rolling window
109///
110/// Optimized version using circular buffer for continuous VPIN computation
111/// without reallocating memory.
112pub struct VPINCalculator {
113    bucket_size: usize,
114    buy_volumes: Vec<f64>,
115    sell_volumes: Vec<f64>,
116    current_index: usize,
117    is_full: bool,
118}
119
120impl VPINCalculator {
121    /// Create new VPIN calculator with specified bucket size
122    #[must_use]
123    pub fn new(bucket_size: usize) -> Self {
124        Self {
125            bucket_size,
126            buy_volumes: vec![0.0; bucket_size],
127            sell_volumes: vec![0.0; bucket_size],
128            current_index: 0,
129            is_full: false,
130        }
131    }
132
133    /// Add trade and get updated VPIN value
134    #[inline(always)]
135    pub fn add_trade(&mut self, trade: &TradeTick) -> Option<f64> {
136        // Reset current bucket
137        self.buy_volumes[self.current_index] = 0.0;
138        self.sell_volumes[self.current_index] = 0.0;
139
140        // Add trade to current bucket
141        match trade.side {
142            TradeSide::Buy => {
143                self.buy_volumes[self.current_index] += decimal_to_f64_or_nan(trade.quantity)
144            }
145            TradeSide::Sell => {
146                self.sell_volumes[self.current_index] += decimal_to_f64_or_nan(trade.quantity)
147            }
148        }
149
150        // Move to next bucket
151        self.current_index = (self.current_index + 1) % self.bucket_size;
152        if self.current_index == 0 {
153            self.is_full = true;
154        }
155
156        // Calculate VPIN if we have enough data
157        if self.is_full {
158            Some(self.calculate_current_vpin())
159        } else {
160            None
161        }
162    }
163    /// Alias for calculate_current_vpin() to maintain backward compatibility
164    pub fn calculate(&self) -> f64 {
165        self.calculate_current_vpin()
166    }
167
168    /// Calculate VPIN for current window
169    #[inline(always)]
170    fn calculate_current_vpin(&self) -> f64 {
171        let total_buy: f64 = self.buy_volumes.iter().sum();
172        let total_sell: f64 = self.sell_volumes.iter().sum();
173        let total_volume = total_buy + total_sell;
174
175        if total_volume > 0.0 {
176            (total_buy - total_sell).abs() / total_volume
177        } else {
178            0.0
179        }
180    }
181}
182
183/// Order Flow Analyzer for comprehensive order flow metrics
184pub struct OrderFlowAnalyzer {
185    vpin_calculator: VPINCalculator,
186    ofi_window: Vec<f64>,
187    window_size: usize,
188}
189
190impl OrderFlowAnalyzer {
191    /// Create new order flow analyzer
192    #[must_use]
193    pub fn new(window_size: usize, vpin_bucket_size: usize) -> Self {
194        Self {
195            vpin_calculator: VPINCalculator::new(vpin_bucket_size),
196            ofi_window: Vec::with_capacity(window_size * 2),
197            window_size,
198        }
199    }
200
201    /// Process trade tick and update metrics
202    pub fn process_trade(&mut self, trade: &TradeTick) -> Option<f64> {
203        self.vpin_calculator.add_trade(trade)
204    }
205
206    /// Process order book update and calculate OFI
207    pub fn process_orderbook_update(
208        &mut self,
209        prev_book: &OrderBookSnapshot,
210        curr_book: &OrderBookSnapshot,
211    ) -> f64 {
212        let ofi = calculate_ofi(prev_book, curr_book);
213
214        // Add to rolling window
215        self.ofi_window.push(ofi);
216        if self.ofi_window.len() > self.window_size * 2 {
217            let drain_count = self.window_size;
218            self.ofi_window.drain(0..drain_count);
219        }
220
221        ofi
222    }
223
224    /// Get average OFI over window
225    pub fn get_average_ofi(&self) -> f64 {
226        if self.ofi_window.is_empty() {
227            return 0.0;
228        }
229
230        let recent_window =
231            &self.ofi_window[self.ofi_window.len().saturating_sub(self.window_size)..];
232        recent_window.iter().sum::<f64>() / recent_window.len() as f64
233    }
234
235    /// Get OFI standard deviation
236    pub fn get_ofi_std(&self) -> f64 {
237        if self.ofi_window.len() < 2 {
238            return 0.0;
239        }
240
241        let recent_window =
242            &self.ofi_window[self.ofi_window.len().saturating_sub(self.window_size)..];
243        let mean = self.get_average_ofi();
244
245        let variance = recent_window
246            .iter()
247            .map(|&x| (x - mean).powi(2))
248            .sum::<f64>()
249            / recent_window.len() as f64;
250
251        variance.sqrt()
252    }
253}
254
255#[cfg(test)]
256mod tests {
257    use super::*;
258    use crate::features::Level;
259    use rust_decimal::{Decimal, prelude::FromPrimitive};
260    use rust_decimal_macros::dec;
261
262    fn create_test_book(
263        bid_price: Decimal,
264        bid_qty: f64,
265        ask_price: Decimal,
266        ask_qty: f64,
267    ) -> OrderBookSnapshot {
268        OrderBookSnapshot {
269            timestamp_ns: 1000000000,
270            symbol: "BTC-USD".into(),
271            bids: smallvec::smallvec![Level {
272                price: bid_price,
273                quantity: Decimal::from_f64(bid_qty).unwrap_or(Decimal::ZERO),
274                order_count: 1
275            }],
276            asks: smallvec::smallvec![Level {
277                price: ask_price,
278                quantity: Decimal::from_f64(ask_qty).unwrap_or(Decimal::ZERO),
279                order_count: 1
280            }],
281        }
282    }
283
284    #[test]
285    fn test_basic_ofi_positive() {
286        let prev_book = create_test_book(dec!(49999), 10.0, dec!(50001), 10.0);
287        let curr_book = create_test_book(dec!(49999), 15.0, dec!(50001), 10.0); // Bid quantity increased
288
289        let ofi = calculate_ofi(&prev_book, &curr_book);
290        assert_eq!(ofi, 5.0); // Positive OFI indicates buying pressure
291    }
292
293    #[test]
294    fn test_basic_ofi_negative() {
295        let prev_book = create_test_book(dec!(49999), 10.0, dec!(50001), 10.0);
296        let curr_book = create_test_book(dec!(49999), 10.0, dec!(50001), 15.0); // Ask quantity increased
297
298        let ofi = calculate_ofi(&prev_book, &curr_book);
299        assert_eq!(ofi, -5.0); // Negative OFI indicates selling pressure
300    }
301
302    #[test]
303    fn test_vpin_balanced() {
304        let trades = vec![
305            TradeTick {
306                timestamp_ns: 1000000000,
307                symbol: "BTC-USD".into(),
308                side: TradeSide::Buy,
309                price: dec!(50000),
310                quantity: dec!(1.0),
311            },
312            TradeTick {
313                timestamp_ns: 1000000001,
314                symbol: "BTC-USD".into(),
315                side: TradeSide::Sell,
316                price: dec!(50000),
317                quantity: dec!(1.0),
318            },
319        ];
320
321        let vpin = calculate_vpin(&trades, 2);
322        assert_eq!(vpin, 0.0); // Balanced trades should give VPIN of 0
323    }
324
325    #[test]
326    fn test_vpin_imbalanced() {
327        let trades = vec![
328            TradeTick {
329                timestamp_ns: 1000000000,
330                symbol: "BTC-USD".into(),
331                side: TradeSide::Buy,
332                price: dec!(50000),
333                quantity: dec!(3.0),
334            },
335            TradeTick {
336                timestamp_ns: 1000000001,
337                symbol: "BTC-USD".into(),
338                side: TradeSide::Sell,
339                price: dec!(50000),
340                quantity: dec!(1.0),
341            },
342        ];
343
344        let vpin = calculate_vpin(&trades, 2);
345        assert_eq!(vpin, 0.5); // 2.0 net buy volume out of 4.0 total = 0.5
346    }
347}