1use crate::execution_engine::ExecutionReport;
7use flume::Sender;
8use log::error;
9use rust_decimal::Decimal;
10use rusty_common::SmartString;
11use rusty_common::memory::{
12 DefaultOrderProcessing, OrderBufferType, PooledOrderProcessing, with_thread_local_pools,
13};
14#[cfg(test)]
15use rusty_model::venues::Venue;
16use rusty_model::{enums::OrderStatus, instruments::InstrumentId, trading_order::Order};
17use smallvec::SmallVec;
18
19pub struct PooledExecutionReportManager;
21
22impl PooledExecutionReportManager {
23 pub fn create_and_send_report(
25 order_id: &str,
26 status: OrderStatus,
27 instrument_id: InstrumentId,
28 report_sender: &Sender<ExecutionReport>,
29 ) {
30 with_thread_local_pools(|pools| {
31 if let Some(_report_buffer) = DefaultOrderProcessing::get_order_processing_buffer(
32 pools.order_pool(),
33 OrderBufferType::ExecutionReport,
34 ) {
35 let report = ExecutionReport {
37 id: {
38 let mut id = SmartString::new();
39 id.push_str("ems_");
40 id.push_str(order_id);
41 id
42 },
43 order_id: order_id.into(),
44 exchange_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
45 system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
46 instrument_id,
47 status,
48 filled_quantity: Decimal::ZERO,
49 remaining_quantity: Decimal::ZERO,
50 execution_price: None,
51 reject_reason: None,
52 exchange_execution_id: None,
53 is_final: matches!(
54 status,
55 OrderStatus::Filled | OrderStatus::Cancelled | OrderStatus::Rejected
56 ),
57 };
58
59 if let Err(e) = report_sender.try_send(report) {
60 error!("Failed to send pooled execution report for order {order_id}: {e}");
61 }
62
63 } else {
65 error!("Failed to allocate execution report from pool for order {order_id}");
66
67 Self::create_and_send_fallback_report(
69 order_id,
70 status,
71 instrument_id,
72 report_sender,
73 );
74 }
75 });
76 }
77
78 fn create_and_send_fallback_report(
80 order_id: &str,
81 status: OrderStatus,
82 instrument_id: InstrumentId,
83 report_sender: &Sender<ExecutionReport>,
84 ) {
85 let report = ExecutionReport {
86 id: {
87 let mut id = SmartString::new();
88 id.push_str("ems_");
89 id.push_str(order_id);
90 id
91 },
92 order_id: order_id.into(),
93 exchange_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
94 system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
95 instrument_id,
96 status,
97 filled_quantity: Decimal::ZERO,
98 remaining_quantity: Decimal::ZERO,
99 execution_price: None,
100 reject_reason: None,
101 exchange_execution_id: None,
102 is_final: matches!(
103 status,
104 OrderStatus::Filled | OrderStatus::Cancelled | OrderStatus::Rejected
105 ),
106 };
107
108 if let Err(e) = report_sender.try_send(report) {
109 error!("Failed to send fallback execution report for order {order_id}: {e}");
110 }
111 }
112
113 pub fn create_rejection_report(
115 order_id: &str,
116 reason: &str,
117 instrument_id: InstrumentId,
118 report_sender: &Sender<ExecutionReport>,
119 ) {
120 with_thread_local_pools(|pools| {
121 if let Some(_report_buffer) = DefaultOrderProcessing::get_order_processing_buffer(
122 pools.order_pool(),
123 OrderBufferType::ExecutionReport,
124 ) {
125 let report = ExecutionReport {
126 id: {
127 let mut id = SmartString::new();
128 id.push_str("ems_");
129 id.push_str(order_id);
130 id
131 },
132 order_id: order_id.into(),
133 exchange_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
134 system_timestamp: rusty_common::time::get_epoch_timestamp_ns(),
135 instrument_id,
136 status: OrderStatus::Rejected,
137 filled_quantity: Decimal::ZERO,
138 remaining_quantity: Decimal::ZERO,
139 execution_price: None,
140 reject_reason: Some(reason.into()),
141 exchange_execution_id: None,
142 is_final: true,
143 };
144
145 if let Err(e) = report_sender.try_send(report) {
146 error!("Failed to send pooled rejection report for order {order_id}: {e}");
147 }
148 } else {
149 error!("Failed to allocate rejection report from pool for order {order_id}");
150 }
151 });
152 }
153}
154
155pub struct PooledOrderManager;
157
158impl PooledOrderManager {
159 #[must_use]
162 pub fn get_order_batch() -> Option<SmallVec<[Order; 16]>> {
163 Some(SmallVec::new())
165 }
166
167 #[must_use]
169 pub fn get_order_id_batch()
170 -> Option<rusty_common::memory::TypedBufferHandle<SmallVec<[SmartString; 8]>>> {
171 with_thread_local_pools(|pools| pools.order_pool().get_order_ids())
172 }
173
174 pub fn process_order_batch<F>(orders: Vec<Order>, processor: F) -> Result<(), String>
176 where
177 F: Fn(&mut SmallVec<[Order; 16]>) -> Result<(), String>,
178 {
179 if let Some(mut batch) = Self::get_order_batch() {
180 batch.clear();
181
182 for order in orders.into_iter().take(16) {
184 batch.push(order);
185 }
186
187 processor(&mut batch)?;
188
189 Ok(())
191 } else {
192 Err("Failed to allocate order batch from pool".to_string())
193 }
194 }
195}
196
197pub struct PooledWebSocketManager;
199
200impl PooledWebSocketManager {
201 pub fn process_websocket_message(
203 message_data: &[u8],
204 message_type: rusty_common::memory::WebSocketMessageType,
205 ) -> Result<Vec<u8>, String> {
206 with_thread_local_pools(|pools| {
207 match message_type {
208 rusty_common::memory::WebSocketMessageType::Text => {
209 if let Some(mut buffer) = pools.websocket_pool().get_text_buffer() {
210 let buffer_size = buffer.size();
211 let buffer_slice = buffer.as_slice_mut();
212
213 if message_data.len() > buffer_size {
214 return Err("Message too large for text buffer".to_string());
215 }
216
217 buffer_slice[..message_data.len()].copy_from_slice(message_data);
218 Ok(buffer_slice[..message_data.len()].to_vec())
219 } else {
220 Err("Failed to allocate text buffer from pool".to_string())
221 }
222 }
223 rusty_common::memory::WebSocketMessageType::Binary => {
224 if let Some(mut buffer) = pools.websocket_pool().get_binary_buffer() {
225 let buffer_size = buffer.size();
226 let buffer_slice = buffer.as_slice_mut();
227
228 if message_data.len() > buffer_size {
229 return Err("Message too large for binary buffer".to_string());
230 }
231
232 buffer_slice[..message_data.len()].copy_from_slice(message_data);
233 Ok(buffer_slice[..message_data.len()].to_vec())
234 } else {
235 Err("Failed to allocate binary buffer from pool".to_string())
236 }
237 }
238 _ => {
239 if let Some(mut buffer) = pools.websocket_pool().get_text_buffer() {
241 let buffer_size = buffer.size();
242 let buffer_slice = buffer.as_slice_mut();
243
244 if message_data.len() > buffer_size {
245 return Err("Message too large for buffer".to_string());
246 }
247
248 buffer_slice[..message_data.len()].copy_from_slice(message_data);
249 Ok(buffer_slice[..message_data.len()].to_vec())
250 } else {
251 Err("Failed to allocate buffer from pool".to_string())
252 }
253 }
254 }
255 })
256 }
257
258 pub fn create_json_message(json_data: &str) -> Result<Vec<u8>, String> {
260 with_thread_local_pools(|pools| {
261 let estimated_size = json_data.len() + 100; if let Some(mut buffer) = pools.json_pool().get_serialization_buffer(estimated_size) {
264 let buffer_size = buffer.size();
265 let buffer_slice = buffer.as_slice_mut();
266 let json_bytes = json_data.as_bytes();
267
268 if json_bytes.len() > buffer_size {
269 return Err("JSON too large for buffer".to_string());
270 }
271
272 buffer_slice[..json_bytes.len()].copy_from_slice(json_bytes);
273 Ok(buffer_slice[..json_bytes.len()].to_vec())
274 } else {
275 Err("Failed to allocate JSON buffer from pool".to_string())
276 }
277 })
278 }
279}
280
/// Snapshot of the counters kept for pooled operations.
///
/// All counters start at zero (`Default`). The per-field descriptions below are taken from
/// the field names only; no code in this part of the file increments them, so confirm the
/// exact meaning against whatever updates the counters.
#[derive(Clone, Copy, Debug, Default)]
pub struct PooledOperationStats {
    /// Execution reports counted as produced through the pooled path.
    pub execution_reports_pooled: u64,
    /// Execution reports counted as produced through the fallback path.
    pub execution_reports_fallback: u64,
    /// Order batches counted as pooled.
    pub order_batches_pooled: u64,
    /// WebSocket messages counted as pooled.
    pub websocket_messages_pooled: u64,
    /// JSON messages counted as pooled.
    pub json_messages_pooled: u64,
    /// Pool allocation failures counted.
    pub pool_allocation_failures: u64,
}
297
298static POOL_STATS: std::sync::RwLock<PooledOperationStats> =
300 std::sync::RwLock::new(PooledOperationStats {
301 execution_reports_pooled: 0,
302 execution_reports_fallback: 0,
303 order_batches_pooled: 0,
304 websocket_messages_pooled: 0,
305 json_messages_pooled: 0,
306 pool_allocation_failures: 0,
307 });
308
309pub fn get_pool_operation_stats() -> PooledOperationStats {
311 *POOL_STATS.read().unwrap()
312}
313
314pub fn reset_pool_operation_stats() {
316 *POOL_STATS.write().unwrap() = PooledOperationStats::default();
317}
318
#[cfg(test)]
mod tests {
    use super::*;
    use flume::bounded;
    use rust_decimal_macros::dec;
    use rusty_model::enums::TimeInForce;

