rusty_backtest/adapters/
binance_tardis.rs

//! Binance Tardis 25-level L2 data adapter.
//!
//! Converts grouped Tardis-style L2 CSV rows into rusty-backtest
//! `OrderBookSnapshot` values, either one group at a time or streamed from a reader.
5
6use crate::features::{Level, OrderBookSnapshot};
7use rust_decimal::Decimal;
8use rusty_common::collections::FxHashMap;
9use smallvec::SmallVec;
10use std::io::{BufRead, BufReader, Read};
11
12/// Tardis L2 event structure for Binance 25-level orderbook data
13///
14/// Tardis provides orderbook snapshots with up to 25 price levels
15/// on each side (bid/ask) with microsecond timestamps.
16#[derive(Debug, Clone)]
17pub struct TardisL2Event {
18    /// Symbol (e.g., "BTC-USDT")
19    pub symbol: String,
20    /// Exchange timestamp in microseconds
21    pub timestamp_us: u64,
22    /// Bid levels (price, quantity) - up to 25 levels
23    pub bids: Vec<(Decimal, Decimal)>,
24    /// Ask levels (price, quantity) - up to 25 levels
25    pub asks: Vec<(Decimal, Decimal)>,
26    /// Local timestamp when data was received (microseconds)
27    pub local_timestamp_us: Option<u64>,
28}
29
30impl TardisL2Event {
31    /// Convert to OrderBookSnapshot with nanosecond precision
32    pub fn to_snapshot(self) -> OrderBookSnapshot {
33        // Convert microseconds to nanoseconds
34        let timestamp_ns = self.timestamp_us * 1000;
35
36        // Convert bid levels
37        let mut bids = SmallVec::with_capacity(self.bids.len());
38        for (price, quantity) in self.bids {
39            bids.push(Level {
40                price,
41                quantity,
42                order_count: 0, // Tardis doesn't provide order count
43            });
44        }
45
46        // Convert ask levels
47        let mut asks = SmallVec::with_capacity(self.asks.len());
48        for (price, quantity) in self.asks {
49            asks.push(Level {
50                price,
51                quantity,
52                order_count: 0, // Tardis doesn't provide order count
53            });
54        }
55
56        OrderBookSnapshot {
57            timestamp_ns,
58            symbol: self.symbol,
59            bids,
60            asks,
61        }
62    }
63
64    /// SIMD-optimized batch conversion of multiple events
65    #[cfg(target_arch = "x86_64")]
66    pub fn batch_to_snapshots(events: Vec<Self>) -> Vec<OrderBookSnapshot> {
67        // Check if AVX2 is available
68        if is_x86_feature_detected!("avx2") {
69            unsafe { Self::batch_to_snapshots_avx2(events) }
70        } else {
71            // Fallback to scalar implementation
72            events.into_iter().map(|e| e.to_snapshot()).collect()
73        }
74    }
75
76    /// AVX2-optimized batch conversion
77    #[cfg(target_arch = "x86_64")]
78    unsafe fn batch_to_snapshots_avx2(events: Vec<Self>) -> Vec<OrderBookSnapshot> {
79        let mut snapshots = Vec::with_capacity(events.len());
80
81        // Process events in chunks of 4 for SIMD efficiency
82        let chunks = events.chunks(4);
83
84        for chunk in chunks {
85            // Convert timestamps in parallel (4 at once)
86            let mut timestamps = [0u64; 4];
87            for (i, event) in chunk.iter().enumerate() {
88                timestamps[i] = event.timestamp_us * 1000;
89            }
90
91            // Convert each event
92            for (i, event) in chunk.iter().enumerate() {
93                let mut bids = SmallVec::with_capacity(event.bids.len());
94                let mut asks = SmallVec::with_capacity(event.asks.len());
95
96                // Process bid levels
97                for (price, quantity) in &event.bids {
98                    bids.push(Level {
99                        price: *price,
100                        quantity: *quantity,
101                        order_count: 0,
102                    });
103                }
104
105                // Process ask levels
106                for (price, quantity) in &event.asks {
107                    asks.push(Level {
108                        price: *price,
109                        quantity: *quantity,
110                        order_count: 0,
111                    });
112                }
113
114                snapshots.push(OrderBookSnapshot {
115                    timestamp_ns: timestamps[i],
116                    symbol: event.symbol.clone(),
117                    bids,
118                    asks,
119                });
120            }
121        }
122
123        snapshots
124    }
125
126    /// Parse from Tardis CSV line format
127    ///
128    /// Expected format: timestamp_us,symbol,side,price,quantity,local_timestamp_us
129    /// Multiple lines represent different levels, grouped by timestamp
130    pub fn from_csv_lines(lines: &[&str]) -> Result<Self, ParseError> {
131        if lines.is_empty() {
132            return Err(ParseError::EmptyInput);
133        }
134
135        let mut symbol = String::new();
136        let mut timestamp_us = 0u64;
137        let mut local_timestamp_us = None;
138        let mut bids = Vec::with_capacity(25);
139        let mut asks = Vec::with_capacity(25);
140
141        for line in lines {
142            let parts: Vec<&str> = line.split(',').collect();
143            if parts.len() < 5 {
144                return Err(ParseError::InvalidFormat((*line).to_string()));
145            }
146
147            // Parse fields
148            let line_timestamp = parts[0]
149                .parse::<u64>()
150                .map_err(|_| ParseError::InvalidTimestamp(parts[0].to_string()))?;
151            let line_symbol = parts[1];
152            let side = parts[2];
153            let price = parts[3]
154                .parse::<Decimal>()
155                .map_err(|_| ParseError::InvalidPrice(parts[3].to_string()))?;
156            let quantity = parts[4]
157                .parse::<Decimal>()
158                .map_err(|_| ParseError::InvalidQuantity(parts[4].to_string()))?;
159
160            // Optional local timestamp
161            if parts.len() > 5 && !parts[5].is_empty() {
162                local_timestamp_us = Some(
163                    parts[5]
164                        .parse::<u64>()
165                        .map_err(|_| ParseError::InvalidTimestamp(parts[5].to_string()))?,
166                );
167            }
168
169            // Initialize on first line
170            if symbol.is_empty() {
171                symbol = line_symbol.to_string();
172                timestamp_us = line_timestamp;
173            } else {
174                // Verify consistency
175                if line_symbol != symbol || line_timestamp != timestamp_us {
176                    return Err(ParseError::InconsistentData);
177                }
178            }
179
180            // Add to appropriate side
181            match side {
182                "bid" => bids.push((price, quantity)),
183                "ask" => asks.push((price, quantity)),
184                _ => return Err(ParseError::InvalidSide(side.to_string())),
185            }
186        }
187
188        // Sort levels by price (best first)
189        bids.sort_by(|a, b| b.0.cmp(&a.0)); // Descending for bids
190        asks.sort_by(|a, b| a.0.cmp(&b.0)); // Ascending for asks
191
192        // Limit to 25 levels
193        bids.truncate(25);
194        asks.truncate(25);
195
196        Ok(TardisL2Event {
197            symbol,
198            timestamp_us,
199            bids,
200            asks,
201            local_timestamp_us,
202        })
203    }
204}
205
/// Errors produced while parsing Tardis-style L2 CSV data.
///
/// Variants that carry a `String` hold the offending input text, or, for
/// [`ParseError::IoError`], the underlying I/O error's message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ParseError {
    /// No rows were supplied.
    EmptyInput,
    /// A row did not contain the expected comma-separated fields; holds the row.
    InvalidFormat(String),
    /// A timestamp could not be parsed as a `u64` or is out of range.
    InvalidTimestamp(String),
    /// A price was not a valid decimal number.
    InvalidPrice(String),
    /// A quantity was not a valid decimal number.
    InvalidQuantity(String),
    /// A side field was neither `bid` nor `ask`.
    InvalidSide(String),
    /// Rows in one group disagree on timestamp or symbol.
    InconsistentData,
    /// Reading the underlying source failed; holds the I/O error message.
    IoError(String),
}

