rusty_ems/
memory_integration.rs

1//! Memory pool integration for EMS operations
2//!
3//! This module provides zero-allocation helpers for order processing,
4//! execution reports, and WebSocket message handling using memory pools.
5
6use crate::execution_engine::ExecutionReport;
7use flume::Sender;
8use log::error;
9use rust_decimal::Decimal;
10use rusty_common::SmartString;
11use rusty_common::memory::{
12    DefaultOrderProcessing, OrderBufferType, PooledOrderProcessing, with_thread_local_pools,
13};
14#[cfg(test)]
15use rusty_model::venues::Venue;
16use rusty_model::{enums::OrderStatus, instruments::InstrumentId, trading_order::Order};
17use smallvec::SmallVec;
18
19/// Memory pool integration for execution reports
20pub struct PooledExecutionReportManager;
21
22impl PooledExecutionReportManager {
23    /// Create and send an execution report using memory pools for zero-allocation
24    pub fn create_and_send_report(
25        order_id: &str,
26        status: OrderStatus,
27        instrument_id: InstrumentId,
28        report_sender: &Sender<ExecutionReport>,
29    ) {
30        with_thread_local_pools(|pools| {
31            if let Some(_report_buffer) = DefaultOrderProcessing::get_order_processing_buffer(
32                pools.order_pool(),
33                OrderBufferType::ExecutionReport,
34            ) {
35                // Create execution report using the pooled buffer
36                let report = ExecutionReport {
37                    id: {
38                        let mut id = SmartString::new();
39                        id.push_str("ems_");
40                        id.push_str(order_id);
41                        id
42                    },
43                    order_id: order_id.into(),
44                    exchange_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
45                    system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
46                    instrument_id,
47                    status,
48                    filled_quantity: Decimal::ZERO,
49                    remaining_quantity: Decimal::ZERO,
50                    execution_price: None,
51                    reject_reason: None,
52                    exchange_execution_id: None,
53                    is_final: matches!(
54                        status,
55                        OrderStatus::Filled | OrderStatus::Cancelled | OrderStatus::Rejected
56                    ),
57                };
58
59                if let Err(e) = report_sender.try_send(report) {
60                    error!("Failed to send pooled execution report for order {order_id}: {e}");
61                }
62
63                // Buffer automatically returns to pool when dropped
64            } else {
65                error!("Failed to allocate execution report from pool for order {order_id}");
66
67                // Fallback to regular allocation
68                Self::create_and_send_fallback_report(
69                    order_id,
70                    status,
71                    instrument_id,
72                    report_sender,
73                );
74            }
75        });
76    }
77
78    /// Fallback method using regular allocation
79    fn create_and_send_fallback_report(
80        order_id: &str,
81        status: OrderStatus,
82        instrument_id: InstrumentId,
83        report_sender: &Sender<ExecutionReport>,
84    ) {
85        let report = ExecutionReport {
86            id: {
87                let mut id = SmartString::new();
88                id.push_str("ems_");
89                id.push_str(order_id);
90                id
91            },
92            order_id: order_id.into(),
93            exchange_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
94            system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
95            instrument_id,
96            status,
97            filled_quantity: Decimal::ZERO,
98            remaining_quantity: Decimal::ZERO,
99            execution_price: None,
100            reject_reason: None,
101            exchange_execution_id: None,
102            is_final: matches!(
103                status,
104                OrderStatus::Filled | OrderStatus::Cancelled | OrderStatus::Rejected
105            ),
106        };
107
108        if let Err(e) = report_sender.try_send(report) {
109            error!("Failed to send fallback execution report for order {order_id}: {e}");
110        }
111    }
112
113    /// Create a rejection report using memory pools
114    pub fn create_rejection_report(
115        order_id: &str,
116        reason: &str,
117        instrument_id: InstrumentId,
118        report_sender: &Sender<ExecutionReport>,
119    ) {
120        with_thread_local_pools(|pools| {
121            if let Some(_report_buffer) = DefaultOrderProcessing::get_order_processing_buffer(
122                pools.order_pool(),
123                OrderBufferType::ExecutionReport,
124            ) {
125                let report = ExecutionReport {
126                    id: {
127                        let mut id = SmartString::new();
128                        id.push_str("ems_");
129                        id.push_str(order_id);
130                        id
131                    },
132                    order_id: order_id.into(),
133                    exchange_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
134                    system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
135                    instrument_id,
136                    status: OrderStatus::Rejected,
137                    filled_quantity: Decimal::ZERO,
138                    remaining_quantity: Decimal::ZERO,
139                    execution_price: None,
140                    reject_reason: Some(reason.into()),
141                    exchange_execution_id: None,
142                    is_final: true,
143                };
144
145                if let Err(e) = report_sender.try_send(report) {
146                    error!("Failed to send pooled rejection report for order {order_id}: {e}");
147                }
148            } else {
149                error!("Failed to allocate rejection report from pool for order {order_id}");
150            }
151        });
152    }
153}
154
155/// Memory pool integration for order collections
156pub struct PooledOrderManager;
157
158impl PooledOrderManager {
159    /// Get a pooled collection for batch order operations
160    /// Note: This is a placeholder implementation using regular allocation
161    #[must_use]
162    pub fn get_order_batch() -> Option<SmallVec<[Order; 16]>> {
163        // For now, just return a regular SmallVec as we don't have typed handles implemented
164        Some(SmallVec::new())
165    }
166
167    /// Get a pooled collection for order IDs
168    #[must_use]
169    pub fn get_order_id_batch()
170    -> Option<rusty_common::memory::TypedBufferHandle<SmallVec<[SmartString; 8]>>> {
171        with_thread_local_pools(|pools| pools.order_pool().get_order_ids())
172    }
173
174    /// Process a batch of orders using pooled memory
175    pub fn process_order_batch<F>(orders: Vec<Order>, processor: F) -> Result<(), String>
176    where
177        F: Fn(&mut SmallVec<[Order; 16]>) -> Result<(), String>,
178    {
179        if let Some(mut batch) = Self::get_order_batch() {
180            batch.clear();
181
182            // Add orders to batch (up to capacity)
183            for order in orders.into_iter().take(16) {
184                batch.push(order);
185            }
186
187            processor(&mut batch)?;
188
189            // Note: In a full implementation, this would use actual pooled memory
190            Ok(())
191        } else {
192            Err("Failed to allocate order batch from pool".to_string())
193        }
194    }
195}
196
197/// Memory pool integration for WebSocket operations in EMS
198pub struct PooledWebSocketManager;
199
200impl PooledWebSocketManager {
201    /// Process WebSocket message using pooled buffers
202    pub fn process_websocket_message(
203        message_data: &[u8],
204        message_type: rusty_common::memory::WebSocketMessageType,
205    ) -> Result<Vec<u8>, String> {
206        with_thread_local_pools(|pools| {
207            match message_type {
208                rusty_common::memory::WebSocketMessageType::Text => {
209                    if let Some(mut buffer) = pools.websocket_pool().get_text_buffer() {
210                        let buffer_size = buffer.size();
211                        let buffer_slice = buffer.as_slice_mut();
212
213                        if message_data.len() > buffer_size {
214                            return Err("Message too large for text buffer".to_string());
215                        }
216
217                        buffer_slice[..message_data.len()].copy_from_slice(message_data);
218                        Ok(buffer_slice[..message_data.len()].to_vec())
219                    } else {
220                        Err("Failed to allocate text buffer from pool".to_string())
221                    }
222                }
223                rusty_common::memory::WebSocketMessageType::Binary => {
224                    if let Some(mut buffer) = pools.websocket_pool().get_binary_buffer() {
225                        let buffer_size = buffer.size();
226                        let buffer_slice = buffer.as_slice_mut();
227
228                        if message_data.len() > buffer_size {
229                            return Err("Message too large for binary buffer".to_string());
230                        }
231
232                        buffer_slice[..message_data.len()].copy_from_slice(message_data);
233                        Ok(buffer_slice[..message_data.len()].to_vec())
234                    } else {
235                        Err("Failed to allocate binary buffer from pool".to_string())
236                    }
237                }
238                _ => {
239                    // For other message types, use small buffer
240                    if let Some(mut buffer) = pools.websocket_pool().get_text_buffer() {
241                        let buffer_size = buffer.size();
242                        let buffer_slice = buffer.as_slice_mut();
243
244                        if message_data.len() > buffer_size {
245                            return Err("Message too large for buffer".to_string());
246                        }
247
248                        buffer_slice[..message_data.len()].copy_from_slice(message_data);
249                        Ok(buffer_slice[..message_data.len()].to_vec())
250                    } else {
251                        Err("Failed to allocate buffer from pool".to_string())
252                    }
253                }
254            }
255        })
256    }
257
258    /// Create JSON message using pooled buffer
259    pub fn create_json_message(json_data: &str) -> Result<Vec<u8>, String> {
260        with_thread_local_pools(|pools| {
261            let estimated_size = json_data.len() + 100; // Add some padding
262
263            if let Some(mut buffer) = pools.json_pool().get_serialization_buffer(estimated_size) {
264                let buffer_size = buffer.size();
265                let buffer_slice = buffer.as_slice_mut();
266                let json_bytes = json_data.as_bytes();
267
268                if json_bytes.len() > buffer_size {
269                    return Err("JSON too large for buffer".to_string());
270                }
271
272                buffer_slice[..json_bytes.len()].copy_from_slice(json_bytes);
273                Ok(buffer_slice[..json_bytes.len()].to_vec())
274            } else {
275                Err("Failed to allocate JSON buffer from pool".to_string())
276            }
277        })
278    }
279}
280
281/// Performance statistics for memory pool usage in EMS
282#[derive(Debug, Clone, Copy, Default)]
283pub struct PooledOperationStats {
284    /// Number of execution reports created using memory pools
285    pub execution_reports_pooled: u64,
286    /// Number of execution reports that fell back to regular allocation
287    pub execution_reports_fallback: u64,
288    /// Number of order batches processed using memory pools
289    pub order_batches_pooled: u64,
290    /// Number of WebSocket messages processed using memory pools
291    pub websocket_messages_pooled: u64,
292    /// Number of JSON messages created using memory pools
293    pub json_messages_pooled: u64,
294    /// Number of pool allocation failures that required fallback
295    pub pool_allocation_failures: u64,
296}
297
298/// Global statistics tracker for memory pool usage
299static POOL_STATS: std::sync::RwLock<PooledOperationStats> =
300    std::sync::RwLock::new(PooledOperationStats {
301        execution_reports_pooled: 0,
302        execution_reports_fallback: 0,
303        order_batches_pooled: 0,
304        websocket_messages_pooled: 0,
305        json_messages_pooled: 0,
306        pool_allocation_failures: 0,
307    });
308
309/// Get current memory pool operation statistics
310pub fn get_pool_operation_stats() -> PooledOperationStats {
311    *POOL_STATS.read().unwrap()
312}
313
314/// Reset memory pool operation statistics
315pub fn reset_pool_operation_stats() {
316    *POOL_STATS.write().unwrap() = PooledOperationStats::default();
317}
318
319#[cfg(test)]
320mod tests {
321    use super::*;
322    use flume::bounded;
323    use rust_decimal_macros::dec;
324    use rusty_model::enums::TimeInForce;
325
326    #[test]
327    fn test_pooled_execution_report_creation() {
328        let (report_tx, report_rx) = bounded(10);
329        let instrument_id = InstrumentId::new("BTC-USDT", Venue::Binance);
330
331        PooledExecutionReportManager::create_and_send_report(
332            "test_order_123",
333            OrderStatus::New,
334            instrument_id.clone(),
335            &report_tx,
336        );
337
338        // Should receive the report
339        let report = report_rx.try_recv().unwrap();
340        assert_eq!(report.order_id, "test_order_123");
341        assert_eq!(report.status, OrderStatus::New);
342        assert_eq!(report.instrument_id, instrument_id);
343    }
344
345    #[test]
346    fn test_pooled_rejection_report() {
347        let (report_tx, report_rx) = bounded(10);
348        let instrument_id = InstrumentId::new("ETH-USDT", Venue::Coinbase);
349
350        PooledExecutionReportManager::create_rejection_report(
351            "rejected_order_456",
352            "Insufficient funds",
353            instrument_id,
354            &report_tx,
355        );
356
357        let report = report_rx.try_recv().unwrap();
358        assert_eq!(report.order_id, "rejected_order_456");
359        assert_eq!(report.status, OrderStatus::Rejected);
360        assert_eq!(report.reject_reason, Some("Insufficient funds".into()));
361        assert!(report.is_final);
362    }
363
364    #[test]
365    fn test_pooled_order_batch() {
366        use rust_decimal_macros::dec;
367        use rusty_model::enums::{OrderSide, OrderType};
368        use rusty_model::types::{ClientId, OrderId};
369
370        let test_orders = vec![Order {
371            id: OrderId::new(),
372            client_id: ClientId::new("test_client"),
373            symbol: "BTC-USDT".into(),
374            side: OrderSide::Buy,
375            order_type: OrderType::Limit,
376            quantity: dec!(0.001),
377            price: Some(dec!(50000)),
378            stop_price: None,
379            exchange_order_id: None,
380            venue: Venue::Binance,
381            filled_quantity: dec!(0),
382            average_fill_price: None,
383            status: OrderStatus::New,
384            creation_time_ns: rusty_common::time::get_epoch_timestamp_ns(),
385            update_time_ns: rusty_common::time::get_epoch_timestamp_ns(),
386            time_in_force: TimeInForce::GTC,
387            metadata: rusty_common::json::json!(null),
388        }];
389
390        let result = PooledOrderManager::process_order_batch(test_orders, |batch| {
391            assert_eq!(batch.len(), 1);
392            assert_eq!(batch[0].symbol, "BTC-USDT");
393            Ok(())
394        });
395
396        assert!(result.is_ok());
397    }
398
399    #[test]
400    fn test_pooled_websocket_message() {
401        let message_data = b"Hello, WebSocket!";
402
403        let result = PooledWebSocketManager::process_websocket_message(
404            message_data,
405            rusty_common::memory::WebSocketMessageType::Text,
406        );
407
408        assert!(result.is_ok());
409        let processed_data = result.unwrap();
410        assert_eq!(processed_data, message_data.to_vec());
411    }
412
413    #[test]
414    fn test_pooled_json_message() {
415        let json_data = r#"{"type":"order","symbol":"BTC-USDT","side":"buy"}"#;
416
417        let result = PooledWebSocketManager::create_json_message(json_data);
418
419        assert!(result.is_ok());
420        let json_bytes = result.unwrap();
421        assert_eq!(json_bytes, json_data.as_bytes().to_vec());
422    }
423
424    #[test]
425    fn test_order_id_batch() {
426        let batch_handle = PooledOrderManager::get_order_id_batch();
427        assert!(batch_handle.is_some());
428
429        let mut handle = batch_handle.unwrap();
430        let order_ids = handle.as_mut();
431
432        order_ids.push("order_1".into());
433        order_ids.push("order_2".into());
434        assert_eq!(order_ids.len(), 2);
435
436        // Handle automatically returns to pool when dropped
437    }
438
439    #[test]
440    fn test_large_message_handling() {
441        let large_data = vec![0u8; 60 * 1024]; // 60KB (fits in 64KB buffer)
442
443        let result = PooledWebSocketManager::process_websocket_message(
444            &large_data,
445            rusty_common::memory::WebSocketMessageType::Binary,
446        );
447
448        // Should succeed with large buffer
449        assert!(result.is_ok());
450        let processed_data = result.unwrap();
451        assert_eq!(processed_data.len(), large_data.len());
452    }
453
454    #[test]
455    fn test_too_large_message_handling() {
456        let too_large_data = vec![0u8; 2 * 1024 * 1024]; // 2MB (larger than pool buffers)
457
458        let result = PooledWebSocketManager::process_websocket_message(
459            &too_large_data,
460            rusty_common::memory::WebSocketMessageType::Text,
461        );
462
463        // Should fail gracefully
464        assert!(result.is_err());
465        assert!(result.unwrap_err().contains("too large"));
466    }
467}