rusty_common/memory/
hft_pools.rs

1//! High-frequency trading object pools for zero-allocation operations
2//!
3//! This module provides specialized object pools for HFT operations,
4//! including pools for orders, trades, and message buffers.
5
6use crate::SmartString;
7use once_cell::sync::Lazy;
8use parking_lot::Mutex;
9use rust_decimal::Decimal;
10use std::ops::{Deref, DerefMut};
11use std::sync::Arc;
12
/// Sizing parameters for the HFT object pools.
///
/// The pool types in this module take their capacities from const generic
/// parameters; within this file the struct is only accepted by the
/// deprecated `HftPoolManager::with_config`, which does not read it.
#[derive(Clone, Debug)]
pub struct HftPoolConfig {
    /// Number of orders the order pool starts with
    pub order_pool_size: usize,
    /// Number of trades the trade pool starts with
    pub trade_pool_size: usize,
    /// Number of buffers the buffer pool starts with
    pub buffer_pool_size: usize,
    /// Size of each buffer, in bytes
    pub buffer_size: usize,
}
25
26impl Default for HftPoolConfig {
27    fn default() -> Self {
28        Self {
29            order_pool_size: 1024,
30            trade_pool_size: 2048,
31            buffer_pool_size: 128,
32            buffer_size: 64 * 1024, // 64KB
33        }
34    }
35}
36
37/// Pooled order object for zero-allocation order processing
38#[derive(Debug, Clone)]
39pub struct PooledOrder {
40    /// Unique identifier for the order assigned by the exchange
41    pub order_id: SmartString,
42    /// Client-provided order identifier for tracking purposes
43    pub client_order_id: SmartString,
44    /// Trading symbol (e.g., "BTC-USDT")
45    pub symbol: SmartString,
46    /// Order side ("buy" or "sell")
47    pub side: SmartString,
48    /// Order type (e.g., "limit", "market", "stop")
49    pub order_type: SmartString,
50    /// Order price in decimal precision
51    pub price: Decimal,
52    /// Total order quantity in decimal precision
53    pub quantity: Decimal,
54    /// Quantity that has been filled so far
55    pub filled_quantity: Decimal,
56    /// Quantity remaining to be filled
57    pub remaining_quantity: Decimal,
58    /// Current order status (e.g., "new", "filled", "cancelled")
59    pub status: SmartString,
60    /// System timestamp when the order was created (nanoseconds)
61    pub timestamp: u64,
62    /// Exchange timestamp when the order was received (nanoseconds)
63    pub exchange_timestamp: u64,
64}
65
66impl PooledOrder {
67    /// Create a new pooled order with default values
68    fn new() -> Self {
69        Self {
70            order_id: SmartString::new(),
71            client_order_id: SmartString::new(),
72            symbol: SmartString::new(),
73            side: SmartString::new(),
74            order_type: SmartString::new(),
75            price: Decimal::ZERO,
76            quantity: Decimal::ZERO,
77            filled_quantity: Decimal::ZERO,
78            remaining_quantity: Decimal::ZERO,
79            status: SmartString::new(),
80            timestamp: 0,
81            exchange_timestamp: 0,
82        }
83    }
84
85    /// Reset the order to default state for reuse
86    fn reset(&mut self) {
87        self.order_id.clear();
88        self.client_order_id.clear();
89        self.symbol.clear();
90        self.side.clear();
91        self.order_type.clear();
92        self.price = Decimal::ZERO;
93        self.quantity = Decimal::ZERO;
94        self.filled_quantity = Decimal::ZERO;
95        self.remaining_quantity = Decimal::ZERO;
96        self.status.clear();
97        self.timestamp = 0;
98        self.exchange_timestamp = 0;
99    }
100}
101
102/// Pooled trade object for zero-allocation trade processing
103#[derive(Debug, Clone)]
104pub struct PooledTrade {
105    /// Unique identifier for the trade assigned by the exchange
106    pub trade_id: SmartString,
107    /// Associated order identifier that generated this trade
108    pub order_id: SmartString,
109    /// Trading symbol (e.g., "BTC-USDT")
110    pub symbol: SmartString,
111    /// Execution price in decimal precision
112    pub price: Decimal,
113    /// Trade quantity in decimal precision
114    pub quantity: Decimal,
115    /// Trade side ("buy" or "sell")
116    pub side: SmartString,
117    /// System timestamp when the trade was processed (nanoseconds)
118    pub timestamp: u64,
119    /// Exchange timestamp when the trade occurred (nanoseconds)
120    pub exchange_timestamp: u64,
121    /// Whether this trade was executed as a maker (liquidity provider)
122    pub is_maker: bool,
123}
124
125impl PooledTrade {
126    /// Create a new pooled trade with default values
127    fn new() -> Self {
128        Self {
129            trade_id: SmartString::new(),
130            order_id: SmartString::new(),
131            symbol: SmartString::new(),
132            price: Decimal::ZERO,
133            quantity: Decimal::ZERO,
134            side: SmartString::new(),
135            timestamp: 0,
136            exchange_timestamp: 0,
137            is_maker: false,
138        }
139    }
140
141    /// Reset the trade to default state for reuse
142    fn reset(&mut self) {
143        self.trade_id.clear();
144        self.order_id.clear();
145        self.symbol.clear();
146        self.price = Decimal::ZERO;
147        self.quantity = Decimal::ZERO;
148        self.side.clear();
149        self.timestamp = 0;
150        self.exchange_timestamp = 0;
151        self.is_maker = false;
152    }
153}
154
155/// Object pool for orders with const generic capacity
156pub struct OrderPool<const N: usize = 1024> {
157    pool: Arc<Mutex<Vec<PooledOrder>>>,
158}
159
160impl<const N: usize> Default for OrderPool<N> {
161    fn default() -> Self {
162        Self::new()
163    }
164}
165
166impl<const N: usize> OrderPool<N> {
167    /// Create a new order pool with const generic capacity
168    pub fn new() -> Self {
169        let mut pool = Vec::with_capacity(N);
170        for _ in 0..N {
171            pool.push(PooledOrder::new());
172        }
173
174        Self {
175            pool: Arc::new(Mutex::new(pool)),
176        }
177    }
178
179    /// Acquire an order from the pool
180    pub fn acquire(&self) -> OrderHandle {
181        let mut pool = self.pool.lock();
182        let order = pool.pop().unwrap_or_else(PooledOrder::new);
183
184        OrderHandle {
185            order: Some(order),
186            pool: Arc::clone(&self.pool),
187        }
188    }
189
190    /// Get the current pool size
191    pub fn size(&self) -> usize {
192        self.pool.lock().len()
193    }
194}
195
196/// Handle to a pooled order that returns to pool on drop
197pub struct OrderHandle {
198    order: Option<PooledOrder>,
199    pool: Arc<Mutex<Vec<PooledOrder>>>,
200}
201
202impl Drop for OrderHandle {
203    fn drop(&mut self) {
204        if let Some(mut order) = self.order.take() {
205            order.reset();
206            let mut pool = self.pool.lock();
207            if pool.len() < pool.capacity() {
208                pool.push(order);
209            }
210        }
211    }
212}
213
214impl Deref for OrderHandle {
215    type Target = PooledOrder;
216
217    fn deref(&self) -> &Self::Target {
218        self.order.as_ref().unwrap()
219    }
220}
221
222impl DerefMut for OrderHandle {
223    fn deref_mut(&mut self) -> &mut Self::Target {
224        self.order.as_mut().unwrap()
225    }
226}
227
228/// Object pool for trades with const generic capacity
229pub struct TradePool<const N: usize = 2048> {
230    pool: Arc<Mutex<Vec<PooledTrade>>>,
231}
232
233impl<const N: usize> Default for TradePool<N> {
234    fn default() -> Self {
235        Self::new()
236    }
237}
238
239impl<const N: usize> TradePool<N> {
240    /// Create a new trade pool with const generic capacity
241    pub fn new() -> Self {
242        let mut pool = Vec::with_capacity(N);
243        for _ in 0..N {
244            pool.push(PooledTrade::new());
245        }
246
247        Self {
248            pool: Arc::new(Mutex::new(pool)),
249        }
250    }
251
252    /// Acquire a trade from the pool
253    pub fn acquire(&self) -> TradeHandle {
254        let mut pool = self.pool.lock();
255        let trade = pool.pop().unwrap_or_else(PooledTrade::new);
256
257        TradeHandle {
258            trade: Some(trade),
259            pool: Arc::clone(&self.pool),
260        }
261    }
262
263    /// Get the current pool size
264    pub fn size(&self) -> usize {
265        self.pool.lock().len()
266    }
267}
268
269/// Handle to a pooled trade that returns to pool on drop
270pub struct TradeHandle {
271    trade: Option<PooledTrade>,
272    pool: Arc<Mutex<Vec<PooledTrade>>>,
273}
274
275impl Drop for TradeHandle {
276    fn drop(&mut self) {
277        if let Some(mut trade) = self.trade.take() {
278            trade.reset();
279            let mut pool = self.pool.lock();
280            if pool.len() < pool.capacity() {
281                pool.push(trade);
282            }
283        }
284    }
285}
286
287impl Deref for TradeHandle {
288    type Target = PooledTrade;
289
290    fn deref(&self) -> &Self::Target {
291        self.trade.as_ref().unwrap()
292    }
293}
294
295impl DerefMut for TradeHandle {
296    fn deref_mut(&mut self) -> &mut Self::Target {
297        self.trade.as_mut().unwrap()
298    }
299}
300
301/// Buffer pool for message processing with const generic capacity and buffer size
302pub struct HftBufferPool<const N: usize = 128, const M: usize = 65536> {
303    pool: Arc<Mutex<Vec<Vec<u8>>>>,
304}
305
306impl<const N: usize, const M: usize> Default for HftBufferPool<N, M> {
307    fn default() -> Self {
308        Self::new()
309    }
310}
311
312impl<const N: usize, const M: usize> HftBufferPool<N, M> {
313    /// Create a new buffer pool with const generic capacity and buffer size
314    pub fn new() -> Self {
315        let mut pool = Vec::with_capacity(N);
316        for _ in 0..N {
317            pool.push(Vec::with_capacity(M));
318        }
319
320        Self {
321            pool: Arc::new(Mutex::new(pool)),
322        }
323    }
324
325    /// Acquire a buffer from the pool
326    pub fn acquire(&self) -> HftBufferHandle {
327        let mut pool = self.pool.lock();
328        let buffer = pool.pop().unwrap_or_else(|| Vec::with_capacity(M));
329
330        HftBufferHandle {
331            buffer: Some(buffer),
332            pool: Arc::clone(&self.pool),
333        }
334    }
335
336    /// Get the current pool size
337    pub fn size(&self) -> usize {
338        self.pool.lock().len()
339    }
340}
341
342/// Handle to a pooled buffer that returns to pool on drop
343pub struct HftBufferHandle {
344    buffer: Option<Vec<u8>>,
345    pool: Arc<Mutex<Vec<Vec<u8>>>>,
346}
347
348impl Drop for HftBufferHandle {
349    fn drop(&mut self) {
350        if let Some(mut buffer) = self.buffer.take() {
351            buffer.clear();
352            let mut pool = self.pool.lock();
353            if pool.len() < pool.capacity() {
354                pool.push(buffer);
355            }
356        }
357    }
358}
359
360impl Deref for HftBufferHandle {
361    type Target = Vec<u8>;
362
363    fn deref(&self) -> &Self::Target {
364        self.buffer.as_ref().unwrap()
365    }
366}
367
368impl DerefMut for HftBufferHandle {
369    fn deref_mut(&mut self) -> &mut Self::Target {
370        self.buffer.as_mut().unwrap()
371    }
372}
373
374/// Combined HFT pool manager
375pub struct HftPoolManager {
376    order_pool: DefaultOrderPool,
377    trade_pool: DefaultTradePool,
378    buffer_pool: DefaultHftBufferPool,
379}
380
381impl HftPoolManager {
382    /// Create a new HFT pool manager with default configuration
383    pub fn new() -> Self {
384        Self {
385            order_pool: DefaultOrderPool::new(),
386            trade_pool: DefaultTradePool::new(),
387            buffer_pool: DefaultHftBufferPool::new(),
388        }
389    }
390
391    /// Create a new HFT pool manager with custom configuration
392    /// Note: This method is deprecated in favor of const generics.
393    /// Use specific type aliases like OrderPool<N> for custom sizes.
394    #[deprecated(note = "Use const generics with specific type aliases instead")]
395    pub fn with_config(config: HftPoolConfig) -> Self {
396        // For backward compatibility, use default pools
397        Self::new()
398    }
399
400    /// Acquire an order from the pool
401    #[inline]
402    pub fn acquire_order(&self) -> OrderHandle {
403        self.order_pool.acquire()
404    }
405
406    /// Acquire a trade from the pool
407    #[inline]
408    pub fn acquire_trade(&self) -> TradeHandle {
409        self.trade_pool.acquire()
410    }
411
412    /// Acquire a buffer from the pool
413    #[inline]
414    pub fn acquire_buffer(&self) -> HftBufferHandle {
415        self.buffer_pool.acquire()
416    }
417
418    /// Get pool statistics
419    pub fn stats(&self) -> HftPoolStats {
420        HftPoolStats {
421            order_pool_size: self.order_pool.size(),
422            trade_pool_size: self.trade_pool.size(),
423            buffer_pool_size: self.buffer_pool.size(),
424        }
425    }
426}
427
428impl Default for HftPoolManager {
429    fn default() -> Self {
430        Self::new()
431    }
432}
433
/// Point-in-time counts of the idle objects held by each HFT pool.
#[derive(Clone, Debug)]
pub struct HftPoolStats {
    /// Orders currently available in the order pool
    pub order_pool_size: usize,
    /// Trades currently available in the trade pool
    pub trade_pool_size: usize,
    /// Buffers currently available in the buffer pool
    pub buffer_pool_size: usize,
}
444
445// Thread-local HFT pool manager
446thread_local! {
447    static LOCAL_HFT_POOLS: std::cell::RefCell<HftPoolManager> =
448        std::cell::RefCell::new(HftPoolManager::new());
449}
450
451/// Access thread-local HFT pools
452pub fn with_hft_pools<F, R>(f: F) -> R
453where
454    F: FnOnce(&mut HftPoolManager) -> R,
455{
456    LOCAL_HFT_POOLS.with(|pools| f(&mut pools.borrow_mut()))
457}
458
459/// Global HFT pool manager (for cross-thread usage)
460static GLOBAL_HFT_POOLS: Lazy<HftPoolManager> = Lazy::new(HftPoolManager::new);
461
462/// Get the global HFT pool manager
463pub fn global_hft_pools() -> &'static HftPoolManager {
464    &GLOBAL_HFT_POOLS
465}
466
467// Type aliases for backward compatibility
468/// Order pool with capacity for 1024 orders
469pub type OrderPool1024 = OrderPool<1024>;
470/// Order pool with capacity for 512 orders
471pub type OrderPool512 = OrderPool<512>;
472/// Order pool with capacity for 2048 orders
473pub type OrderPool2048 = OrderPool<2048>;
474
475/// Trade pool with capacity for 2048 trades
476pub type TradePool2048 = TradePool<2048>;
477/// Trade pool with capacity for 1024 trades
478pub type TradePool1024 = TradePool<1024>;
479/// Trade pool with capacity for 4096 trades
480pub type TradePool4096 = TradePool<4096>;
481
482/// HFT buffer pool with 128 buffers of 64KB each
483pub type HftBufferPool128 = HftBufferPool<128, 65536>;
484/// HFT buffer pool with 64 buffers of 32KB each
485pub type HftBufferPool64 = HftBufferPool<64, 32768>;
486/// HFT buffer pool with 256 buffers of 128KB each
487pub type HftBufferPool256 = HftBufferPool<256, 131072>;
488
489// Default type aliases for seamless migration
490pub use HftBufferPool128 as DefaultHftBufferPool;
491pub use OrderPool1024 as DefaultOrderPool;
492pub use TradePool2048 as DefaultTradePool;
493
#[cfg(test)]
mod tests {
    use super::*;