impl std::fmt::Display for ParseError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::EmptyInput => write!(f, "Empty input"),
            Self::InvalidFormat(line) => write!(f, "Invalid format: {line}"),
            Self::InvalidTimestamp(ts) => write!(f, "Invalid timestamp: {ts}"),
            Self::InvalidPrice(p) => write!(f, "Invalid price: {p}"),
            Self::InvalidQuantity(q) => write!(f, "Invalid quantity: {q}"),
            Self::InvalidSide(s) => write!(f, "Invalid side: {s}"),
            Self::InconsistentData => write!(f, "Inconsistent data in group"),
            Self::IoError(e) => write!(f, "IO error: {e}"),
        }
    }
}

impl std::error::Error for ParseError {}
243
244/// Binance Tardis L2 data adapter
245pub struct BinanceTardisAdapter {
246    /// Symbol mapping (Tardis format to internal format)
247    symbol_map: FxHashMap<String, String>,
248    /// Buffer for grouped lines
249    line_buffer: Vec<String>,
250    /// Current timestamp being processed
251    current_timestamp: Option<u64>,
252}
253
254impl Default for BinanceTardisAdapter {
255    fn default() -> Self {
256        Self::new()
257    }
258}
259
260impl BinanceTardisAdapter {
261    /// Create a new adapter
262    #[must_use]
263    pub fn new() -> Self {
264        let mut symbol_map = FxHashMap::default();
265        // Common mappings
266        symbol_map.insert("BTCUSDT".to_string(), "BTC-USDT".to_string());
267        symbol_map.insert("ETHUSDT".to_string(), "ETH-USDT".to_string());
268
269        Self {
270            symbol_map,
271            line_buffer: Vec::with_capacity(50), // 25 bids + 25 asks
272            current_timestamp: None,
273        }
274    }
275
276    /// Add custom symbol mapping
277    pub fn add_symbol_mapping(&mut self, tardis_symbol: String, internal_symbol: String) {
278        self.symbol_map.insert(tardis_symbol, internal_symbol);
279    }
280
281    /// Process a CSV line and return snapshot if complete
282    pub fn process_line(&mut self, line: &str) -> Result<Option<OrderBookSnapshot>, ParseError> {
283        // Quick check for timestamp
284        let timestamp = line
285            .split(',')
286            .next()
287            .ok_or_else(|| ParseError::InvalidFormat(line.to_string()))?
288            .parse::<u64>()
289            .map_err(|_| ParseError::InvalidTimestamp(line.to_string()))?;
290
291        // Check if we're starting a new group
292        if let Some(current_ts) = self.current_timestamp {
293            if timestamp != current_ts {
294                // Process buffered lines
295                let lines: Vec<&str> = self.line_buffer.iter().map(|s| s.as_str()).collect();
296                let event = TardisL2Event::from_csv_lines(&lines)?;
297
298                // Clear buffer and start new group
299                self.line_buffer.clear();
300                self.line_buffer.push(line.to_string());
301                self.current_timestamp = Some(timestamp);
302
303                // Apply symbol mapping
304                let mut snapshot = event.to_snapshot();
305                if let Some(mapped) = self.symbol_map.get(&snapshot.symbol) {
306                    snapshot.symbol = mapped.clone();
307                }
308
309                return Ok(Some(snapshot));
310            }
311        } else {
312            self.current_timestamp = Some(timestamp);
313        }
314
315        // Add to current group
316        self.line_buffer.push(line.to_string());
317        Ok(None)
318    }
319
320    /// Flush any remaining buffered data
321    pub fn flush(&mut self) -> Result<Option<OrderBookSnapshot>, ParseError> {
322        if self.line_buffer.is_empty() {
323            return Ok(None);
324        }
325
326        let lines: Vec<&str> = self.line_buffer.iter().map(|s| s.as_str()).collect();
327        let event = TardisL2Event::from_csv_lines(&lines)?;
328
329        self.line_buffer.clear();
330        self.current_timestamp = None;
331
332        // Apply symbol mapping
333        let mut snapshot = event.to_snapshot();
334        if let Some(mapped) = self.symbol_map.get(&snapshot.symbol) {
335            snapshot.symbol = mapped.clone();
336        }
337
338        Ok(Some(snapshot))
339    }
340}
341
342/// Iterator adapter for reading Tardis CSV files
343pub struct TardisL2Iterator<R: Read> {
344    reader: BufReader<R>,
345    adapter: BinanceTardisAdapter,
346    finished: bool,
347}
348
349impl<R: Read> TardisL2Iterator<R> {
350    /// Create a new iterator from a reader
351    #[must_use]
352    pub fn new(reader: R) -> Self {
353        Self {
354            reader: BufReader::new(reader),
355            adapter: BinanceTardisAdapter::new(),
356            finished: false,
357        }
358    }
359
360    /// Add custom symbol mapping
361    #[must_use]
362    pub fn with_symbol_mapping(mut self, tardis_symbol: String, internal_symbol: String) -> Self {
363        self.adapter
364            .add_symbol_mapping(tardis_symbol, internal_symbol);
365        self
366    }
367}
368
369impl<R: Read> Iterator for TardisL2Iterator<R> {
370    type Item = Result<OrderBookSnapshot, ParseError>;
371
372    fn next(&mut self) -> Option<Self::Item> {
373        if self.finished {
374            return None;
375        }
376
377        let mut line = String::new();
378        loop {
379            line.clear();
380            match self.reader.read_line(&mut line) {
381                Ok(0) => {
382                    // EOF reached
383                    self.finished = true;
384                    // Flush any remaining data
385                    return self.adapter.flush().transpose();
386                }
387                Ok(_) => {
388                    let line = line.trim();
389                    if line.is_empty() || line.starts_with('#') {
390                        continue; // Skip empty lines and comments
391                    }
392
393                    match self.adapter.process_line(line) {
394                        Ok(Some(snapshot)) => return Some(Ok(snapshot)),
395                        Ok(None) => continue, // Need more lines
396                        Err(e) => return Some(Err(e)),
397                    }
398                }
399                Err(e) => {
400                    self.finished = true;
401                    return Some(Err(ParseError::IoError(e.to_string())));
402                }
403            }
404        }
405    }
406}
407
#[cfg(test)]
mod tests {
    use super::*;
    use rust_decimal_macros::dec;

