1use crate::features::{Level, OrderBookSnapshot};
7use rust_decimal::Decimal;
8use rusty_common::collections::FxHashMap;
9use smallvec::SmallVec;
10use std::io::{BufRead, BufReader, Read};
11
/// A level-2 order-book event assembled from one group of CSV rows.
///
/// Built by [`TardisL2Event::from_csv_lines`] and converted into an
/// [`OrderBookSnapshot`] by [`TardisL2Event::to_snapshot`].
#[derive(Debug, Clone)]
pub struct TardisL2Event {
    /// Instrument symbol, taken verbatim from the second CSV column.
    pub symbol: String,
    /// Event timestamp in microseconds (scaled by 1000 to nanoseconds when a
    /// snapshot is produced).
    pub timestamp_us: u64,
    /// Bid levels as `(price, quantity)` pairs.
    pub bids: Vec<(Decimal, Decimal)>,
    /// Ask levels as `(price, quantity)` pairs.
    pub asks: Vec<(Decimal, Decimal)>,
    /// Optional value of the sixth CSV column, set when that column is present
    /// and non-empty. Presumably microseconds, going by the field name.
    pub local_timestamp_us: Option<u64>,
}
29
30impl TardisL2Event {
31 pub fn to_snapshot(self) -> OrderBookSnapshot {
33 let timestamp_ns = self.timestamp_us * 1000;
35
36 let mut bids = SmallVec::with_capacity(self.bids.len());
38 for (price, quantity) in self.bids {
39 bids.push(Level {
40 price,
41 quantity,
42 order_count: 0, });
44 }
45
46 let mut asks = SmallVec::with_capacity(self.asks.len());
48 for (price, quantity) in self.asks {
49 asks.push(Level {
50 price,
51 quantity,
52 order_count: 0, });
54 }
55
56 OrderBookSnapshot {
57 timestamp_ns,
58 symbol: self.symbol,
59 bids,
60 asks,
61 }
62 }
63
64 #[cfg(target_arch = "x86_64")]
66 pub fn batch_to_snapshots(events: Vec<Self>) -> Vec<OrderBookSnapshot> {
67 if is_x86_feature_detected!("avx2") {
69 unsafe { Self::batch_to_snapshots_avx2(events) }
70 } else {
71 events.into_iter().map(|e| e.to_snapshot()).collect()
73 }
74 }
75
76 #[cfg(target_arch = "x86_64")]
78 unsafe fn batch_to_snapshots_avx2(events: Vec<Self>) -> Vec<OrderBookSnapshot> {
79 let mut snapshots = Vec::with_capacity(events.len());
80
81 let chunks = events.chunks(4);
83
84 for chunk in chunks {
85 let mut timestamps = [0u64; 4];
87 for (i, event) in chunk.iter().enumerate() {
88 timestamps[i] = event.timestamp_us * 1000;
89 }
90
91 for (i, event) in chunk.iter().enumerate() {
93 let mut bids = SmallVec::with_capacity(event.bids.len());
94 let mut asks = SmallVec::with_capacity(event.asks.len());
95
96 for (price, quantity) in &event.bids {
98 bids.push(Level {
99 price: *price,
100 quantity: *quantity,
101 order_count: 0,
102 });
103 }
104
105 for (price, quantity) in &event.asks {
107 asks.push(Level {
108 price: *price,
109 quantity: *quantity,
110 order_count: 0,
111 });
112 }
113
114 snapshots.push(OrderBookSnapshot {
115 timestamp_ns: timestamps[i],
116 symbol: event.symbol.clone(),
117 bids,
118 asks,
119 });
120 }
121 }
122
123 snapshots
124 }
125
126 pub fn from_csv_lines(lines: &[&str]) -> Result<Self, ParseError> {
131 if lines.is_empty() {
132 return Err(ParseError::EmptyInput);
133 }
134
135 let mut symbol = String::new();
136 let mut timestamp_us = 0u64;
137 let mut local_timestamp_us = None;
138 let mut bids = Vec::with_capacity(25);
139 let mut asks = Vec::with_capacity(25);
140
141 for line in lines {
142 let parts: Vec<&str> = line.split(',').collect();
143 if parts.len() < 5 {
144 return Err(ParseError::InvalidFormat((*line).to_string()));
145 }
146
147 let line_timestamp = parts[0]
149 .parse::<u64>()
150 .map_err(|_| ParseError::InvalidTimestamp(parts[0].to_string()))?;
151 let line_symbol = parts[1];
152 let side = parts[2];
153 let price = parts[3]
154 .parse::<Decimal>()
155 .map_err(|_| ParseError::InvalidPrice(parts[3].to_string()))?;
156 let quantity = parts[4]
157 .parse::<Decimal>()
158 .map_err(|_| ParseError::InvalidQuantity(parts[4].to_string()))?;
159
160 if parts.len() > 5 && !parts[5].is_empty() {
162 local_timestamp_us = Some(
163 parts[5]
164 .parse::<u64>()
165 .map_err(|_| ParseError::InvalidTimestamp(parts[5].to_string()))?,
166 );
167 }
168
169 if symbol.is_empty() {
171 symbol = line_symbol.to_string();
172 timestamp_us = line_timestamp;
173 } else {
174 if line_symbol != symbol || line_timestamp != timestamp_us {
176 return Err(ParseError::InconsistentData);
177 }
178 }
179
180 match side {
182 "bid" => bids.push((price, quantity)),
183 "ask" => asks.push((price, quantity)),
184 _ => return Err(ParseError::InvalidSide(side.to_string())),
185 }
186 }
187
188 bids.sort_by(|a, b| b.0.cmp(&a.0)); asks.sort_by(|a, b| a.0.cmp(&b.0)); bids.truncate(25);
194 asks.truncate(25);
195
196 Ok(TardisL2Event {
197 symbol,
198 timestamp_us,
199 bids,
200 asks,
201 local_timestamp_us,
202 })
203 }
204}
205
/// Errors produced while reading or parsing order-book CSV data.
#[derive(Debug, Clone)]
pub enum ParseError {
    /// No rows were supplied.
    EmptyInput,
    /// A row did not have the expected shape; carries the offending line.
    InvalidFormat(String),
    /// A timestamp could not be parsed as `u64`; carries the offending text.
    InvalidTimestamp(String),
    /// A price could not be parsed; carries the offending field.
    InvalidPrice(String),
    /// A quantity could not be parsed; carries the offending field.
    InvalidQuantity(String),
    /// The side column was neither `bid` nor `ask`; carries the offending field.
    InvalidSide(String),
    /// Rows within one group disagree on symbol or timestamp.
    InconsistentData,
    /// The underlying reader failed; carries the I/O error's message.
    IoError(String),
}

