rusty_ems/
instrument_registry.rs

1//! Unified Instrument Registry for centralized instrument/order mapping
2//!
3//! This module provides a centralized system for tracking relationships between
4//! order IDs, instrument IDs, and exchange-specific symbol formats across all venues.
5//! It eliminates 'UNKNOWN' placeholders by maintaining bidirectional mappings.
6
7use parking_lot::RwLock;
8use rust_decimal::Decimal;
9use rusty_common::{SmartString, collections::FxHashMap, time::get_epoch_timestamp_ns};
10use rusty_model::{instruments::InstrumentId, venues::Venue};
11use std::fmt::Write;
12use std::sync::Arc;
13use std::time::Duration;
14
/// Snapshot of the registry's cache counters, intended for monitoring and debugging.
#[derive(Debug, Clone, Default)]
pub struct CacheStats {
    /// How many order mappings the cache holds
    pub order_mappings: usize,
    /// How many symbol normalizations the cache holds
    pub symbol_mappings: usize,
    /// Lookups that were counted as cache hits
    pub hits: u64,
    /// Lookups that were counted as cache misses
    pub misses: u64,
    /// Share of lookups that were hits, between 0.0 and 1.0
    pub hit_ratio: f64,
}

impl CacheStats {
    /// Recompute `hit_ratio` from the current `hits` and `misses` counters.
    ///
    /// The ratio is `hits / (hits + misses)`. When no lookups have been
    /// counted yet it is set to `0.0` instead of dividing by zero.
    pub fn update_hit_ratio(&mut self) {
        self.hit_ratio = match self.hits + self.misses {
            0 => 0.0,
            lookups => self.hits as f64 / lookups as f64,
        };
    }
}
41
42/// Cached order mapping with TTL expiration
43#[derive(Debug, Clone)]
44struct OrderMapping {
45    /// Associated instrument
46    instrument_id: InstrumentId,
47    /// Cache entry timestamp for TTL
48    timestamp_ns: u64,
49    /// Additional metadata
50    metadata: Option<OrderMetadata>,
51}
52
53/// Additional order metadata for enhanced tracking
54#[derive(Debug, Clone)]
55pub struct OrderMetadata {
56    /// Original order size
57    pub size: Option<Decimal>,
58    /// Order type (limit, market, etc.)
59    pub order_type: Option<SmartString>,
60    /// Order side (buy/sell)
61    pub side: Option<SmartString>,
62}
63
64/// Cached symbol normalization
65#[derive(Debug, Clone)]
66struct SymbolMapping {
67    /// Normalized symbol format
68    normalized: SmartString,
69    /// Cache timestamp
70    timestamp_ns: u64,
71}
72
73/// Trait for unified instrument registry operations
74pub trait InstrumentRegistry: Send + Sync {
75    /// Cache bidirectional order ID to instrument mapping
76    fn cache_order_mapping(
77        &self,
78        order_id: &str,
79        instrument_id: InstrumentId,
80        metadata: Option<OrderMetadata>,
81    );
82
83    /// Lookup instrument ID by order ID
84    fn lookup_by_order_id(&self, order_id: &str) -> Option<InstrumentId>;
85
86    /// Normalize exchange-specific symbol to standard format
87    fn normalize_symbol(&self, exchange_symbol: &str, venue: Venue) -> SmartString;
88
89    /// Remove order mapping (e.g., when order is fully filled)
90    fn remove_mapping(&self, order_id: &str);
91
92    /// Cleanup expired entries
93    fn cleanup_expired(&self);
94
95    /// Get cache statistics
96    fn get_cache_stats(&self) -> CacheStats;
97
98    /// Clear all mappings (for testing)
99    fn clear_all(&self);
100}
101
102/// Default implementation of instrument registry
103pub struct DefaultInstrumentRegistry {
104    /// Order ID to instrument mapping
105    order_mappings: Arc<RwLock<FxHashMap<SmartString, OrderMapping>>>,
106
107    /// Exchange symbol to normalized symbol mapping
108    symbol_mappings: Arc<RwLock<FxHashMap<(Venue, SmartString), SymbolMapping>>>,
109
110    /// Cache statistics
111    stats: Arc<RwLock<CacheStats>>,
112
113    /// TTL for order mappings (default: 24 hours)
114    order_ttl_ns: u64,
115
116    /// TTL for symbol mappings (default: 1 hour)
117    symbol_ttl_ns: u64,
118}
119
120impl Default for DefaultInstrumentRegistry {
121    fn default() -> Self {
122        Self::new(
123            Duration::from_secs(24 * 60 * 60), // Order mapping TTL: 24 hours
124            Duration::from_secs(60 * 60),      // Symbol mapping TTL: 1 hour
125        )
126    }
127}
128
129impl DefaultInstrumentRegistry {
130    /// Create new instrument registry with custom TTL settings
131    #[must_use]
132    pub fn new(order_ttl: Duration, symbol_ttl: Duration) -> Self {
133        Self {
134            order_mappings: Arc::new(RwLock::new(FxHashMap::default())),
135            symbol_mappings: Arc::new(RwLock::new(FxHashMap::default())),
136            stats: Arc::new(RwLock::new(CacheStats::default())),
137            order_ttl_ns: order_ttl.as_nanos() as u64,
138            symbol_ttl_ns: symbol_ttl.as_nanos() as u64,
139        }
140    }
141
142    /// Check if timestamp is expired
143    fn is_expired(&self, timestamp_ns: u64, ttl_ns: u64) -> bool {
144        let now = get_epoch_timestamp_ns();
145        now.saturating_sub(timestamp_ns) > ttl_ns
146    }
147
148    /// Record cache hit
149    fn record_hit(&self) {
150        let mut stats = self.stats.write();
151        stats.hits += 1;
152        stats.update_hit_ratio();
153    }
154
155    /// Record cache miss
156    fn record_miss(&self) {
157        let mut stats = self.stats.write();
158        stats.misses += 1;
159        stats.update_hit_ratio();
160    }
161}
162
163impl InstrumentRegistry for DefaultInstrumentRegistry {
164    fn cache_order_mapping(
165        &self,
166        order_id: &str,
167        instrument_id: InstrumentId,
168        metadata: Option<OrderMetadata>,
169    ) {
170        let mapping = OrderMapping {
171            instrument_id,
172            timestamp_ns: get_epoch_timestamp_ns(),
173            metadata,
174        };
175
176        let mut mappings = self.order_mappings.write();
177        mappings.insert(SmartString::from(order_id), mapping);
178
179        // Update stats
180        let mut stats = self.stats.write();
181        stats.order_mappings = mappings.len();
182    }
183
184    fn lookup_by_order_id(&self, order_id: &str) -> Option<InstrumentId> {
185        let mappings = self.order_mappings.read();
186
187        if let Some(mapping) = mappings.get(order_id) {
188            // Check if expired
189            if self.is_expired(mapping.timestamp_ns, self.order_ttl_ns) {
190                drop(mappings);
191                self.remove_mapping(order_id);
192                self.record_miss();
193                return None;
194            }
195
196            self.record_hit();
197            Some(mapping.instrument_id.clone())
198        } else {
199            self.record_miss();
200            None
201        }
202    }
203
204    fn normalize_symbol(&self, exchange_symbol: &str, venue: Venue) -> SmartString {
205        let key = (venue, SmartString::from(exchange_symbol));
206
207        // Check cache first
208        {
209            let mappings = self.symbol_mappings.read();
210            if let Some(mapping) = mappings.get(&key)
211                && !self.is_expired(mapping.timestamp_ns, self.symbol_ttl_ns)
212            {
213                self.record_hit();
214                return mapping.normalized.clone();
215            }
216        }
217
218        // Cache miss - compute normalization
219        self.record_miss();
220        let normalized = Self::compute_normalization(exchange_symbol, venue);
221
222        // Cache the result
223        let mapping = SymbolMapping {
224            normalized: normalized.clone(),
225            timestamp_ns: get_epoch_timestamp_ns(),
226        };
227
228        let mut mappings = self.symbol_mappings.write();
229        mappings.insert(key, mapping);
230
231        // Update stats
232        let mut stats = self.stats.write();
233        stats.symbol_mappings = mappings.len();
234
235        normalized
236    }
237
238    fn remove_mapping(&self, order_id: &str) {
239        let mut mappings = self.order_mappings.write();
240        mappings.remove(order_id);
241
242        // Update stats
243        let mut stats = self.stats.write();
244        stats.order_mappings = mappings.len();
245    }
246
247    fn cleanup_expired(&self) {
248        let now = get_epoch_timestamp_ns();
249
250        // Clean expired order mappings
251        {
252            let mut mappings = self.order_mappings.write();
253            mappings.retain(|_, mapping| !self.is_expired(mapping.timestamp_ns, self.order_ttl_ns));
254        }
255
256        // Clean expired symbol mappings
257        {
258            let mut mappings = self.symbol_mappings.write();
259            mappings
260                .retain(|_, mapping| !self.is_expired(mapping.timestamp_ns, self.symbol_ttl_ns));
261        }
262
263        // Update stats
264        let order_count = self.order_mappings.read().len();
265        let symbol_count = self.symbol_mappings.read().len();
266
267        let mut stats = self.stats.write();
268        stats.order_mappings = order_count;
269        stats.symbol_mappings = symbol_count;
270    }
271
272    fn get_cache_stats(&self) -> CacheStats {
273        self.stats.read().clone()
274    }
275
276    fn clear_all(&self) {
277        self.order_mappings.write().clear();
278        self.symbol_mappings.write().clear();
279
280        let mut stats = self.stats.write();
281        stats.order_mappings = 0;
282        stats.symbol_mappings = 0;
283    }
284}
285
286impl DefaultInstrumentRegistry {
287    /// Compute symbol normalization based on exchange format
288    fn compute_normalization(exchange_symbol: &str, venue: Venue) -> SmartString {
289        match venue {
290            Venue::Binance => {
291                // Binance: BTCUSDT -> BTC/USDT
292                Self::normalize_binance_symbol(exchange_symbol)
293            }
294            Venue::Coinbase => {
295                // Coinbase: BTC-USD -> BTC/USD
296                SmartString::from(exchange_symbol.replace('-', "/"))
297            }
298            Venue::Bybit => {
299                // Bybit: BTCUSDT -> BTC/USDT (same as Binance)
300                Self::normalize_binance_symbol(exchange_symbol)
301            }
302            Venue::Upbit => {
303                // Upbit: KRW-BTC -> BTC/KRW
304                if let Some((quote, base)) = exchange_symbol.split_once('-') {
305                    let mut s = SmartString::new();
306                    write!(s, "{base}/{quote}").expect("Failed to format Upbit symbol");
307                    s
308                } else {
309                    SmartString::from(exchange_symbol)
310                }
311            }
312            Venue::Bithumb => {
313                // Bithumb: BTC_KRW -> BTC/KRW
314                SmartString::from(exchange_symbol.replace('_', "/"))
315            }
316            _ => {
317                // Default: return as-is
318                SmartString::from(exchange_symbol)
319            }
320        }
321    }
322
323    /// Normalize Binance/Bybit style symbols
324    fn normalize_binance_symbol(symbol: &str) -> SmartString {
325        // Common quote currencies (order by length desc for proper matching)
326        const QUOTE_CURRENCIES: &[&str] = &[
327            "USDT", "BUSD", "USDC", "TUSD", "DAI", "EUR", "GBP", "BRL", "TRY", "RUB", "BTC", "ETH",
328            "BNB", "XRP", "ADA", "DOT", "SOL", "DOGE", "AVAX", "MATIC", "USD", "NGN", "UAH", "VAI",
329            "BIDR", "BVND", "GYEN", "IDRT",
330        ];
331
332        for quote in QUOTE_CURRENCIES {
333            if let Some(base) = symbol.strip_suffix(quote)
334                && !base.is_empty()
335            {
336                let mut s = SmartString::new();
337                write!(s, "{base}/{quote}").expect("Failed to format Binance symbol");
338                return s;
339            }
340        }
341
342        // Fallback: return as-is if no quote currency matched
343        SmartString::from(symbol)
344    }
345}
346
347/// Create a shared instrument registry instance
348#[must_use]
349pub fn create_shared_registry() -> Arc<dyn InstrumentRegistry> {
350    Arc::new(DefaultInstrumentRegistry::default())
351}
352
353/// Create a shared instrument registry with custom TTL
354#[must_use]
355pub fn create_shared_registry_with_ttl(
356    order_ttl: Duration,
357    symbol_ttl: Duration,
358) -> Arc<dyn InstrumentRegistry> {
359    Arc::new(DefaultInstrumentRegistry::new(order_ttl, symbol_ttl))
360}
361
#[cfg(test)]
mod tests {
    use super::*;
    use rusty_model::venues::Venue;
    use std::time::Duration;