    #[test]
    fn test_tardis_l2_event_parsing() {
        let lines = [
            "1640995200000000,BTCUSDT,bid,50000.00,1.5",
            "1640995200000000,BTCUSDT,bid,49999.00,2.0",
            "1640995200000000,BTCUSDT,ask,50001.00,1.2",
            "1640995200000000,BTCUSDT,ask,50002.00,1.8",
        ];

        let event = TardisL2Event::from_csv_lines(&lines).unwrap();

        assert_eq!(event.symbol, "BTCUSDT");
        assert_eq!(event.timestamp_us, 1640995200000000);
        assert_eq!(event.bids.len(), 2);
        assert_eq!(event.asks.len(), 2);
        assert_eq!(event.local_timestamp_us, None);

        // Bids best (highest) first.
        assert_eq!(event.bids[0].0, dec!(50000.00));
        assert_eq!(event.bids[1].0, dec!(49999.00));

        // Asks best (lowest) first.
        assert_eq!(event.asks[0].0, dec!(50001.00));
        assert_eq!(event.asks[1].0, dec!(50002.00));
    }

    #[test]
    fn test_unordered_rows_are_sorted_and_local_timestamp_kept() {
        let lines = [
            "1640995200000000,BTCUSDT,ask,50002.00,1.8",
            "1640995200000000,BTCUSDT,bid,49999.00,2.0",
            "1640995200000000,BTCUSDT,ask,50001.00,1.2",
            "1640995200000000,BTCUSDT,bid,50000.00,1.5,1640995200000123",
        ];

        let event = TardisL2Event::from_csv_lines(&lines).unwrap();

        assert_eq!(event.bids[0], (dec!(50000.00), dec!(1.5)));
        assert_eq!(event.bids[1], (dec!(49999.00), dec!(2.0)));
        assert_eq!(event.asks[0], (dec!(50001.00), dec!(1.2)));
        assert_eq!(event.asks[1], (dec!(50002.00), dec!(1.8)));
        assert_eq!(event.local_timestamp_us, Some(1640995200000123));
    }