impl std::fmt::Display for ParseError {
    /// Writes a short human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            // Payload-free variants are fixed strings.
            Self::EmptyInput => f.write_str("Empty input"),
            Self::InconsistentData => f.write_str("Inconsistent data in group"),
            // Payload variants append the offending text.
            Self::InvalidFormat(line) => write!(f, "Invalid format: {line}"),
            Self::InvalidTimestamp(ts) => write!(f, "Invalid timestamp: {ts}"),
            Self::InvalidPrice(p) => write!(f, "Invalid price: {p}"),
            Self::InvalidQuantity(q) => write!(f, "Invalid quantity: {q}"),
            Self::InvalidSide(s) => write!(f, "Invalid side: {s}"),
            Self::IoError(e) => write!(f, "IO error: {e}"),
        }
    }
}

// No `source()` override: every variant stores plain text, not a wrapped error.
impl std::error::Error for ParseError {}
243
/// Streaming adapter that groups CSV rows by timestamp and emits one
/// [`OrderBookSnapshot`] per completed group.
pub struct BinanceTardisAdapter {
    /// Maps a symbol as it appears in the CSV to the symbol written into
    /// emitted snapshots. Symbols without an entry pass through unchanged.
    symbol_map: FxHashMap<String, String>,
    /// Rows collected so far for the group currently being assembled.
    line_buffer: Vec<String>,
    /// Timestamp (first CSV column) shared by the rows in `line_buffer`;
    /// `None` while no group is in progress.
    current_timestamp: Option<u64>,
}
253
254impl Default for BinanceTardisAdapter {
255 fn default() -> Self {
256 Self::new()
257 }
258}
259
260impl BinanceTardisAdapter {
261 #[must_use]
263 pub fn new() -> Self {
264 let mut symbol_map = FxHashMap::default();
265 symbol_map.insert("BTCUSDT".to_string(), "BTC-USDT".to_string());
267 symbol_map.insert("ETHUSDT".to_string(), "ETH-USDT".to_string());
268
269 Self {
270 symbol_map,
271 line_buffer: Vec::with_capacity(50), current_timestamp: None,
273 }
274 }
275
276 pub fn add_symbol_mapping(&mut self, tardis_symbol: String, internal_symbol: String) {
278 self.symbol_map.insert(tardis_symbol, internal_symbol);
279 }
280
281 pub fn process_line(&mut self, line: &str) -> Result<Option<OrderBookSnapshot>, ParseError> {
283 let timestamp = line
285 .split(',')
286 .next()
287 .ok_or_else(|| ParseError::InvalidFormat(line.to_string()))?
288 .parse::<u64>()
289 .map_err(|_| ParseError::InvalidTimestamp(line.to_string()))?;
290
291 if let Some(current_ts) = self.current_timestamp {
293 if timestamp != current_ts {
294 let lines: Vec<&str> = self.line_buffer.iter().map(|s| s.as_str()).collect();
296 let event = TardisL2Event::from_csv_lines(&lines)?;
297
298 self.line_buffer.clear();
300 self.line_buffer.push(line.to_string());
301 self.current_timestamp = Some(timestamp);
302
303 let mut snapshot = event.to_snapshot();
305 if let Some(mapped) = self.symbol_map.get(&snapshot.symbol) {
306 snapshot.symbol = mapped.clone();
307 }
308
309 return Ok(Some(snapshot));
310 }
311 } else {
312 self.current_timestamp = Some(timestamp);
313 }
314
315 self.line_buffer.push(line.to_string());
317 Ok(None)
318 }
319
320 pub fn flush(&mut self) -> Result<Option<OrderBookSnapshot>, ParseError> {
322 if self.line_buffer.is_empty() {
323 return Ok(None);
324 }
325
326 let lines: Vec<&str> = self.line_buffer.iter().map(|s| s.as_str()).collect();
327 let event = TardisL2Event::from_csv_lines(&lines)?;
328
329 self.line_buffer.clear();
330 self.current_timestamp = None;
331
332 let mut snapshot = event.to_snapshot();
334 if let Some(mapped) = self.symbol_map.get(&snapshot.symbol) {
335 snapshot.symbol = mapped.clone();
336 }
337
338 Ok(Some(snapshot))
339 }
340}
341
/// Iterator that reads CSV rows from any [`Read`] source and yields one
/// [`OrderBookSnapshot`] per timestamp group.
pub struct TardisL2Iterator<R: Read> {
    /// Buffered wrapper around the caller-supplied reader.
    reader: BufReader<R>,
    /// Groups rows by timestamp and builds the snapshots.
    adapter: BinanceTardisAdapter,
    /// Set once end of input or an I/O error has been reached; the iterator
    /// yields nothing afterwards.
    finished: bool,
}
348
349impl<R: Read> TardisL2Iterator<R> {
350 #[must_use]
352 pub fn new(reader: R) -> Self {
353 Self {
354 reader: BufReader::new(reader),
355 adapter: BinanceTardisAdapter::new(),
356 finished: false,
357 }
358 }
359
360 #[must_use]
362 pub fn with_symbol_mapping(mut self, tardis_symbol: String, internal_symbol: String) -> Self {
363 self.adapter
364 .add_symbol_mapping(tardis_symbol, internal_symbol);
365 self
366 }
367}
368
369impl<R: Read> Iterator for TardisL2Iterator<R> {
370 type Item = Result<OrderBookSnapshot, ParseError>;
371
372 fn next(&mut self) -> Option<Self::Item> {
373 if self.finished {
374 return None;
375 }
376
377 let mut line = String::new();
378 loop {
379 line.clear();
380 match self.reader.read_line(&mut line) {
381 Ok(0) => {
382 self.finished = true;
384 return self.adapter.flush().transpose();
386 }
387 Ok(_) => {
388 let line = line.trim();
389 if line.is_empty() || line.starts_with('#') {
390 continue; }
392
393 match self.adapter.process_line(line) {
394 Ok(Some(snapshot)) => return Some(Ok(snapshot)),
395 Ok(None) => continue, Err(e) => return Some(Err(e)),
397 }
398 }
399 Err(e) => {
400 self.finished = true;
401 return Some(Err(ParseError::IoError(e.to_string())));
402 }
403 }
404 }
405 }
406}
407
#[cfg(test)]
mod tests {
    use super::*;
    use rust_decimal_macros::dec;

