1use super::zerocopy_pools::{
8 BufferHandle, GlobalPoolManager, TypedMemoryPool, ZerocopyMemoryPool, ZerocopyPoolConfig,
9};
10use smallvec::SmallVec;
11use smartstring::alias::String as SmartString;
12use std::sync::Arc;
13
14pub struct TradingPoolManager<const N: usize = 4096, const M: usize = 8> {
16 global_pools: GlobalPoolManager,
18
19 websocket_message_pool: Arc<WebSocketMessagePool<N>>,
21 order_processing_pool: Arc<OrderProcessingPool<M>>,
22 market_data_pool: Arc<MarketDataPool>,
23 json_processing_pool: Arc<JsonProcessingPool>,
24}
25
26impl<const N: usize, const M: usize> TradingPoolManager<N, M> {
27 pub fn new() -> Self {
29 let global_pools = GlobalPoolManager::new();
30
31 Self {
32 websocket_message_pool: Arc::new(WebSocketMessagePool::new(
33 global_pools.small_buffer_pool().clone(),
34 global_pools.general_pool().clone(),
35 )),
36 order_processing_pool: Arc::new(OrderProcessingPool::new()),
37 market_data_pool: Arc::new(MarketDataPool::new(
38 global_pools.general_pool().clone(),
39 global_pools.simd_pool().clone(),
40 )),
41 json_processing_pool: Arc::new(JsonProcessingPool::new(
42 global_pools.small_buffer_pool().clone(),
43 global_pools.general_pool().clone(),
44 )),
45 global_pools,
46 }
47 }
48
49 pub const fn websocket_pool(&self) -> &Arc<WebSocketMessagePool<N>> {
51 &self.websocket_message_pool
52 }
53
54 pub const fn order_pool(&self) -> &Arc<OrderProcessingPool<M>> {
56 &self.order_processing_pool
57 }
58
59 pub const fn market_data_pool(&self) -> &Arc<MarketDataPool> {
61 &self.market_data_pool
62 }
63
64 pub const fn json_pool(&self) -> &Arc<JsonProcessingPool> {
66 &self.json_processing_pool
67 }
68
69 pub const fn global_pools(&self) -> &GlobalPoolManager {
71 &self.global_pools
72 }
73}
74
75impl<const N: usize, const M: usize> Default for TradingPoolManager<N, M> {
76 fn default() -> Self {
77 Self::new()
78 }
79}
80
81pub struct WebSocketMessagePool<const N: usize = 4096> {
83 small_buffer_pool: Arc<ZerocopyMemoryPool>, large_buffer_pool: Arc<ZerocopyMemoryPool>, }
86
87impl<const N: usize> WebSocketMessagePool<N> {
88 pub const fn new(
90 small_pool: Arc<ZerocopyMemoryPool>,
91 large_pool: Arc<ZerocopyMemoryPool>,
92 ) -> Self {
93 Self {
94 small_buffer_pool: small_pool,
95 large_buffer_pool: large_pool,
96 }
97 }
98
99 pub fn get_message_buffer(&self, estimated_size: usize) -> Option<BufferHandle> {
101 if estimated_size <= N {
102 self.small_buffer_pool.allocate_buffer()
103 } else {
104 self.large_buffer_pool.allocate_buffer()
105 }
106 }
107
108 pub fn get_binary_buffer(&self) -> Option<BufferHandle> {
110 self.large_buffer_pool.allocate_buffer()
111 }
112
113 pub fn get_text_buffer(&self) -> Option<BufferHandle> {
115 self.small_buffer_pool.allocate_buffer()
116 }
117}
118
119pub struct OrderProcessingPool<const M: usize = 8> {
121 report_buffer_pool: Arc<ZerocopyMemoryPool>,
124 order_buffer_pool: Arc<ZerocopyMemoryPool>,
125 string_pool: Arc<TypedMemoryPool<SmartString>>,
126 order_ids: Arc<TypedMemoryPool<SmallVec<[SmartString; M]>>>,
127}
128
129impl<const M: usize> OrderProcessingPool<M> {
130 pub fn new() -> Self {
132 let report_config = ZerocopyPoolConfig {
133 initial_buffer_count: 128,
134 max_buffer_count: 1024,
135 buffer_size: 4 * 1024, simd_aligned: false,
137 enable_statistics: true,
138 };
139
140 let order_config = ZerocopyPoolConfig {
141 initial_buffer_count: 64,
142 max_buffer_count: 512,
143 buffer_size: 16 * 1024, simd_aligned: false,
145 enable_statistics: true,
146 };
147
148 let string_config = ZerocopyPoolConfig {
149 initial_buffer_count: 256,
150 max_buffer_count: 2048,
151 buffer_size: std::mem::size_of::<SmartString>(),
152 simd_aligned: false,
153 enable_statistics: true,
154 };
155
156 let order_id_config = ZerocopyPoolConfig {
157 initial_buffer_count: 128,
158 max_buffer_count: 1024,
159 buffer_size: std::mem::size_of::<SmallVec<[SmartString; 8]>>(),
160 simd_aligned: false,
161 enable_statistics: true,
162 };
163
164 Self {
165 report_buffer_pool: ZerocopyMemoryPool::new(report_config),
166 order_buffer_pool: ZerocopyMemoryPool::new(order_config),
167 string_pool: TypedMemoryPool::new(string_config),
168 order_ids: TypedMemoryPool::new(order_id_config),
169 }
170 }
171
172 pub fn get_execution_report_buffer(&self) -> Option<BufferHandle> {
174 self.report_buffer_pool.allocate_buffer()
175 }
176
177 pub fn get_order_collection_buffer(&self) -> Option<BufferHandle> {
179 self.order_buffer_pool.allocate_buffer()
180 }
181
182 pub fn get_string(
184 &self,
185 ) -> Option<crate::memory::zerocopy_pools::TypedBufferHandle<SmartString>> {
186 self.string_pool.allocate_typed()
187 }
188
189 pub fn get_order_ids(
191 &self,
192 ) -> Option<crate::memory::zerocopy_pools::TypedBufferHandle<SmallVec<[SmartString; M]>>> {
193 self.order_ids.allocate_typed()
194 }
195}
196
197impl<const M: usize> Default for OrderProcessingPool<M> {
198 fn default() -> Self {
199 Self::new()
200 }
201}
202
203pub struct MarketDataPool {
205 orderbook_buffer_pool: Arc<ZerocopyMemoryPool>,
208 trade_buffer_pool: Arc<ZerocopyMemoryPool>,
209 general_buffer_pool: Arc<ZerocopyMemoryPool>,
210 simd_pool: Arc<crate::memory::zerocopy_pools::SimdMemoryPool>,
211}
212
213impl MarketDataPool {
214 pub fn new(
216 general_pool: Arc<ZerocopyMemoryPool>,
217 simd_pool: Arc<crate::memory::zerocopy_pools::SimdMemoryPool>,
218 ) -> Self {
219 let orderbook_config = ZerocopyPoolConfig {
220 initial_buffer_count: 32,
221 max_buffer_count: 256,
222 buffer_size: 32 * 1024, simd_aligned: true,
224 enable_statistics: true,
225 };
226
227 let trade_config = ZerocopyPoolConfig {
228 initial_buffer_count: 64,
229 max_buffer_count: 512,
230 buffer_size: 16 * 1024, simd_aligned: false,
232 enable_statistics: true,
233 };
234
235 Self {
236 orderbook_buffer_pool: ZerocopyMemoryPool::new(orderbook_config),
237 trade_buffer_pool: ZerocopyMemoryPool::new(trade_config),
238 general_buffer_pool: general_pool,
239 simd_pool,
240 }
241 }
242
243 pub fn get_orderbook_buffer(&self) -> Option<BufferHandle> {
245 self.orderbook_buffer_pool.allocate_buffer()
246 }
247
248 pub fn get_trade_buffer(&self) -> Option<BufferHandle> {
250 self.trade_buffer_pool.allocate_buffer()
251 }
252
253 pub fn get_processing_buffer(&self) -> Option<BufferHandle> {
255 self.general_buffer_pool.allocate_buffer()
256 }
257
258 pub fn get_simd_buffer(&self) -> Option<crate::memory::zerocopy_pools::SimdBufferHandle> {
260 self.simd_pool.allocate_simd_buffer()
261 }
262}
263
264pub struct JsonProcessingPool {
266 small_buffer_pool: Arc<ZerocopyMemoryPool>, large_buffer_pool: Arc<ZerocopyMemoryPool>, serialization_threshold: usize,
269}
270
271impl JsonProcessingPool {
272 pub const fn new(
274 small_pool: Arc<ZerocopyMemoryPool>,
275 large_pool: Arc<ZerocopyMemoryPool>,
276 ) -> Self {
277 Self {
278 small_buffer_pool: small_pool,
279 large_buffer_pool: large_pool,
280 serialization_threshold: 4 * 1024, }
282 }
283
284 pub fn get_serialization_buffer(&self, estimated_size: usize) -> Option<BufferHandle> {
286 if estimated_size <= self.serialization_threshold {
287 self.small_buffer_pool.allocate_buffer()
288 } else {
289 self.large_buffer_pool.allocate_buffer()
290 }
291 }
292
293 pub fn get_parsing_buffer(&self) -> Option<BufferHandle> {
295 self.large_buffer_pool.allocate_buffer()
296 }
297
298 pub fn get_value_buffer(&self) -> Option<BufferHandle> {
300 self.small_buffer_pool.allocate_buffer()
301 }
302}
303
304pub trait PooledWebSocketMessage {
306 fn from_pooled_buffer(
308 pool: &WebSocketMessagePool,
309 message_type: WebSocketMessageType,
310 data: &[u8],
311 ) -> Option<Self>
312 where
313 Self: Sized;
314}
315
316#[derive(Debug, Clone, Copy, PartialEq, Eq)]
318pub enum WebSocketMessageType {
319 Text,
321 Binary,
323 Ping,
325 Pong,
327 Close,
329}
330
331pub trait PooledOrderProcessing {
334 fn get_order_processing_buffer(
336 pool: &OrderProcessingPool,
337 buffer_type: OrderBufferType,
338 ) -> Option<BufferHandle>;
339}
340
341#[derive(Debug, Clone, Copy, PartialEq, Eq)]
343pub enum OrderBufferType {
344 ExecutionReport,
346 OrderCollection,
348 OrderIdBatch,
350}
351
352pub struct DefaultOrderProcessing;
354
355impl PooledOrderProcessing for DefaultOrderProcessing {
356 fn get_order_processing_buffer(
357 pool: &OrderProcessingPool,
358 buffer_type: OrderBufferType,
359 ) -> Option<BufferHandle> {
360 match buffer_type {
361 OrderBufferType::ExecutionReport => pool.get_execution_report_buffer(),
362 OrderBufferType::OrderCollection => pool.get_order_collection_buffer(),
363 OrderBufferType::OrderIdBatch => {
364 pool.get_execution_report_buffer()
366 }
367 }
368 }
369}
370
371thread_local! {
373 static THREAD_LOCAL_POOLS: DefaultTradingPoolManager = DefaultTradingPoolManager::new();
374}
375
376pub fn with_thread_local_pools<F, R>(f: F) -> R
384where
385 F: FnOnce(&TradingPoolManager) -> R,
386{
387 THREAD_LOCAL_POOLS.with(|pools| f(pools))
388}
389
390#[deprecated(since = "0.1.0", note = "Use with_thread_local_pools for safer access")]
397pub fn get_thread_local_pools() -> &'static DefaultTradingPoolManager {
398 get_global_trading_pools()
401}
402
403static GLOBAL_TRADING_POOLS: std::sync::OnceLock<DefaultTradingPoolManager> =
405 std::sync::OnceLock::new();
406
407pub fn get_global_trading_pools() -> &'static DefaultTradingPoolManager {
409 GLOBAL_TRADING_POOLS.get_or_init(DefaultTradingPoolManager::new)
410}
411
412pub fn initialize_trading_pools() -> &'static DefaultTradingPoolManager {
414 get_global_trading_pools()
415}
416
417pub type TradingPoolManager4096 = TradingPoolManager<4096, 8>;
420pub type TradingPoolManager2048 = TradingPoolManager<2048, 16>;
422pub type TradingPoolManager8192 = TradingPoolManager<8192, 4>;
424
425pub type WebSocketMessagePool4096 = WebSocketMessagePool<4096>;
427pub type WebSocketMessagePool2048 = WebSocketMessagePool<2048>;
429pub type WebSocketMessagePool8192 = WebSocketMessagePool<8192>;
431
432pub type OrderProcessingPool8 = OrderProcessingPool<8>;
434pub type OrderProcessingPool16 = OrderProcessingPool<16>;
436pub type OrderProcessingPool4 = OrderProcessingPool<4>;
438
439pub use OrderProcessingPool8 as DefaultOrderProcessingPool;
441pub use TradingPoolManager4096 as DefaultTradingPoolManager;
442pub use WebSocketMessagePool4096 as DefaultWebSocketMessagePool;
443
444#[cfg(test)]
445mod tests {
446 use super::*;
447
448 #[test]
449 fn test_trading_pool_manager_creation() {
450 let manager = DefaultTradingPoolManager::new();
451
452 assert!(manager.websocket_pool().get_text_buffer().is_some());
454 assert!(manager.order_pool().get_execution_report_buffer().is_some());
455 assert!(manager.market_data_pool().get_processing_buffer().is_some());
456 assert!(manager.json_pool().get_parsing_buffer().is_some());
457 }
458
459 #[test]
460 fn test_websocket_message_pool() {
461 let manager = DefaultTradingPoolManager::new();
462 let ws_pool = manager.websocket_pool();
463
464 let small_buffer = ws_pool.get_message_buffer(1024);
466 assert!(small_buffer.is_some());
467 assert_eq!(small_buffer.unwrap().size(), 4 * 1024);
468
469 let large_buffer = ws_pool.get_message_buffer(8 * 1024);
471 assert!(large_buffer.is_some());
472 assert_eq!(large_buffer.unwrap().size(), 64 * 1024);
473
474 assert!(ws_pool.get_binary_buffer().is_some());
476 assert!(ws_pool.get_text_buffer().is_some());
477 }
478
479 #[test]
480 fn test_order_processing_pool() {
481 let manager = DefaultTradingPoolManager::new();
482 let order_pool = manager.order_pool();
483
484 let report_buffer = order_pool.get_execution_report_buffer().unwrap();
486 assert_eq!(report_buffer.size(), 4 * 1024);
487
488 assert!(order_pool.get_order_collection_buffer().is_some());
490 assert!(order_pool.get_string().is_some());
491 assert!(order_pool.get_order_ids().is_some());
492 }
493
494 #[test]
495 fn test_market_data_pool() {
496 let manager = DefaultTradingPoolManager::new();
497 let market_pool = manager.market_data_pool();
498
499 let orderbook_buffer = market_pool.get_orderbook_buffer().unwrap();
501 assert_eq!(orderbook_buffer.size(), 32 * 1024);
502
503 assert!(market_pool.get_trade_buffer().is_some());
505 assert!(market_pool.get_processing_buffer().is_some());
506 assert!(market_pool.get_simd_buffer().is_some());
507 }
508
509 #[test]
510 fn test_json_processing_pool() {
511 let manager = DefaultTradingPoolManager::new();
512 let json_pool = manager.json_pool();
513
514 let small_buffer = json_pool.get_serialization_buffer(1024);
516 assert!(small_buffer.is_some());
517 assert_eq!(small_buffer.unwrap().size(), 4 * 1024);
518
519 let large_buffer = json_pool.get_serialization_buffer(8 * 1024);
520 assert!(large_buffer.is_some());
521 assert_eq!(large_buffer.unwrap().size(), 64 * 1024);
522
523 assert!(json_pool.get_parsing_buffer().is_some());
525 assert!(json_pool.get_value_buffer().is_some());
526 }
527
528 #[test]
529 fn test_pooled_order_processing() {
530 let manager = DefaultTradingPoolManager::new();
531 let order_pool = manager.order_pool();
532
533 let exec_buffer = DefaultOrderProcessing::get_order_processing_buffer(
535 order_pool,
536 OrderBufferType::ExecutionReport,
537 )
538 .unwrap();
539 assert_eq!(exec_buffer.size(), 4 * 1024);
540
541 let order_buffer = DefaultOrderProcessing::get_order_processing_buffer(
542 order_pool,
543 OrderBufferType::OrderCollection,
544 )
545 .unwrap();
546 assert_eq!(order_buffer.size(), 16 * 1024);
547 }
548
549 #[test]
550 fn test_thread_local_pools() {
551 with_thread_local_pools(|pools1| {
553 with_thread_local_pools(|pools2| {
554 assert!(std::ptr::eq(pools1, pools2));
556 });
557
558 assert!(pools1.websocket_pool().get_text_buffer().is_some());
560 });
561 }
562
563 #[test]
564 fn test_global_pools() {
565 let pools1 = get_global_trading_pools();
566 let pools2 = get_global_trading_pools();
567
568 assert!(std::ptr::eq(pools1, pools2));
570
571 assert!(pools1.order_pool().get_execution_report_buffer().is_some());
573 }
574
575 #[test]
576 fn test_websocket_message_type_enum() {
577 use WebSocketMessageType::*;
578
579 let types = [Text, Binary, Ping, Pong, Close];
580
581 for (i, &type1) in types.iter().enumerate() {
583 for (j, &type2) in types.iter().enumerate() {
584 if i != j {
585 assert_ne!(type1, type2);
586 }
587 }
588 }
589 }
590
591 #[test]
592 fn test_pool_buffer_reuse() {
593 let manager = DefaultTradingPoolManager::new();
594 let ws_pool = manager.websocket_pool();
595
596 let _buffer1_ptr = {
598 let buffer = ws_pool.get_text_buffer().unwrap();
599 buffer.as_ptr()
600 }; let buffer2 = ws_pool.get_text_buffer().unwrap();
604
605 assert_eq!(buffer2.size(), 4 * 1024);
607 }
608}