    #[test]
    fn test_parse_errors() {
        assert!(matches!(
            TardisL2Event::from_csv_lines(&[]),
            Err(ParseError::EmptyInput)
        ));
        assert!(matches!(
            TardisL2Event::from_csv_lines(&["1640995200000000,BTCUSDT,bid"]),
            Err(ParseError::InvalidFormat(_))
        ));
        assert!(matches!(
            TardisL2Event::from_csv_lines(&["abc,BTCUSDT,bid,1,1"]),
            Err(ParseError::InvalidTimestamp(_))
        ));
        assert!(matches!(
            TardisL2Event::from_csv_lines(&["1,BTCUSDT,bid,x,1"]),
            Err(ParseError::InvalidPrice(_))
        ));
        assert!(matches!(
            TardisL2Event::from_csv_lines(&["1,BTCUSDT,bid,1,y"]),
            Err(ParseError::InvalidQuantity(_))
        ));
        assert!(matches!(
            TardisL2Event::from_csv_lines(&["1,BTCUSDT,buy,1,1"]),
            Err(ParseError::InvalidSide(_))
        ));
        assert!(matches!(
            TardisL2Event::from_csv_lines(&["1,BTCUSDT,bid,1,1", "2,BTCUSDT,ask,2,1"]),
            Err(ParseError::InconsistentData)
        ));
        assert!(matches!(
            TardisL2Event::from_csv_lines(&["1,BTCUSDT,bid,1,1", "1,ETHUSDT,ask,2,1"]),
            Err(ParseError::InconsistentData)
        ));
    }