    /// Rows sharing a timestamp and symbol parse into one event whose bids are
    /// ordered high-to-low and asks low-to-high.
    #[test]
    fn test_tardis_l2_event_parsing() {
        let rows = [
            "1640995200000000,BTCUSDT,bid,50000.00,1.5",
            "1640995200000000,BTCUSDT,bid,49999.00,2.0",
            "1640995200000000,BTCUSDT,ask,50001.00,1.2",
            "1640995200000000,BTCUSDT,ask,50002.00,1.8",
        ];

        let parsed = TardisL2Event::from_csv_lines(&rows).unwrap();

        assert_eq!(parsed.symbol, "BTCUSDT");
        assert_eq!(parsed.timestamp_us, 1640995200000000);

        let bid_prices: Vec<_> = parsed.bids.iter().map(|level| level.0).collect();
        assert_eq!(bid_prices, [dec!(50000.00), dec!(49999.00)]);

        let ask_prices: Vec<_> = parsed.asks.iter().map(|level| level.0).collect();
        assert_eq!(ask_prices, [dec!(50001.00), dec!(50002.00)]);
    }

    /// Conversion keeps the symbol and levels and scales the timestamp from
    /// microseconds to nanoseconds.
    #[test]
    fn test_to_snapshot_conversion() {
        let snapshot = TardisL2Event {
            symbol: "BTC-USDT".to_string(),
            timestamp_us: 1640995200000000,
            bids: vec![(dec!(50000), dec!(1.5)), (dec!(49999), dec!(2.0))],
            asks: vec![(dec!(50001), dec!(1.2)), (dec!(50002), dec!(1.8))],
            local_timestamp_us: None,
        }
        .to_snapshot();

        assert_eq!(snapshot.symbol, "BTC-USDT");
        assert_eq!(snapshot.timestamp_ns, 1640995200000000000);
        assert_eq!(snapshot.bids.len(), 2);
        assert_eq!(snapshot.asks.len(), 2);

        let best_bid = &snapshot.bids[0];
        assert_eq!(best_bid.price, dec!(50000));
        assert_eq!(best_bid.quantity, dec!(1.5));
    }