    /// An order round-trips through the pool and comes back blank.
    #[test]
    fn test_order_pool() {
        let pool = OrderPool::<10>::new();
        assert_eq!(pool.size(), 10);

        // Acquire and use an order
        {
            let mut order = pool.acquire();
            assert_eq!(pool.size(), 9);

            order.order_id = SmartString::from("TEST123");
            order.symbol = SmartString::from("BTC-USDT");
            order.price = Decimal::from(50000);

            assert_eq!(order.order_id.as_str(), "TEST123");
            assert_eq!(order.symbol.as_str(), "BTC-USDT");
            assert_eq!(order.price, Decimal::from(50000));
        }

        // Order should be returned to pool
        assert_eq!(pool.size(), 10);

        // Next order should be reset
        {
            let order = pool.acquire();
            assert!(order.order_id.is_empty());
            assert!(order.symbol.is_empty());
            assert_eq!(order.price, Decimal::ZERO);
        }
    }

    /// A trade round-trips through the pool and comes back blank.
    #[test]
    fn test_trade_pool() {
        let pool = TradePool::<10>::new();
        assert_eq!(pool.size(), 10);

        {
            let mut trade = pool.acquire();
            assert_eq!(pool.size(), 9);

            trade.trade_id = SmartString::from("TRADE456");
            trade.price = Decimal::from(49999);
            trade.quantity = Decimal::from_str_exact("0.001").unwrap();
            trade.is_maker = true;
        }

        // Trade should be returned to pool
        assert_eq!(pool.size(), 10);

        // Next trade should be reset
        {
            let trade = pool.acquire();
            assert!(trade.trade_id.is_empty());
            assert_eq!(trade.price, Decimal::ZERO);
            assert_eq!(trade.quantity, Decimal::ZERO);
            assert!(!trade.is_maker);
        }
    }