    /// Each venue-specific format is rewritten to `BASE/QUOTE` through the public API.
    #[test]
    fn test_symbol_normalization() {
        let registry = DefaultInstrumentRegistry::default();

        // Test Binance format
        assert_eq!(
            registry.normalize_symbol("BTCUSDT", Venue::Binance),
            "BTC/USDT"
        );
        assert_eq!(
            registry.normalize_symbol("ETHBTC", Venue::Binance),
            "ETH/BTC"
        );

        // Test Coinbase format
        assert_eq!(
            registry.normalize_symbol("BTC-USD", Venue::Coinbase),
            "BTC/USD"
        );
        assert_eq!(
            registry.normalize_symbol("ETH-EUR", Venue::Coinbase),
            "ETH/EUR"
        );

        // Test Bybit format (same as Binance)
        assert_eq!(
            registry.normalize_symbol("BTCUSDT", Venue::Bybit),
            "BTC/USDT"
        );

        // Test Upbit format
        assert_eq!(
            registry.normalize_symbol("KRW-BTC", Venue::Upbit),
            "BTC/KRW"
        );

        // Test Bithumb format
        assert_eq!(
            registry.normalize_symbol("BTC_KRW", Venue::Bithumb),
            "BTC/KRW"
        );
    }

    /// A mapping is absent, then found after caching, then absent again after removal.
    #[test]
    fn test_order_mapping_lifecycle() {
        let registry = DefaultInstrumentRegistry::default();
        let instrument_id = InstrumentId::new("BTC/USDT", Venue::Binance);

        // Test cache miss
        assert!(registry.lookup_by_order_id("order123").is_none());

        // Cache mapping
        registry.cache_order_mapping("order123", instrument_id.clone(), None);

        // Test cache hit
        assert_eq!(registry.lookup_by_order_id("order123"), Some(instrument_id));

        // Remove mapping
        registry.remove_mapping("order123");

        // Test cache miss after removal
        assert!(registry.lookup_by_order_id("order123").is_none());
    }

