1use parking_lot::RwLock;
8use rust_decimal::Decimal;
9use rusty_common::{SmartString, collections::FxHashMap, time::get_epoch_timestamp_ns};
10use rusty_model::{instruments::InstrumentId, venues::Venue};
11use std::fmt::Write;
12use std::sync::Arc;
13use std::time::Duration;
14
15#[derive(Debug, Clone, Default)]
17pub struct CacheStats {
18 pub order_mappings: usize,
20 pub symbol_mappings: usize,
22 pub hits: u64,
24 pub misses: u64,
26 pub hit_ratio: f64,
28}
29
30impl CacheStats {
31 pub fn update_hit_ratio(&mut self) {
33 let total = self.hits + self.misses;
34 self.hit_ratio = if total > 0 {
35 self.hits as f64 / total as f64
36 } else {
37 0.0
38 };
39 }
40}
41
42#[derive(Debug, Clone)]
44struct OrderMapping {
45 instrument_id: InstrumentId,
47 timestamp_ns: u64,
49 metadata: Option<OrderMetadata>,
51}
52
53#[derive(Debug, Clone)]
55pub struct OrderMetadata {
56 pub size: Option<Decimal>,
58 pub order_type: Option<SmartString>,
60 pub side: Option<SmartString>,
62}
63
64#[derive(Debug, Clone)]
66struct SymbolMapping {
67 normalized: SmartString,
69 timestamp_ns: u64,
71}
72
73pub trait InstrumentRegistry: Send + Sync {
75 fn cache_order_mapping(
77 &self,
78 order_id: &str,
79 instrument_id: InstrumentId,
80 metadata: Option<OrderMetadata>,
81 );
82
83 fn lookup_by_order_id(&self, order_id: &str) -> Option<InstrumentId>;
85
86 fn normalize_symbol(&self, exchange_symbol: &str, venue: Venue) -> SmartString;
88
89 fn remove_mapping(&self, order_id: &str);
91
92 fn cleanup_expired(&self);
94
95 fn get_cache_stats(&self) -> CacheStats;
97
98 fn clear_all(&self);
100}
101
102pub struct DefaultInstrumentRegistry {
104 order_mappings: Arc<RwLock<FxHashMap<SmartString, OrderMapping>>>,
106
107 symbol_mappings: Arc<RwLock<FxHashMap<(Venue, SmartString), SymbolMapping>>>,
109
110 stats: Arc<RwLock<CacheStats>>,
112
113 order_ttl_ns: u64,
115
116 symbol_ttl_ns: u64,
118}
119
120impl Default for DefaultInstrumentRegistry {
121 fn default() -> Self {
122 Self::new(
123 Duration::from_secs(24 * 60 * 60), Duration::from_secs(60 * 60), )
126 }
127}
128
129impl DefaultInstrumentRegistry {
130 #[must_use]
132 pub fn new(order_ttl: Duration, symbol_ttl: Duration) -> Self {
133 Self {
134 order_mappings: Arc::new(RwLock::new(FxHashMap::default())),
135 symbol_mappings: Arc::new(RwLock::new(FxHashMap::default())),
136 stats: Arc::new(RwLock::new(CacheStats::default())),
137 order_ttl_ns: order_ttl.as_nanos() as u64,
138 symbol_ttl_ns: symbol_ttl.as_nanos() as u64,
139 }
140 }
141
142 fn is_expired(&self, timestamp_ns: u64, ttl_ns: u64) -> bool {
144 let now = get_epoch_timestamp_ns();
145 now.saturating_sub(timestamp_ns) > ttl_ns
146 }
147
148 fn record_hit(&self) {
150 let mut stats = self.stats.write();
151 stats.hits += 1;
152 stats.update_hit_ratio();
153 }
154
155 fn record_miss(&self) {
157 let mut stats = self.stats.write();
158 stats.misses += 1;
159 stats.update_hit_ratio();
160 }
161}
162
163impl InstrumentRegistry for DefaultInstrumentRegistry {
164 fn cache_order_mapping(
165 &self,
166 order_id: &str,
167 instrument_id: InstrumentId,
168 metadata: Option<OrderMetadata>,
169 ) {
170 let mapping = OrderMapping {
171 instrument_id,
172 timestamp_ns: get_epoch_timestamp_ns(),
173 metadata,
174 };
175
176 let mut mappings = self.order_mappings.write();
177 mappings.insert(SmartString::from(order_id), mapping);
178
179 let mut stats = self.stats.write();
181 stats.order_mappings = mappings.len();
182 }
183
184 fn lookup_by_order_id(&self, order_id: &str) -> Option<InstrumentId> {
185 let mappings = self.order_mappings.read();
186
187 if let Some(mapping) = mappings.get(order_id) {
188 if self.is_expired(mapping.timestamp_ns, self.order_ttl_ns) {
190 drop(mappings);
191 self.remove_mapping(order_id);
192 self.record_miss();
193 return None;
194 }
195
196 self.record_hit();
197 Some(mapping.instrument_id.clone())
198 } else {
199 self.record_miss();
200 None
201 }
202 }
203
204 fn normalize_symbol(&self, exchange_symbol: &str, venue: Venue) -> SmartString {
205 let key = (venue, SmartString::from(exchange_symbol));
206
207 {
209 let mappings = self.symbol_mappings.read();
210 if let Some(mapping) = mappings.get(&key)
211 && !self.is_expired(mapping.timestamp_ns, self.symbol_ttl_ns)
212 {
213 self.record_hit();
214 return mapping.normalized.clone();
215 }
216 }
217
218 self.record_miss();
220 let normalized = Self::compute_normalization(exchange_symbol, venue);
221
222 let mapping = SymbolMapping {
224 normalized: normalized.clone(),
225 timestamp_ns: get_epoch_timestamp_ns(),
226 };
227
228 let mut mappings = self.symbol_mappings.write();
229 mappings.insert(key, mapping);
230
231 let mut stats = self.stats.write();
233 stats.symbol_mappings = mappings.len();
234
235 normalized
236 }
237
238 fn remove_mapping(&self, order_id: &str) {
239 let mut mappings = self.order_mappings.write();
240 mappings.remove(order_id);
241
242 let mut stats = self.stats.write();
244 stats.order_mappings = mappings.len();
245 }
246
247 fn cleanup_expired(&self) {
248 let now = get_epoch_timestamp_ns();
249
250 {
252 let mut mappings = self.order_mappings.write();
253 mappings.retain(|_, mapping| !self.is_expired(mapping.timestamp_ns, self.order_ttl_ns));
254 }
255
256 {
258 let mut mappings = self.symbol_mappings.write();
259 mappings
260 .retain(|_, mapping| !self.is_expired(mapping.timestamp_ns, self.symbol_ttl_ns));
261 }
262
263 let order_count = self.order_mappings.read().len();
265 let symbol_count = self.symbol_mappings.read().len();
266
267 let mut stats = self.stats.write();
268 stats.order_mappings = order_count;
269 stats.symbol_mappings = symbol_count;
270 }
271
272 fn get_cache_stats(&self) -> CacheStats {
273 self.stats.read().clone()
274 }
275
276 fn clear_all(&self) {
277 self.order_mappings.write().clear();
278 self.symbol_mappings.write().clear();
279
280 let mut stats = self.stats.write();
281 stats.order_mappings = 0;
282 stats.symbol_mappings = 0;
283 }
284}
285
286impl DefaultInstrumentRegistry {
287 fn compute_normalization(exchange_symbol: &str, venue: Venue) -> SmartString {
289 match venue {
290 Venue::Binance => {
291 Self::normalize_binance_symbol(exchange_symbol)
293 }
294 Venue::Coinbase => {
295 SmartString::from(exchange_symbol.replace('-', "/"))
297 }
298 Venue::Bybit => {
299 Self::normalize_binance_symbol(exchange_symbol)
301 }
302 Venue::Upbit => {
303 if let Some((quote, base)) = exchange_symbol.split_once('-') {
305 let mut s = SmartString::new();
306 write!(s, "{base}/{quote}").expect("Failed to format Upbit symbol");
307 s
308 } else {
309 SmartString::from(exchange_symbol)
310 }
311 }
312 Venue::Bithumb => {
313 SmartString::from(exchange_symbol.replace('_', "/"))
315 }
316 _ => {
317 SmartString::from(exchange_symbol)
319 }
320 }
321 }
322
323 fn normalize_binance_symbol(symbol: &str) -> SmartString {
325 const QUOTE_CURRENCIES: &[&str] = &[
327 "USDT", "BUSD", "USDC", "TUSD", "DAI", "EUR", "GBP", "BRL", "TRY", "RUB", "BTC", "ETH",
328 "BNB", "XRP", "ADA", "DOT", "SOL", "DOGE", "AVAX", "MATIC", "USD", "NGN", "UAH", "VAI",
329 "BIDR", "BVND", "GYEN", "IDRT",
330 ];
331
332 for quote in QUOTE_CURRENCIES {
333 if let Some(base) = symbol.strip_suffix(quote)
334 && !base.is_empty()
335 {
336 let mut s = SmartString::new();
337 write!(s, "{base}/{quote}").expect("Failed to format Binance symbol");
338 return s;
339 }
340 }
341
342 SmartString::from(symbol)
344 }
345}
346
347#[must_use]
349pub fn create_shared_registry() -> Arc<dyn InstrumentRegistry> {
350 Arc::new(DefaultInstrumentRegistry::default())
351}
352
353#[must_use]
355pub fn create_shared_registry_with_ttl(
356 order_ttl: Duration,
357 symbol_ttl: Duration,
358) -> Arc<dyn InstrumentRegistry> {
359 Arc::new(DefaultInstrumentRegistry::new(order_ttl, symbol_ttl))
360}
361
362#[cfg(test)]
363mod tests {
364 use super::*;
365 use rusty_model::venues::Venue;
366 use std::time::Duration;
367
368 #[test]
369 fn test_symbol_normalization() {
370 let registry = DefaultInstrumentRegistry::default();
371
372 assert_eq!(
374 registry.normalize_symbol("BTCUSDT", Venue::Binance),
375 "BTC/USDT"
376 );
377 assert_eq!(
378 registry.normalize_symbol("ETHBTC", Venue::Binance),
379 "ETH/BTC"
380 );
381
382 assert_eq!(
384 registry.normalize_symbol("BTC-USD", Venue::Coinbase),
385 "BTC/USD"
386 );
387 assert_eq!(
388 registry.normalize_symbol("ETH-EUR", Venue::Coinbase),
389 "ETH/EUR"
390 );
391
392 assert_eq!(
394 registry.normalize_symbol("BTCUSDT", Venue::Bybit),
395 "BTC/USDT"
396 );
397
398 assert_eq!(
400 registry.normalize_symbol("KRW-BTC", Venue::Upbit),
401 "BTC/KRW"
402 );
403
404 assert_eq!(
406 registry.normalize_symbol("BTC_KRW", Venue::Bithumb),
407 "BTC/KRW"
408 );
409 }
410
411 #[test]
412 fn test_order_mapping_lifecycle() {
413 let registry = DefaultInstrumentRegistry::default();
414 let instrument_id = InstrumentId::new("BTC/USDT", Venue::Binance);
415
416 assert!(registry.lookup_by_order_id("order123").is_none());
418
419 registry.cache_order_mapping("order123", instrument_id.clone(), None);
421
422 assert_eq!(registry.lookup_by_order_id("order123"), Some(instrument_id));
424
425 registry.remove_mapping("order123");
427
428 assert!(registry.lookup_by_order_id("order123").is_none());
430 }
431
432 #[test]
433 fn test_cache_stats() {
434 let registry = DefaultInstrumentRegistry::default();
435
436 let stats = registry.get_cache_stats();
438 assert_eq!(stats.hits, 0);
439 assert_eq!(stats.misses, 0);
440 assert_eq!(stats.hit_ratio, 0.0);
441
442 registry.lookup_by_order_id("nonexistent1");
444 registry.lookup_by_order_id("nonexistent2");
445
446 let instrument_id = InstrumentId::new("BTC/USDT", Venue::Binance);
448 registry.cache_order_mapping("order1", instrument_id, None);
449
450 registry.lookup_by_order_id("order1");
452
453 let stats = registry.get_cache_stats();
454 assert_eq!(stats.hits, 1);
455 assert_eq!(stats.misses, 2);
456 assert!((stats.hit_ratio - 0.333).abs() < 0.01); }
458
459 #[test]
460 fn test_binance_symbol_normalization() {
461 assert_eq!(
463 DefaultInstrumentRegistry::normalize_binance_symbol("BTCUSDT"),
464 "BTC/USDT"
465 );
466 assert_eq!(
467 DefaultInstrumentRegistry::normalize_binance_symbol("ETHBTC"),
468 "ETH/BTC"
469 );
470 assert_eq!(
471 DefaultInstrumentRegistry::normalize_binance_symbol("ADAUSDT"),
472 "ADA/USDT"
473 );
474
475 assert_eq!(
477 DefaultInstrumentRegistry::normalize_binance_symbol("USDTBUSD"),
478 "USDT/BUSD"
479 );
480 assert_eq!(
481 DefaultInstrumentRegistry::normalize_binance_symbol("BTCEUR"),
482 "BTC/EUR"
483 );
484
485 assert_eq!(
487 DefaultInstrumentRegistry::normalize_binance_symbol("UNKNOWN"),
488 "UNKNOWN"
489 );
490 }
491
492 #[test]
493 fn test_metadata_storage() {
494 let registry = DefaultInstrumentRegistry::default();
495 let instrument_id = InstrumentId::new("BTC/USDT", Venue::Binance);
496
497 let metadata = OrderMetadata {
498 size: Some(Decimal::new(1, 0)),
499 order_type: Some("LIMIT".into()),
500 side: Some("BUY".into()),
501 };
502
503 registry.cache_order_mapping("order123", instrument_id.clone(), Some(metadata));
504
505 assert_eq!(registry.lookup_by_order_id("order123"), Some(instrument_id));
507 }
508
509 #[test]
510 fn test_ttl_expiration() {
511 let registry = DefaultInstrumentRegistry::new(
512 Duration::from_millis(1), Duration::from_millis(1),
514 );
515
516 let instrument_id = InstrumentId::new("BTC/USDT", Venue::Binance);
517 registry.cache_order_mapping("order123", instrument_id, None);
518
519 assert!(registry.lookup_by_order_id("order123").is_some());
521
522 std::thread::sleep(Duration::from_millis(2));
524
525 assert!(registry.lookup_by_order_id("order123").is_none());
527 }
528}