    /// A plain status report reaches the channel with the id, status and instrument given.
    #[test]
    fn test_pooled_execution_report_creation() {
        let instrument = InstrumentId::new("BTC-USDT", Venue::Binance);
        let (tx, rx) = bounded(10);

        PooledExecutionReportManager::create_and_send_report(
            "test_order_123",
            OrderStatus::New,
            instrument.clone(),
            &tx,
        );

        let received = rx.try_recv().expect("a report should have been queued");
        assert_eq!(received.order_id, "test_order_123");
        assert_eq!(received.status, OrderStatus::New);
        assert_eq!(received.instrument_id, instrument);
    }

    /// A rejection report is final and carries the reject reason.
    #[test]
    fn test_pooled_rejection_report() {
        let instrument = InstrumentId::new("ETH-USDT", Venue::Coinbase);
        let (tx, rx) = bounded(10);

        PooledExecutionReportManager::create_rejection_report(
            "rejected_order_456",
            "Insufficient funds",
            instrument,
            &tx,
        );

        let received = rx.try_recv().expect("a rejection should have been queued");
        assert_eq!(received.order_id, "rejected_order_456");
        assert_eq!(received.status, OrderStatus::Rejected);
        assert_eq!(received.reject_reason, Some("Insufficient funds".into()));
        assert!(received.is_final);
    }