    /// A snapshot is emitted only when the timestamp changes, and `flush`
    /// releases the final group.
    #[test]
    fn test_adapter_streaming() {
        let csv_data = r#"1640995200000000,BTCUSDT,bid,50000.00,1.5
1640995200000000,BTCUSDT,bid,49999.00,2.0
1640995200000000,BTCUSDT,ask,50001.00,1.2
1640995200000000,BTCUSDT,ask,50002.00,1.8
1640995201000000,BTCUSDT,bid,50000.50,1.6
1640995201000000,BTCUSDT,ask,50001.50,1.3"#;

        let rows: Vec<&str> = csv_data.lines().collect();
        let (first_group, second_group) = rows.split_at(4);

        let mut adapter = BinanceTardisAdapter::new();
        let mut emitted: Vec<OrderBookSnapshot> = Vec::new();

        // Rows of the first timestamp are only buffered.
        for row in first_group {
            emitted.extend(adapter.process_line(row).unwrap());
        }
        assert_eq!(emitted.len(), 0);

        // The first row with a new timestamp releases the first group.
        emitted.extend(adapter.process_line(second_group[0]).unwrap());
        assert_eq!(emitted.len(), 1);
        assert_eq!(emitted[0].timestamp_ns, 1640995200000000000);

        for row in &second_group[1..] {
            emitted.extend(adapter.process_line(row).unwrap());
        }

        // Nothing follows the second group, so it has to be flushed explicitly.
        emitted.extend(adapter.flush().unwrap());

        assert_eq!(emitted.len(), 2);
        assert_eq!(emitted[1].timestamp_ns, 1640995201000000000);
    }
}