// rusty_common/memory/trading_pools.rs

//! Trading-specific memory pools for zero-allocation hot paths
//!
//! This module provides specialized memory pool integrations designed for
//! high-frequency trading operations, eliminating allocations in critical paths
//! such as order processing, WebSocket message handling, and market data processing.

use super::zerocopy_pools::{
    BufferHandle, GlobalPoolManager, SimdBufferHandle, SimdMemoryPool, TypedBufferHandle,
    TypedMemoryPool, ZerocopyMemoryPool, ZerocopyPoolConfig,
};
use smallvec::SmallVec;
use smartstring::alias::String as SmartString;
use std::sync::{Arc, OnceLock};

14/// Trading-specific memory pool manager for zero-allocation operations with const generic buffer sizing
15pub struct TradingPoolManager<const N: usize = 4096, const M: usize = 8> {
16    /// Global pool manager for basic buffer operations
17    global_pools: GlobalPoolManager,
18
19    /// Specialized pools for trading operations
20    websocket_message_pool: Arc<WebSocketMessagePool<N>>,
21    order_processing_pool: Arc<OrderProcessingPool<M>>,
22    market_data_pool: Arc<MarketDataPool>,
23    json_processing_pool: Arc<JsonProcessingPool>,
24}
25
26impl<const N: usize, const M: usize> TradingPoolManager<N, M> {
27    /// Create a new trading pool manager with optimized configurations
28    pub fn new() -> Self {
29        let global_pools = GlobalPoolManager::new();
30
31        Self {
32            websocket_message_pool: Arc::new(WebSocketMessagePool::new(
33                global_pools.small_buffer_pool().clone(),
34                global_pools.general_pool().clone(),
35            )),
36            order_processing_pool: Arc::new(OrderProcessingPool::new()),
37            market_data_pool: Arc::new(MarketDataPool::new(
38                global_pools.general_pool().clone(),
39                global_pools.simd_pool().clone(),
40            )),
41            json_processing_pool: Arc::new(JsonProcessingPool::new(
42                global_pools.small_buffer_pool().clone(),
43                global_pools.general_pool().clone(),
44            )),
45            global_pools,
46        }
47    }
48
49    /// Get WebSocket message pool
50    pub const fn websocket_pool(&self) -> &Arc<WebSocketMessagePool<N>> {
51        &self.websocket_message_pool
52    }
53
54    /// Get order processing pool
55    pub const fn order_pool(&self) -> &Arc<OrderProcessingPool<M>> {
56        &self.order_processing_pool
57    }
58
59    /// Get market data pool
60    pub const fn market_data_pool(&self) -> &Arc<MarketDataPool> {
61        &self.market_data_pool
62    }
63
64    /// Get JSON processing pool
65    pub const fn json_pool(&self) -> &Arc<JsonProcessingPool> {
66        &self.json_processing_pool
67    }
68
69    /// Get underlying global pools for direct access
70    pub const fn global_pools(&self) -> &GlobalPoolManager {
71        &self.global_pools
72    }
73}
74
75impl<const N: usize, const M: usize> Default for TradingPoolManager<N, M> {
76    fn default() -> Self {
77        Self::new()
78    }
79}
80
81/// WebSocket message pool for zero-allocation message processing with const generic threshold
82pub struct WebSocketMessagePool<const N: usize = 4096> {
83    small_buffer_pool: Arc<ZerocopyMemoryPool>, // For small messages (< N bytes)
84    large_buffer_pool: Arc<ZerocopyMemoryPool>, // For large messages (< 64KB)
85}
86
87impl<const N: usize> WebSocketMessagePool<N> {
88    /// Create a new WebSocket message pool with const generic threshold
89    pub const fn new(
90        small_pool: Arc<ZerocopyMemoryPool>,
91        large_pool: Arc<ZerocopyMemoryPool>,
92    ) -> Self {
93        Self {
94            small_buffer_pool: small_pool,
95            large_buffer_pool: large_pool,
96        }
97    }
98
99    /// Get a buffer for WebSocket message processing using const generic threshold
100    pub fn get_message_buffer(&self, estimated_size: usize) -> Option<BufferHandle> {
101        if estimated_size <= N {
102            self.small_buffer_pool.allocate_buffer()
103        } else {
104            self.large_buffer_pool.allocate_buffer()
105        }
106    }
107
108    /// Get a buffer for binary WebSocket messages
109    pub fn get_binary_buffer(&self) -> Option<BufferHandle> {
110        self.large_buffer_pool.allocate_buffer()
111    }
112
113    /// Get a buffer for text WebSocket messages
114    pub fn get_text_buffer(&self) -> Option<BufferHandle> {
115        self.small_buffer_pool.allocate_buffer()
116    }
117}
118
119/// Order processing pool for zero-allocation order operations with const generic SmallVec capacity
120pub struct OrderProcessingPool<const M: usize = 8> {
121    // Note: ExecutionReport and Order types would need to be imported from appropriate crate
122    // For now, using generic buffer and string pools
123    report_buffer_pool: Arc<ZerocopyMemoryPool>,
124    order_buffer_pool: Arc<ZerocopyMemoryPool>,
125    string_pool: Arc<TypedMemoryPool<SmartString>>,
126    order_ids: Arc<TypedMemoryPool<SmallVec<[SmartString; M]>>>,
127}
128
129impl<const M: usize> OrderProcessingPool<M> {
130    /// Create a new order processing pool with const generic SmallVec capacity
131    pub fn new() -> Self {
132        let report_config = ZerocopyPoolConfig {
133            initial_buffer_count: 128,
134            max_buffer_count: 1024,
135            buffer_size: 4 * 1024, // 4KB for execution reports
136            simd_aligned: false,
137            enable_statistics: true,
138        };
139
140        let order_config = ZerocopyPoolConfig {
141            initial_buffer_count: 64,
142            max_buffer_count: 512,
143            buffer_size: 16 * 1024, // 16KB for order collections
144            simd_aligned: false,
145            enable_statistics: true,
146        };
147
148        let string_config = ZerocopyPoolConfig {
149            initial_buffer_count: 256,
150            max_buffer_count: 2048,
151            buffer_size: std::mem::size_of::<SmartString>(),
152            simd_aligned: false,
153            enable_statistics: true,
154        };
155
156        let order_id_config = ZerocopyPoolConfig {
157            initial_buffer_count: 128,
158            max_buffer_count: 1024,
159            buffer_size: std::mem::size_of::<SmallVec<[SmartString; 8]>>(),
160            simd_aligned: false,
161            enable_statistics: true,
162        };
163
164        Self {
165            report_buffer_pool: ZerocopyMemoryPool::new(report_config),
166            order_buffer_pool: ZerocopyMemoryPool::new(order_config),
167            string_pool: TypedMemoryPool::new(string_config),
168            order_ids: TypedMemoryPool::new(order_id_config),
169        }
170    }
171
172    /// Get a buffer for execution report processing
173    pub fn get_execution_report_buffer(&self) -> Option<BufferHandle> {
174        self.report_buffer_pool.allocate_buffer()
175    }
176
177    /// Get a buffer for order collection processing
178    pub fn get_order_collection_buffer(&self) -> Option<BufferHandle> {
179        self.order_buffer_pool.allocate_buffer()
180    }
181
182    /// Get a pooled string
183    pub fn get_string(
184        &self,
185    ) -> Option<crate::memory::zerocopy_pools::TypedBufferHandle<SmartString>> {
186        self.string_pool.allocate_typed()
187    }
188
189    /// Get a pooled order ID collection with const generic capacity
190    pub fn get_order_ids(
191        &self,
192    ) -> Option<crate::memory::zerocopy_pools::TypedBufferHandle<SmallVec<[SmartString; M]>>> {
193        self.order_ids.allocate_typed()
194    }
195}
196
197impl<const M: usize> Default for OrderProcessingPool<M> {
198    fn default() -> Self {
199        Self::new()
200    }
201}
202
203/// Market data pool for zero-allocation market data processing
204pub struct MarketDataPool {
205    // Note: OrderBook and Trade types would need to be defined or imported
206    // For now, using generic buffer pools
207    orderbook_buffer_pool: Arc<ZerocopyMemoryPool>,
208    trade_buffer_pool: Arc<ZerocopyMemoryPool>,
209    general_buffer_pool: Arc<ZerocopyMemoryPool>,
210    simd_pool: Arc<crate::memory::zerocopy_pools::SimdMemoryPool>,
211}
212
213impl MarketDataPool {
214    /// Create a new market data pool
215    pub fn new(
216        general_pool: Arc<ZerocopyMemoryPool>,
217        simd_pool: Arc<crate::memory::zerocopy_pools::SimdMemoryPool>,
218    ) -> Self {
219        let orderbook_config = ZerocopyPoolConfig {
220            initial_buffer_count: 32,
221            max_buffer_count: 256,
222            buffer_size: 32 * 1024, // 32KB for orderbook data
223            simd_aligned: true,
224            enable_statistics: true,
225        };
226
227        let trade_config = ZerocopyPoolConfig {
228            initial_buffer_count: 64,
229            max_buffer_count: 512,
230            buffer_size: 16 * 1024, // 16KB for trade collections
231            simd_aligned: false,
232            enable_statistics: true,
233        };
234
235        Self {
236            orderbook_buffer_pool: ZerocopyMemoryPool::new(orderbook_config),
237            trade_buffer_pool: ZerocopyMemoryPool::new(trade_config),
238            general_buffer_pool: general_pool,
239            simd_pool,
240        }
241    }
242
243    /// Get a buffer for orderbook processing
244    pub fn get_orderbook_buffer(&self) -> Option<BufferHandle> {
245        self.orderbook_buffer_pool.allocate_buffer()
246    }
247
248    /// Get a buffer for trade collection processing
249    pub fn get_trade_buffer(&self) -> Option<BufferHandle> {
250        self.trade_buffer_pool.allocate_buffer()
251    }
252
253    /// Get a general buffer for market data processing
254    pub fn get_processing_buffer(&self) -> Option<BufferHandle> {
255        self.general_buffer_pool.allocate_buffer()
256    }
257
258    /// Get a SIMD buffer for vectorized market data operations
259    pub fn get_simd_buffer(&self) -> Option<crate::memory::zerocopy_pools::SimdBufferHandle> {
260        self.simd_pool.allocate_simd_buffer()
261    }
262}
263
264/// JSON processing pool for zero-allocation JSON operations
265pub struct JsonProcessingPool {
266    small_buffer_pool: Arc<ZerocopyMemoryPool>, // For JSON strings < 4KB
267    large_buffer_pool: Arc<ZerocopyMemoryPool>, // For JSON strings < 64KB
268    serialization_threshold: usize,
269}
270
271impl JsonProcessingPool {
272    /// Create a new JSON processing pool
273    pub const fn new(
274        small_pool: Arc<ZerocopyMemoryPool>,
275        large_pool: Arc<ZerocopyMemoryPool>,
276    ) -> Self {
277        Self {
278            small_buffer_pool: small_pool,
279            large_buffer_pool: large_pool,
280            serialization_threshold: 4 * 1024, // 4KB threshold
281        }
282    }
283
284    /// Get a buffer for JSON serialization
285    pub fn get_serialization_buffer(&self, estimated_size: usize) -> Option<BufferHandle> {
286        if estimated_size <= self.serialization_threshold {
287            self.small_buffer_pool.allocate_buffer()
288        } else {
289            self.large_buffer_pool.allocate_buffer()
290        }
291    }
292
293    /// Get a buffer for JSON parsing
294    pub fn get_parsing_buffer(&self) -> Option<BufferHandle> {
295        self.large_buffer_pool.allocate_buffer()
296    }
297
298    /// Get a buffer for JSON value storage (generic buffer)
299    pub fn get_value_buffer(&self) -> Option<BufferHandle> {
300        self.small_buffer_pool.allocate_buffer()
301    }
302}
303
304/// Helper trait for zero-allocation WebSocket message processing
305pub trait PooledWebSocketMessage {
306    /// Create a message using pooled buffers
307    fn from_pooled_buffer(
308        pool: &WebSocketMessagePool,
309        message_type: WebSocketMessageType,
310        data: &[u8],
311    ) -> Option<Self>
312    where
313        Self: Sized;
314}
315
/// WebSocket message types for pool optimization
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WebSocketMessageType {
    /// Text-based WebSocket message (typically JSON)
    Text,
    /// Binary WebSocket message (raw bytes)
    Binary,
    /// Ping control frame for connection keep-alive
    Ping,
    /// Pong control frame in response to ping
    Pong,
    /// Close control frame to terminate connection
    Close,
}