    /// A buffer round-trips through the pool and comes back empty with its
    /// allocation intact.
    #[test]
    fn test_buffer_pool() {
        let pool = HftBufferPool::<5, 1024>::new();
        assert_eq!(pool.size(), 5);

        {
            let mut buffer = pool.acquire();
            assert_eq!(pool.size(), 4);
            assert!(buffer.capacity() >= 1024);

            buffer.extend_from_slice(b"test data");
            assert_eq!(&buffer[..], b"test data");
        }

        // Buffer should be cleared and returned
        assert_eq!(pool.size(), 5);

        {
            let buffer = pool.acquire();
            assert!(buffer.is_empty());
            assert!(buffer.capacity() >= 1024);
        }
    }

    /// Acquiring from an empty pool still yields a usable, blank object.
    #[test]
    fn test_pool_exhaustion_allocates_fresh_objects() {
        let pool = OrderPool::<2>::new();

        let first = pool.acquire();
        let second = pool.acquire();
        assert_eq!(pool.size(), 0);

        let overflow = pool.acquire();
        assert!(overflow.order_id.is_empty());
        assert_eq!(pool.size(), 0);

        drop(first);
        drop(second);
        drop(overflow);

        // At least the original two slots are refilled; anything beyond the
        // vector's capacity is discarded.
        assert!(pool.size() >= 2);
    }

