rusty_feeder/exchange/binance/common/
bar_aggregator.rs

1//! Module for aggregating trade data into candlestick (bar) data
2use std::sync::Arc;
3
4use anyhow::{Result, anyhow};
5use parking_lot::RwLock;
6use quanta::{Clock, Instant};
7use rust_decimal::Decimal;
8use rusty_model::{
9    data::{
10        bar::{Bar, BarAggregation, BarCache, BarType},
11        market_trade::MarketTrade,
12    },
13    enums::OrderSide,
14    instruments::InstrumentId,
15};
16
17/// Bar aggregator for managing state and creating bars from trade data
18#[derive(Debug)]
19pub struct BarAggregator {
20    /// Instrument identifier
21    instrument_id: InstrumentId,
22    /// Bar type
23    bar_type: BarType,
24    /// Bar interval in seconds
25    interval_sec: u64,
26    /// Opening price
27    open: Option<Decimal>,
28    /// Highest price
29    high: Option<Decimal>,
30    /// Lowest price
31    low: Option<Decimal>,
32    /// Closing price
33    close: Option<Decimal>,
34    /// Total volume
35    volume: Decimal,
36    /// Buy volume
37    buy_volume: Decimal,
38    /// Sell volume
39    sell_volume: Decimal,
40    /// Trade count
41    trade_count: u32,
42    /// Last bar time (interval timestamp)
43    last_bar_time: u64,
44    /// Bar cache
45    bar_cache: Arc<RwLock<BarCache>>,
46    /// Clock for time synchronization
47    clock: Clock,
48}
49
50impl BarAggregator {
51    /// Create a new bar aggregator
52    pub fn new(
53        instrument_id: InstrumentId,
54        bar_type: BarType,
55        bar_cache: Arc<RwLock<BarCache>>,
56        clock: Clock,
57    ) -> Result<Self> {
58        // Calculate interval in seconds
59        let interval_sec = match bar_type.get_spec().aggregation {
60            BarAggregation::Second => bar_type.get_spec().step,
61            BarAggregation::Minute => bar_type.get_spec().step * 60,
62            BarAggregation::Hour => bar_type.get_spec().step * 3600,
63            BarAggregation::Day => bar_type.get_spec().step * 86400,
64            _ => return Err(anyhow!("Only time-based bars are supported")),
65        };
66
67        Ok(Self {
68            instrument_id,
69            bar_type,
70            interval_sec,
71            open: None,
72            high: None,
73            low: None,
74            close: None,
75            volume: Decimal::ZERO,
76            buy_volume: Decimal::ZERO,
77            sell_volume: Decimal::ZERO,
78            trade_count: 0,
79            last_bar_time: 0,
80            bar_cache,
81            clock,
82        })
83    }
84
85    /// Process a trade and potentially create a new bar
86    pub fn process_trade(&mut self, trade: &MarketTrade) -> Option<Bar> {
87        let now_nanos = self.clock.raw(); // Assuming clock.raw() gives nanoseconds
88        let current_interval_index = now_nanos / 1_000_000_000 / self.interval_sec;
89
90        // If this is the first trade
91        if self.last_bar_time == 0 && self.trade_count == 0 {
92            // More robust check for first trade
93            self.init_first_trade(trade, current_interval_index);
94            return None;
95        }
96
97        // If we've moved to a new bar interval
98        if current_interval_index > self.last_bar_time {
99            // Finalize current bar and return it
100            let bar = self.create_bar(Some(self.clock.now()));
101
102            // Start a new bar
103            self.start_new_bar(trade);
104            self.last_bar_time = current_interval_index;
105
106            return bar;
107        }
108
109        // Update the current bar (or initialize if it's the first trade in the very first interval)
110        // This case handles if last_bar_time was 0 but it's not the first trade overall (e.g. aggregator reset)
111        // or if current_interval_index is same as last_bar_time
112        if self.open.is_none() {
113            // If bar was reset and this is the first trade for current interval
114            self.init_first_trade(trade, current_interval_index);
115        } else {
116            self.update_bar(trade);
117        }
118        None
119    }
120
121    /// Initialize with the first trade
122    const fn init_first_trade(&mut self, trade: &MarketTrade, current_interval_index: u64) {
123        self.open = Some(trade.price);
124        self.high = Some(trade.price);
125        self.low = Some(trade.price);
126        self.close = Some(trade.price);
127        self.volume = trade.quantity;
128
129        // Update buy/sell volumes
130        match trade.direction {
131            OrderSide::Buy => self.buy_volume = trade.quantity,
132            OrderSide::Sell => self.sell_volume = trade.quantity,
133        }
134
135        self.trade_count = 1;
136        // Set the current interval
137        self.last_bar_time = current_interval_index;
138    }
139
140    /// Start a new bar
141    const fn start_new_bar(&mut self, trade: &MarketTrade) {
142        self.open = Some(trade.price);
143        self.high = Some(trade.price);
144        self.low = Some(trade.price);
145        self.close = Some(trade.price);
146        self.volume = trade.quantity;
147
148        // Reset buy/sell volumes
149        self.buy_volume = Decimal::ZERO;
150        self.sell_volume = Decimal::ZERO;
151
152        // Update buy/sell volumes
153        match trade.direction {
154            OrderSide::Buy => self.buy_volume = trade.quantity,
155            OrderSide::Sell => self.sell_volume = trade.quantity,
156        }
157
158        self.trade_count = 1;
159    }
160
161    /// Update the current bar with a new trade
162    fn update_bar(&mut self, trade: &MarketTrade) {
163        // Update high/low prices
164        self.high = self
165            .high
166            .map(|h| std::cmp::max(h, trade.price))
167            .or(Some(trade.price));
168        self.low = self
169            .low
170            .map(|l| std::cmp::min(l, trade.price))
171            .or(Some(trade.price));
172        self.close = Some(trade.price);
173        self.volume += trade.quantity;
174
175        // Update buy/sell volumes
176        match trade.direction {
177            OrderSide::Buy => self.buy_volume += trade.quantity,
178            OrderSide::Sell => self.sell_volume += trade.quantity,
179        }
180
181        self.trade_count += 1;
182    }
183
184    /// Create a bar from the current state
185    fn create_bar(&self, _close_timestamp: Option<Instant>) -> Option<Bar> {
186        match (self.open, self.high, self.low, self.close) {
187            (Some(o), Some(h), Some(l), Some(c)) => Some(Bar {
188                bar_type: self.bar_type.clone(),
189                open: o,
190                high: h,
191                low: l,
192                close: c,
193                volume: self.volume,
194                timestamp_ns: self.clock.raw(), // Timestamp of bar creation/finalization in nanoseconds
195            }),
196            _ => None, // Should not happen if logic is correct and bar has trades
197        }
198    }
199}