331/// Helper trait for zero-allocation order processing
332/// Note: Concrete implementations should be done in crates that have access to order types
333pub trait PooledOrderProcessing {
334    /// Get a buffer for order processing operations
335    fn get_order_processing_buffer(
336        pool: &OrderProcessingPool,
337        buffer_type: OrderBufferType,
338    ) -> Option<BufferHandle>;
339}
340
/// Types of order processing buffers
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OrderBufferType {
    /// Buffer for processing execution reports from exchanges
    ExecutionReport,
    /// Buffer for storing collections of orders
    OrderCollection,
    /// Buffer for batch processing of order identifiers
    OrderIdBatch,
}

352/// Default implementation for buffer allocation
353pub struct DefaultOrderProcessing;
354
355impl PooledOrderProcessing for DefaultOrderProcessing {
356    fn get_order_processing_buffer(
357        pool: &OrderProcessingPool,
358        buffer_type: OrderBufferType,
359    ) -> Option<BufferHandle> {
360        match buffer_type {
361            OrderBufferType::ExecutionReport => pool.get_execution_report_buffer(),
362            OrderBufferType::OrderCollection => pool.get_order_collection_buffer(),
363            OrderBufferType::OrderIdBatch => {
364                // For order ID batches, use execution report buffer as they're similar size
365                pool.get_execution_report_buffer()
366            }
367        }
368    }
369}
370
371// Thread-local pool manager for maximum performance
372thread_local! {
373    static THREAD_LOCAL_POOLS: DefaultTradingPoolManager = DefaultTradingPoolManager::new();
374}
375
376/// Get thread-local trading pools for maximum performance
377///
378/// # Safety Note
379///
380/// This function uses the `thread_local!` macro to safely access thread-local pools.
381/// The returned reference is valid for the lifetime of the closure where it's used.
382/// Each thread maintains its own independent instance of `TradingPoolManager`.
383pub fn with_thread_local_pools<F, R>(f: F) -> R
384where
385    F: FnOnce(&TradingPoolManager) -> R,
386{
387    THREAD_LOCAL_POOLS.with(|pools| f(pools))
388}
389
390/// Get thread-local trading pools (legacy API for compatibility)
391///
392/// # Deprecated
393///
394/// This function is deprecated due to lifetime safety concerns.
395/// Use `with_thread_local_pools` instead.
396#[deprecated(since = "0.1.0", note = "Use with_thread_local_pools for safer access")]
397pub fn get_thread_local_pools() -> &'static DefaultTradingPoolManager {
398    // Return the global pools as a safe fallback
399    // This maintains API compatibility while being lifetime-safe
400    get_global_trading_pools()
401}
402
403/// Global trading pool manager instance
404static GLOBAL_TRADING_POOLS: std::sync::OnceLock<DefaultTradingPoolManager> =
405    std::sync::OnceLock::new();
406
407/// Get global trading pools (use for initialization and statistics)
408pub fn get_global_trading_pools() -> &'static DefaultTradingPoolManager {
409    GLOBAL_TRADING_POOLS.get_or_init(DefaultTradingPoolManager::new)
410}
411
412/// Initialize trading pools with custom configuration
413pub fn initialize_trading_pools() -> &'static DefaultTradingPoolManager {
414    get_global_trading_pools()
415}
416
417// Type aliases for backward compatibility
418/// Trading pool manager with 4KB WebSocket threshold and 8-element SmallVec capacity
419pub type TradingPoolManager4096 = TradingPoolManager<4096, 8>;
420/// Trading pool manager with 2KB WebSocket threshold and 16-element SmallVec capacity
421pub type TradingPoolManager2048 = TradingPoolManager<2048, 16>;
422/// Trading pool manager with 8KB WebSocket threshold and 4-element SmallVec capacity
423pub type TradingPoolManager8192 = TradingPoolManager<8192, 4>;
424
425/// WebSocket message pool with 4KB threshold for small messages
426pub type WebSocketMessagePool4096 = WebSocketMessagePool<4096>;
427/// WebSocket message pool with 2KB threshold for small messages
428pub type WebSocketMessagePool2048 = WebSocketMessagePool<2048>;
429/// WebSocket message pool with 8KB threshold for small messages
430pub type WebSocketMessagePool8192 = WebSocketMessagePool<8192>;
431
432/// Order processing pool with SmallVec capacity of 8 elements
433pub type OrderProcessingPool8 = OrderProcessingPool<8>;
434/// Order processing pool with SmallVec capacity of 16 elements
435pub type OrderProcessingPool16 = OrderProcessingPool<16>;
436/// Order processing pool with SmallVec capacity of 4 elements
437pub type OrderProcessingPool4 = OrderProcessingPool<4>;
438
439// Default type aliases for seamless migration
440pub use OrderProcessingPool8 as DefaultOrderProcessingPool;
441pub use TradingPoolManager4096 as DefaultTradingPoolManager;
442pub use WebSocketMessagePool4096 as DefaultWebSocketMessagePool;
443
#[cfg(test)]
mod tests {
    use super::*;