    /// The manager hands out one object per pool and gets them all back.
    #[test]
    fn test_hft_pool_manager() {
        let manager = HftPoolManager::new();

        let full = manager.stats();
        assert_eq!(full.order_pool_size, 1024);
        assert_eq!(full.trade_pool_size, 2048);
        assert_eq!(full.buffer_pool_size, 128);

        {
            // Test acquiring from each pool
            let _order = manager.acquire_order();
            let _trade = manager.acquire_trade();
            let _buffer = manager.acquire_buffer();

            let in_use = manager.stats();
            assert_eq!(in_use.order_pool_size, 1023);
            assert_eq!(in_use.trade_pool_size, 2047);
            assert_eq!(in_use.buffer_pool_size, 127);
        }

        // Pools should have items returned after handles are dropped
        let restored = manager.stats();
        assert_eq!(restored.order_pool_size, 1024);
        assert_eq!(restored.trade_pool_size, 2048);
        assert_eq!(restored.buffer_pool_size, 128);
    }

    /// The deprecated constructor ignores its argument and uses default pools.
    #[test]
    #[allow(deprecated)]
    fn test_with_config_uses_default_pools() {
        let manager = HftPoolManager::with_config(HftPoolConfig::default());

        let stats = manager.stats();
        assert_eq!(stats.order_pool_size, 1024);
        assert_eq!(stats.trade_pool_size, 2048);
        assert_eq!(stats.buffer_pool_size, 128);
    }

    /// The thread-local manager tracks checkouts and returns.
    #[test]
    fn test_thread_local_pools() {
        with_hft_pools(|pools| {
            let before = pools.stats().order_pool_size;

            {
                let mut order = pools.acquire_order();
                order.order_id = SmartString::from("LOCAL123");
                assert_eq!(order.order_id.as_str(), "LOCAL123");

                // One order is currently acquired
                assert_eq!(pools.stats().order_pool_size, before - 1);
            }

            assert_eq!(pools.stats().order_pool_size, before);
        });
    }
}