    /// Hits, misses and the derived ratio follow the lookups performed.
    #[test]
    fn test_cache_stats() {
        let registry = DefaultInstrumentRegistry::default();

        // Initial stats
        let stats = registry.get_cache_stats();
        assert_eq!(stats.hits, 0);
        assert_eq!(stats.misses, 0);
        assert_eq!(stats.hit_ratio, 0.0);

        // Generate some misses
        registry.lookup_by_order_id("nonexistent1");
        registry.lookup_by_order_id("nonexistent2");

        // Cache something
        let instrument_id = InstrumentId::new("BTC/USDT", Venue::Binance);
        registry.cache_order_mapping("order1", instrument_id, None);

        // Generate a hit
        registry.lookup_by_order_id("order1");

        let stats = registry.get_cache_stats();
        assert_eq!(stats.hits, 1);
        assert_eq!(stats.misses, 2);
        assert!((stats.hit_ratio - 0.333).abs() < 0.01); // ~33.3%
    }

    /// The suffix-based splitter handles common pairs, edge cases and the fallback.
    #[test]
    fn test_binance_symbol_normalization() {
        // Test common pairs
        assert_eq!(
            DefaultInstrumentRegistry::normalize_binance_symbol("BTCUSDT"),
            "BTC/USDT"
        );
        assert_eq!(
            DefaultInstrumentRegistry::normalize_binance_symbol("ETHBTC"),
            "ETH/BTC"
        );
        assert_eq!(
            DefaultInstrumentRegistry::normalize_binance_symbol("ADAUSDT"),
            "ADA/USDT"
        );

        // Test edge cases
        assert_eq!(
            DefaultInstrumentRegistry::normalize_binance_symbol("USDTBUSD"),
            "USDT/BUSD"
        );
        assert_eq!(
            DefaultInstrumentRegistry::normalize_binance_symbol("BTCEUR"),
            "BTC/EUR"
        );

        // Test unknown format (fallback)
        assert_eq!(
            DefaultInstrumentRegistry::normalize_binance_symbol("UNKNOWN"),
            "UNKNOWN"
        );
    }