    /// Every specialized pool of a fresh manager can hand out a buffer.
    #[test]
    fn test_trading_pool_manager_creation() {
        let manager = DefaultTradingPoolManager::new();

        // Test that all pools are created
        assert!(manager.websocket_pool().get_text_buffer().is_some());
        assert!(manager.order_pool().get_execution_report_buffer().is_some());
        assert!(manager.market_data_pool().get_processing_buffer().is_some());
        assert!(manager.json_pool().get_parsing_buffer().is_some());
    }

    /// Size-based routing picks the 4KB pool below the threshold and the 64KB pool above.
    #[test]
    fn test_websocket_message_pool() {
        let manager = DefaultTradingPoolManager::new();
        let ws_pool = manager.websocket_pool();

        // Test small message buffer
        let small_buffer = ws_pool
            .get_message_buffer(1024)
            .expect("small pool should provide a buffer");
        assert_eq!(small_buffer.size(), 4 * 1024);

        // Test large message buffer
        let large_buffer = ws_pool
            .get_message_buffer(8 * 1024)
            .expect("large pool should provide a buffer");
        assert_eq!(large_buffer.size(), 64 * 1024);

        // Test specific buffer types
        assert!(ws_pool.get_binary_buffer().is_some());
        assert!(ws_pool.get_text_buffer().is_some());
    }