    /// A single order is handed to the processor in one batch.
    #[test]
    fn test_pooled_order_batch() {
        use rusty_model::enums::{OrderSide, OrderType};
        use rusty_model::types::{ClientId, OrderId};

        let single_order = Order {
            id: OrderId::new(),
            client_id: ClientId::new("test_client"),
            symbol: "BTC-USDT".into(),
            side: OrderSide::Buy,
            order_type: OrderType::Limit,
            quantity: dec!(0.001),
            price: Some(dec!(50000)),
            stop_price: None,
            exchange_order_id: None,
            venue: Venue::Binance,
            filled_quantity: dec!(0),
            average_fill_price: None,
            status: OrderStatus::New,
            creation_time_ns: rusty_common::time::get_epoch_timestamp_ns(),
            update_time_ns: rusty_common::time::get_epoch_timestamp_ns(),
            time_in_force: TimeInForce::GTC,
            metadata: rusty_common::json::json!(null),
        };

        let outcome = PooledOrderManager::process_order_batch(vec![single_order], |batch| {
            assert_eq!(batch.len(), 1);
            assert_eq!(batch[0].symbol, "BTC-USDT");
            Ok(())
        });

        assert_eq!(outcome, Ok(()));
    }

    /// A small text message comes back byte-for-byte.
    #[test]
    fn test_pooled_websocket_message() {
        let payload = b"Hello, WebSocket!";

        let echoed = PooledWebSocketManager::process_websocket_message(
            payload,
            rusty_common::memory::WebSocketMessageType::Text,
        )
        .expect("text message should be processed");

        assert_eq!(echoed, payload.to_vec());
    }

    /// A small JSON string comes back as its own bytes.
    #[test]
    fn test_pooled_json_message() {
        let json = r#"{"type":"order","symbol":"BTC-USDT","side":"buy"}"#;

        let bytes = PooledWebSocketManager::create_json_message(json)
            .expect("JSON message should be created");

        assert_eq!(bytes, json.as_bytes().to_vec());
    }

    /// An order-id buffer can be obtained from the pool and written to.
    #[test]
    fn test_order_id_batch() {
        let mut handle =
            PooledOrderManager::get_order_id_batch().expect("order-id buffer should be available");
        let order_ids = handle.as_mut();

        order_ids.push("order_1".into());
        order_ids.push("order_2".into());
        assert_eq!(order_ids.len(), 2);
    }

    /// A 60 KiB binary payload is expected to fit in a binary buffer.
    #[test]
    fn test_large_message_handling() {
        let payload = vec![0u8; 60 * 1024];

        let echoed = PooledWebSocketManager::process_websocket_message(
            &payload,
            rusty_common::memory::WebSocketMessageType::Binary,
        )
        .expect("60 KiB binary message should be processed");

        assert_eq!(echoed.len(), payload.len());
    }

    /// A 2 MiB text payload is expected to be rejected as too large.
    #[test]
    fn test_too_large_message_handling() {
        let payload = vec![0u8; 2 * 1024 * 1024];

        let failure = PooledWebSocketManager::process_websocket_message(
            &payload,
            rusty_common::memory::WebSocketMessageType::Text,
        )
        .expect_err("2 MiB text message should be rejected");

        assert!(failure.contains("too large"));
    }
}