    /// Caching with metadata attached still yields a retrievable mapping.
    #[test]
    fn test_metadata_storage() {
        let registry = DefaultInstrumentRegistry::default();
        let instrument_id = InstrumentId::new("BTC/USDT", Venue::Binance);

        let metadata = OrderMetadata {
            size: Some(Decimal::new(1, 0)),
            order_type: Some("LIMIT".into()),
            side: Some("BUY".into()),
        };

        registry.cache_order_mapping("order123", instrument_id.clone(), Some(metadata));

        // Verify mapping exists
        assert_eq!(registry.lookup_by_order_id("order123"), Some(instrument_id));
    }

    /// An entry is visible within its TTL and gone once the TTL has elapsed.
    #[test]
    fn test_ttl_expiration() {
        let instrument_id = InstrumentId::new("BTC/USDT", Venue::Binance);

        // Liveness is checked against a generous TTL: with a millisecond TTL
        // the entry could already be stale by the time the lookup runs if the
        // test thread gets descheduled, which made this assertion flaky.
        let long_lived =
            DefaultInstrumentRegistry::new(Duration::from_secs(60), Duration::from_secs(60));
        long_lived.cache_order_mapping("order123", instrument_id.clone(), None);
        assert!(long_lived.lookup_by_order_id("order123").is_some());

        // Expiry is checked with a very short TTL and a sleep that clearly exceeds it.
        let short_lived =
            DefaultInstrumentRegistry::new(Duration::from_millis(1), Duration::from_millis(1));
        short_lived.cache_order_mapping("order123", instrument_id, None);

        // Wait for expiration
        std::thread::sleep(Duration::from_millis(5));

        // Should be expired and removed
        assert!(short_lived.lookup_by_order_id("order123").is_none());
        assert_eq!(short_lived.get_cache_stats().order_mappings, 0);
    }

    /// `cleanup_expired` evicts stale entries from both tables and refreshes the counts.
    #[test]
    fn test_cleanup_expired() {
        let registry =
            DefaultInstrumentRegistry::new(Duration::from_millis(1), Duration::from_millis(1));

        let instrument_id = InstrumentId::new("BTC/USDT", Venue::Binance);
        registry.cache_order_mapping("order1", instrument_id, None);
        registry.normalize_symbol("BTCUSDT", Venue::Binance);

        let stats = registry.get_cache_stats();
        assert_eq!(stats.order_mappings, 1);
        assert_eq!(stats.symbol_mappings, 1);

        // Wait for both TTLs to elapse
        std::thread::sleep(Duration::from_millis(5));
        registry.cleanup_expired();

        let stats = registry.get_cache_stats();
        assert_eq!(stats.order_mappings, 0);
        assert_eq!(stats.symbol_mappings, 0);
    }
}