    /// The order pool serves report buffers of 4KB and all other allocation kinds.
    #[test]
    fn test_order_processing_pool() {
        let manager = DefaultTradingPoolManager::new();
        let order_pool = manager.order_pool();

        // Test execution report buffer allocation
        let report_buffer = order_pool.get_execution_report_buffer().unwrap();
        assert_eq!(report_buffer.size(), 4 * 1024);

        // Test other allocations
        assert!(order_pool.get_order_collection_buffer().is_some());
        assert!(order_pool.get_string().is_some());
        assert!(order_pool.get_order_ids().is_some());
    }

    /// The market data pool serves 32KB order book buffers and all other allocation kinds.
    #[test]
    fn test_market_data_pool() {
        let manager = DefaultTradingPoolManager::new();
        let market_pool = manager.market_data_pool();

        // Test orderbook buffer allocation
        let orderbook_buffer = market_pool.get_orderbook_buffer().unwrap();
        assert_eq!(orderbook_buffer.size(), 32 * 1024);

        // Test other allocations
        assert!(market_pool.get_trade_buffer().is_some());
        assert!(market_pool.get_processing_buffer().is_some());
        assert!(market_pool.get_simd_buffer().is_some());
    }

    /// JSON serialization buffers follow the 4KB threshold.
    #[test]
    fn test_json_processing_pool() {
        let manager = DefaultTradingPoolManager::new();
        let json_pool = manager.json_pool();

        // Test buffer allocation based on size
        let small_buffer = json_pool
            .get_serialization_buffer(1024)
            .expect("small pool should provide a buffer");
        assert_eq!(small_buffer.size(), 4 * 1024);

        let large_buffer = json_pool
            .get_serialization_buffer(8 * 1024)
            .expect("large pool should provide a buffer");
        assert_eq!(large_buffer.size(), 64 * 1024);

        // Test other allocations
        assert!(json_pool.get_parsing_buffer().is_some());
        assert!(json_pool.get_value_buffer().is_some());
    }

