rusty_feeder/exchange/binance/common/
bar_aggregator.rs1use std::sync::Arc;
3
4use anyhow::{Result, anyhow};
5use parking_lot::RwLock;
6use quanta::{Clock, Instant};
7use rust_decimal::Decimal;
8use rusty_model::{
9 data::{
10 bar::{Bar, BarAggregation, BarCache, BarType},
11 market_trade::MarketTrade,
12 },
13 enums::OrderSide,
14 instruments::InstrumentId,
15};
16
17#[derive(Debug)]
19pub struct BarAggregator {
20 instrument_id: InstrumentId,
22 bar_type: BarType,
24 interval_sec: u64,
26 open: Option<Decimal>,
28 high: Option<Decimal>,
30 low: Option<Decimal>,
32 close: Option<Decimal>,
34 volume: Decimal,
36 buy_volume: Decimal,
38 sell_volume: Decimal,
40 trade_count: u32,
42 last_bar_time: u64,
44 bar_cache: Arc<RwLock<BarCache>>,
46 clock: Clock,
48}
49
50impl BarAggregator {
51 pub fn new(
53 instrument_id: InstrumentId,
54 bar_type: BarType,
55 bar_cache: Arc<RwLock<BarCache>>,
56 clock: Clock,
57 ) -> Result<Self> {
58 let interval_sec = match bar_type.get_spec().aggregation {
60 BarAggregation::Second => bar_type.get_spec().step,
61 BarAggregation::Minute => bar_type.get_spec().step * 60,
62 BarAggregation::Hour => bar_type.get_spec().step * 3600,
63 BarAggregation::Day => bar_type.get_spec().step * 86400,
64 _ => return Err(anyhow!("Only time-based bars are supported")),
65 };
66
67 Ok(Self {
68 instrument_id,
69 bar_type,
70 interval_sec,
71 open: None,
72 high: None,
73 low: None,
74 close: None,
75 volume: Decimal::ZERO,
76 buy_volume: Decimal::ZERO,
77 sell_volume: Decimal::ZERO,
78 trade_count: 0,
79 last_bar_time: 0,
80 bar_cache,
81 clock,
82 })
83 }
84
85 pub fn process_trade(&mut self, trade: &MarketTrade) -> Option<Bar> {
87 let now_nanos = self.clock.raw(); let current_interval_index = now_nanos / 1_000_000_000 / self.interval_sec;
89
90 if self.last_bar_time == 0 && self.trade_count == 0 {
92 self.init_first_trade(trade, current_interval_index);
94 return None;
95 }
96
97 if current_interval_index > self.last_bar_time {
99 let bar = self.create_bar(Some(self.clock.now()));
101
102 self.start_new_bar(trade);
104 self.last_bar_time = current_interval_index;
105
106 return bar;
107 }
108
109 if self.open.is_none() {
113 self.init_first_trade(trade, current_interval_index);
115 } else {
116 self.update_bar(trade);
117 }
118 None
119 }
120
121 const fn init_first_trade(&mut self, trade: &MarketTrade, current_interval_index: u64) {
123 self.open = Some(trade.price);
124 self.high = Some(trade.price);
125 self.low = Some(trade.price);
126 self.close = Some(trade.price);
127 self.volume = trade.quantity;
128
129 match trade.direction {
131 OrderSide::Buy => self.buy_volume = trade.quantity,
132 OrderSide::Sell => self.sell_volume = trade.quantity,
133 }
134
135 self.trade_count = 1;
136 self.last_bar_time = current_interval_index;
138 }
139
140 const fn start_new_bar(&mut self, trade: &MarketTrade) {
142 self.open = Some(trade.price);
143 self.high = Some(trade.price);
144 self.low = Some(trade.price);
145 self.close = Some(trade.price);
146 self.volume = trade.quantity;
147
148 self.buy_volume = Decimal::ZERO;
150 self.sell_volume = Decimal::ZERO;
151
152 match trade.direction {
154 OrderSide::Buy => self.buy_volume = trade.quantity,
155 OrderSide::Sell => self.sell_volume = trade.quantity,
156 }
157
158 self.trade_count = 1;
159 }
160
161 fn update_bar(&mut self, trade: &MarketTrade) {
163 self.high = self
165 .high
166 .map(|h| std::cmp::max(h, trade.price))
167 .or(Some(trade.price));
168 self.low = self
169 .low
170 .map(|l| std::cmp::min(l, trade.price))
171 .or(Some(trade.price));
172 self.close = Some(trade.price);
173 self.volume += trade.quantity;
174
175 match trade.direction {
177 OrderSide::Buy => self.buy_volume += trade.quantity,
178 OrderSide::Sell => self.sell_volume += trade.quantity,
179 }
180
181 self.trade_count += 1;
182 }
183
184 fn create_bar(&self, _close_timestamp: Option<Instant>) -> Option<Bar> {
186 match (self.open, self.high, self.low, self.close) {
187 (Some(o), Some(h), Some(l), Some(c)) => Some(Bar {
188 bar_type: self.bar_type.clone(),
189 open: o,
190 high: h,
191 low: l,
192 close: c,
193 volume: self.volume,
194 timestamp_ns: self.clock.raw(), }),
196 _ => None, }
198 }
199}