    #[test]
    fn test_to_snapshot_conversion() {
        let event = TardisL2Event {
            symbol: "BTC-USDT".to_string(),
            timestamp_us: 1640995200000000,
            bids: vec![(dec!(50000), dec!(1.5)), (dec!(49999), dec!(2.0))],
            asks: vec![(dec!(50001), dec!(1.2)), (dec!(50002), dec!(1.8))],
            local_timestamp_us: None,
        };

        let snapshot = event.to_snapshot();

        assert_eq!(snapshot.symbol, "BTC-USDT");
        assert_eq!(snapshot.timestamp_ns, 1640995200000000000); // us -> ns
        assert_eq!(snapshot.bids.len(), 2);
        assert_eq!(snapshot.asks.len(), 2);

        assert_eq!(snapshot.bids[0].price, dec!(50000));
        assert_eq!(snapshot.bids[0].quantity, dec!(1.5));
    }

    #[test]
    fn test_adapter_streaming() {
        let lines = [
            "1640995200000000,BTCUSDT,bid,50000.00,1.5",
            "1640995200000000,BTCUSDT,bid,49999.00,2.0",
            "1640995200000000,BTCUSDT,ask,50001.00,1.2",
            "1640995200000000,BTCUSDT,ask,50002.00,1.8",
            "1640995201000000,BTCUSDT,bid,50000.50,1.6",
            "1640995201000000,BTCUSDT,ask,50001.50,1.3",
        ];

        let mut adapter = BinanceTardisAdapter::new();
        let mut snapshots = Vec::new();

        // The first group stays buffered until a new timestamp arrives.
        for line in &lines[..4] {
            assert!(adapter.process_line(line).unwrap().is_none());
        }

        // The first row of the second group completes the first group.
        let first = adapter
            .process_line(lines[4])
            .unwrap()
            .expect("first group complete");
        assert_eq!(first.timestamp_ns, 1640995200000000000);
        assert_eq!(first.symbol, "BTC-USDT");
        snapshots.push(first);

        assert!(adapter.process_line(lines[5]).unwrap().is_none());
        snapshots.extend(adapter.flush().unwrap());

        assert_eq!(snapshots.len(), 2);
        assert_eq!(snapshots[1].timestamp_ns, 1640995201000000000);
    }

    #[test]
    fn test_adapter_symbol_mapping() {
        let mut adapter = BinanceTardisAdapter::new();
        adapter.add_symbol_mapping("SOLUSDT".to_string(), "SOL-USDT".to_string());

        assert!(adapter.process_line("1,BTCUSDT,bid,50000,1").unwrap().is_none());
        let btc = adapter
            .process_line("2,SOLUSDT,ask,150,3")
            .unwrap()
            .expect("BTC group complete");
        assert_eq!(btc.symbol, "BTC-USDT");

        let sol = adapter
            .process_line("3,XRPUSDT,bid,0.5,100")
            .unwrap()
            .expect("SOL group complete");
        assert_eq!(sol.symbol, "SOL-USDT");

        // Symbols without a mapping pass through unchanged.
        let xrp = adapter.flush().unwrap().expect("XRP group buffered");
        assert_eq!(xrp.symbol, "XRPUSDT");
        assert!(adapter.flush().unwrap().is_none());
    }

    #[test]
    fn test_iterator_skips_noise_and_flushes_at_eof() {
        let csv = "# exported sample\n\
                   1640995200000000,BTCUSDT,bid,50000.00,1.5\n\
                   \n\
                   1640995200000000,BTCUSDT,ask,50001.00,1.2\r\n\
                   1640995201000000,ETHUSDT,bid,3700.10,4.0\n";

        let snapshots: Vec<OrderBookSnapshot> = TardisL2Iterator::new(csv.as_bytes())
            .with_symbol_mapping("ETHUSDT".to_string(), "ETH/USDT".to_string())
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(snapshots.len(), 2);
        assert_eq!(snapshots[0].symbol, "BTC-USDT");
        assert_eq!(snapshots[0].timestamp_ns, 1640995200000000000);
        assert_eq!(snapshots[0].bids.len(), 1);
        assert_eq!(snapshots[0].asks[0].quantity, dec!(1.2));
        assert_eq!(snapshots[1].symbol, "ETH/USDT");
        assert_eq!(snapshots[1].bids[0].price, dec!(3700.10));
    }
}