    /// `DefaultOrderProcessing` dispatches each buffer type to the expected pool.
    #[test]
    fn test_pooled_order_processing() {
        let manager = DefaultTradingPoolManager::new();
        let order_pool = manager.order_pool();

        // Test different buffer types
        let exec_buffer = DefaultOrderProcessing::get_order_processing_buffer(
            order_pool,
            OrderBufferType::ExecutionReport,
        )
        .unwrap();
        assert_eq!(exec_buffer.size(), 4 * 1024);

        let order_buffer = DefaultOrderProcessing::get_order_processing_buffer(
            order_pool,
            OrderBufferType::OrderCollection,
        )
        .unwrap();
        assert_eq!(order_buffer.size(), 16 * 1024);
    }

    /// Nested accesses on one thread observe the same thread-local instance.
    #[test]
    fn test_thread_local_pools() {
        // Test with the new closure-based API
        with_thread_local_pools(|pools1| {
            with_thread_local_pools(|pools2| {
                // Should be the same instance within the same thread
                assert!(std::ptr::eq(pools1, pools2));
            });

            // Should be able to allocate from thread-local pools
            assert!(pools1.websocket_pool().get_text_buffer().is_some());
        });
    }

    /// Repeated global lookups return the same instance.
    #[test]
    fn test_global_pools() {
        let pools1 = get_global_trading_pools();
        let pools2 = get_global_trading_pools();

        // Should be the same instance globally
        assert!(std::ptr::eq(pools1, pools2));

        // Should be able to allocate from global pools
        assert!(pools1.order_pool().get_execution_report_buffer().is_some());
    }

    /// All message type variants compare unequal to each other.
    #[test]
    fn test_websocket_message_type_enum() {
        use WebSocketMessageType::*;

        let types = [Text, Binary, Ping, Pong, Close];

        // Test that all types are distinct
        for (i, &type1) in types.iter().enumerate() {
            for (j, &type2) in types.iter().enumerate() {
                if i != j {
                    assert_ne!(type1, type2);
                }
            }
        }
    }

    /// A buffer can be allocated again after an earlier one was dropped.
    #[test]
    fn test_pool_buffer_reuse() {
        let manager = DefaultTradingPoolManager::new();
        let ws_pool = manager.websocket_pool();

        // Allocate and drop a buffer
        let _buffer1_ptr = {
            let buffer = ws_pool.get_text_buffer().unwrap();
            buffer.as_ptr()
        }; // Buffer is dropped here

        // Allocate another buffer - should potentially reuse the same memory
        let buffer2 = ws_pool.get_text_buffer().unwrap();

        // The test passes if we can allocate successfully (reuse is implementation detail)
        assert_eq!(buffer2.size(), 4 * 1024);
    }
}