1use crate::monitor::collector::{MarketDataEvent, Result};
7use crate::monitor::schema::{OrderBookRecord, OrderBookSerializer, TradeRecord, TradeSerializer};
8use crate::monitor::storage::{CompressionMode, manager::StorageManager};
9use flume::{Receiver, Sender};
10use parking_lot::RwLock;
11use quanta::Instant as QuantaInstant;
12use rust_decimal::prelude::ToPrimitive;
13use simd_aligned::VecSimd;
14use std::sync::Arc;
15use tokio::task::JoinHandle;
16use wide::{CmpGt, f64x4};
17
18#[derive(Debug, Clone)]
20pub struct ZerocopyPipelineConfig {
21 pub buffer_pool_size: usize,
23 pub worker_buffer_size: usize,
25 pub simd_batch_size: usize,
27 pub worker_count: usize,
29 pub compression_mode: CompressionMode,
31 pub serialization_buffer_size: usize,
33}
34
35impl Default for ZerocopyPipelineConfig {
36 fn default() -> Self {
37 Self {
38 buffer_pool_size: 64,
39 worker_buffer_size: 64 * 1024, simd_batch_size: 32, worker_count: 4,
42 compression_mode: CompressionMode::Realtime,
43 serialization_buffer_size: 128 * 1024, }
45 }
46}
47
/// Cumulative counters reported by [`ZerocopyDataPipeline::get_stats`].
///
/// All fields start at zero (`Default`) and are updated once per processed batch.
#[derive(Clone, Debug, Default)]
pub struct ZerocopyPipelineStats {
    /// Events in all processed batches, including event kinds that are not persisted.
    pub events_processed: u64,
    /// Trade events seen in processed batches (whether or not they were stored).
    pub trades_processed: u64,
    /// Order-book events seen in processed batches (whether or not they were stored).
    pub orderbooks_processed: u64,
    /// Serialized size, in bytes, of records that were serialized and written successfully.
    pub bytes_serialized: u64,
    /// Compression ratio.
    /// NOTE(review): never updated in this file; stays 0.0 unless set elsewhere.
    pub compression_ratio: f64,
    /// Moving average of per-batch processing time, in nanoseconds
    /// (each new sample weighted 1/10).
    pub avg_latency_nanos: u64,
    /// Number of batches processed.
    pub simd_batches_processed: u64,
    /// Incremented once per processed batch.
    pub buffer_reuse_count: u64,
    /// Records in trade/order-book groups whose processing call returned `Ok`.
    pub zero_copy_operations: u64,
}
70
71#[derive(Debug)]
73struct ZerocopyBufferPool {
74 serialization_buffers: Vec<Vec<u8>>,
75 compression_buffers: Vec<Vec<u8>>,
76 simd_buffers: Vec<VecSimd<f64x4>>,
77 available_serialization: Vec<usize>,
78 available_compression: Vec<usize>,
79 available_simd: Vec<usize>,
80}
81
82impl ZerocopyBufferPool {
83 fn new(config: &ZerocopyPipelineConfig) -> Self {
84 let pool_size = config.buffer_pool_size;
85
86 let mut serialization_buffers = Vec::with_capacity(pool_size);
88 let mut available_serialization = Vec::with_capacity(pool_size);
89 for i in 0..pool_size {
90 serialization_buffers.push(Vec::with_capacity(config.serialization_buffer_size));
91 available_serialization.push(i);
92 }
93
94 let mut compression_buffers = Vec::with_capacity(pool_size);
96 let mut available_compression = Vec::with_capacity(pool_size);
97 for i in 0..pool_size {
98 compression_buffers.push(Vec::with_capacity(config.worker_buffer_size));
99 available_compression.push(i);
100 }
101
102 let mut simd_buffers = Vec::with_capacity(pool_size);
104 let mut available_simd = Vec::with_capacity(pool_size);
105 for i in 0..pool_size {
106 simd_buffers.push(VecSimd::<f64x4>::with(0.0, config.simd_batch_size / 4));
107 available_simd.push(i);
108 }
109
110 Self {
111 serialization_buffers,
112 compression_buffers,
113 simd_buffers,
114 available_serialization,
115 available_compression,
116 available_simd,
117 }
118 }
119
120 fn get_serialization_buffer(&mut self) -> Option<usize> {
121 self.available_serialization.pop()
122 }
123
124 fn return_serialization_buffer(&mut self, index: usize) {
125 self.serialization_buffers[index].clear();
127 self.available_serialization.push(index);
128 }
129
130 fn get_compression_buffer(&mut self) -> Option<usize> {
131 self.available_compression.pop()
132 }
133
134 fn return_compression_buffer(&mut self, index: usize) {
135 self.compression_buffers[index].clear();
137 self.available_compression.push(index);
138 }
139
140 fn get_simd_buffer(&mut self) -> Option<usize> {
141 self.available_simd.pop()
142 }
143
144 fn return_simd_buffer(&mut self, index: usize) {
145 for i in 0..self.simd_buffers[index].len() {
147 self.simd_buffers[index][i] = f64x4::splat(0.0);
148 }
149 self.available_simd.push(index);
150 }
151}
152
153#[derive(Debug)]
155struct ZerocopyWorkerContext {
156 serialization_buffer: Vec<u8>,
157 compression_buffer: Vec<u8>,
158 simd_prices: VecSimd<f64x4>,
159 simd_quantities: VecSimd<f64x4>,
160 processing_timestamps: Vec<u64>,
161}
162
163impl ZerocopyWorkerContext {
164 fn new(config: &ZerocopyPipelineConfig) -> Self {
165 Self {
166 serialization_buffer: Vec::with_capacity(config.serialization_buffer_size),
167 compression_buffer: Vec::with_capacity(config.worker_buffer_size),
168 simd_prices: VecSimd::<f64x4>::with(0.0, config.simd_batch_size / 4),
169 simd_quantities: VecSimd::<f64x4>::with(0.0, config.simd_batch_size / 4),
170 processing_timestamps: Vec::with_capacity(config.simd_batch_size),
171 }
172 }
173
174 fn reset(&mut self) {
175 self.serialization_buffer.clear();
176 self.compression_buffer.clear();
177 self.processing_timestamps.clear();
178
179 for i in 0..self.simd_prices.len() {
181 self.simd_prices[i] = f64x4::splat(0.0);
182 self.simd_quantities[i] = f64x4::splat(0.0);
183 }
184 }
185}
186
187#[derive(Debug)]
189pub struct ZerocopyDataPipeline {
190 config: ZerocopyPipelineConfig,
191 storage_manager: Arc<StorageManager>,
192 buffer_pool: Arc<RwLock<ZerocopyBufferPool>>,
193 worker_handles: Vec<JoinHandle<()>>,
194 stats: Arc<RwLock<ZerocopyPipelineStats>>,
195 shutdown_sender: Option<Sender<()>>,
196}
197
198impl ZerocopyDataPipeline {
199 #[must_use]
201 pub fn new(config: ZerocopyPipelineConfig, storage_manager: Arc<StorageManager>) -> Self {
202 let buffer_pool = Arc::new(RwLock::new(ZerocopyBufferPool::new(&config)));
203
204 Self {
205 config,
206 storage_manager,
207 buffer_pool,
208 worker_handles: Vec::new(),
209 stats: Arc::new(RwLock::new(ZerocopyPipelineStats::default())),
210 shutdown_sender: None,
211 }
212 }
213
214 pub async fn start(&mut self, event_receiver: Receiver<MarketDataEvent>) -> Result<()> {
216 log::info!(
217 "Starting zero-copy data pipeline with {} workers",
218 self.config.worker_count
219 );
220
221 let (shutdown_sender, shutdown_receiver) = flume::unbounded();
222 self.shutdown_sender = Some(shutdown_sender);
223
224 for worker_id in 0..self.config.worker_count {
226 let worker_handle = self
227 .spawn_zerocopy_worker(worker_id, event_receiver.clone(), shutdown_receiver.clone())
228 .await;
229 self.worker_handles.push(worker_handle);
230 }
231
232 log::info!("Zero-copy data pipeline started successfully");
233 Ok(())
234 }
235
236 pub async fn stop(&mut self) -> Result<()> {
238 log::info!("Stopping zero-copy data pipeline");
239
240 if let Some(shutdown_sender) = self.shutdown_sender.take() {
242 let _ = shutdown_sender.send(());
243 }
244
245 for handle in self.worker_handles.drain(..) {
247 handle.abort();
248 let _ = handle.await;
249 }
250
251 log::info!("Zero-copy data pipeline stopped");
252 Ok(())
253 }
254
255 pub fn get_stats(&self) -> ZerocopyPipelineStats {
257 self.stats.read().clone()
258 }
259
260 pub fn reset_stats(&self) {
262 *self.stats.write() = ZerocopyPipelineStats::default();
263 }
264
265 async fn spawn_zerocopy_worker(
267 &self,
268 worker_id: usize,
269 event_receiver: Receiver<MarketDataEvent>,
270 shutdown_receiver: Receiver<()>,
271 ) -> JoinHandle<()> {
272 let storage_manager = self.storage_manager.clone();
273 let buffer_pool = self.buffer_pool.clone();
274 let stats = self.stats.clone();
275 let config = self.config.clone();
276
277 tokio::spawn(async move {
278 log::info!("Starting zero-copy pipeline worker {worker_id}");
279
280 let mut worker_context = ZerocopyWorkerContext::new(&config);
281 let mut event_batch = Vec::with_capacity(config.simd_batch_size);
282
283 loop {
284 tokio::select! {
285 _ = shutdown_receiver.recv_async() => {
287 log::info!("Zero-copy worker {worker_id} received shutdown signal");
288 break;
289 }
290
291 event_result = event_receiver.recv_async() => {
293 match event_result {
294 Ok(event) => {
295 event_batch.push(event);
296
297 if event_batch.len() >= config.simd_batch_size {
299 Self::process_zerocopy_batch(
300 &event_batch,
301 &mut worker_context,
302 &storage_manager,
303 &buffer_pool,
304 &stats,
305 &config,
306 ).await;
307 event_batch.clear();
308 }
309 }
310 Err(_) => {
311 log::info!("Zero-copy worker {worker_id} event channel closed");
312 break;
313 }
314 }
315 }
316 }
317 }
318
319 if !event_batch.is_empty() {
321 Self::process_zerocopy_batch(
322 &event_batch,
323 &mut worker_context,
324 &storage_manager,
325 &buffer_pool,
326 &stats,
327 &config,
328 )
329 .await;
330 }
331
332 log::info!("Zero-copy pipeline worker {worker_id} stopped");
333 })
334 }
335
336 async fn process_zerocopy_batch(
338 batch: &[MarketDataEvent],
339 worker_context: &mut ZerocopyWorkerContext,
340 storage_manager: &StorageManager,
341 buffer_pool: &Arc<RwLock<ZerocopyBufferPool>>,
342 stats: &Arc<RwLock<ZerocopyPipelineStats>>,
343 config: &ZerocopyPipelineConfig,
344 ) {
345 let start_time = QuantaInstant::now();
346 worker_context.reset();
347
348 let mut trades = Vec::new();
350 let mut orderbooks = Vec::new();
351
352 for event in batch {
353 match event {
354 MarketDataEvent::Trade(trade) => trades.push(trade),
355 MarketDataEvent::OrderBook(orderbook) => orderbooks.push(orderbook),
356 _ => {} }
358 }
359
360 let mut total_bytes_serialized = 0u64;
361 let mut zero_copy_operations = 0u64;
362
363 if !trades.is_empty()
365 && let Ok(bytes) = Self::process_trade_batch_simd(
366 &trades,
367 worker_context,
368 storage_manager,
369 buffer_pool,
370 )
371 .await
372 {
373 total_bytes_serialized += bytes;
374 zero_copy_operations += trades.len() as u64;
375 }
376
377 if !orderbooks.is_empty()
379 && let Ok(bytes) = Self::process_orderbook_batch_simd(
380 &orderbooks,
381 worker_context,
382 storage_manager,
383 buffer_pool,
384 )
385 .await
386 {
387 total_bytes_serialized += bytes;
388 zero_copy_operations += orderbooks.len() as u64;
389 }
390
391 let processing_time_nanos = start_time.elapsed().as_nanos() as u64;
393 {
394 let mut stats_guard = stats.write();
395 stats_guard.events_processed += batch.len() as u64;
396 stats_guard.trades_processed += trades.len() as u64;
397 stats_guard.orderbooks_processed += orderbooks.len() as u64;
398 stats_guard.bytes_serialized += total_bytes_serialized;
399 stats_guard.avg_latency_nanos =
400 (stats_guard.avg_latency_nanos * 9 + processing_time_nanos) / 10;
401 stats_guard.simd_batches_processed += 1;
402 stats_guard.zero_copy_operations += zero_copy_operations;
403 stats_guard.buffer_reuse_count += 1;
404 }
405
406 log::debug!(
407 "Processed zero-copy batch of {} events in {}ns",
408 batch.len(),
409 processing_time_nanos
410 );
411 }
412
413 async fn process_trade_batch_simd(
415 trades: &[&TradeRecord],
416 worker_context: &mut ZerocopyWorkerContext,
417 storage_manager: &StorageManager,
418 _buffer_pool: &Arc<RwLock<ZerocopyBufferPool>>,
419 ) -> Result<u64> {
420 if trades.is_empty() {
421 return Ok(0);
422 }
423
424 let mut total_bytes = 0u64;
425
426 let simd_chunks = trades.len() / 4;
428 let prices_flat = worker_context.simd_prices.flat_mut();
429 let quantities_flat = worker_context.simd_quantities.flat_mut();
430
431 for (i, trade) in trades.iter().enumerate().take(simd_chunks * 4) {
432 prices_flat[i] = trade.price.to_f64().unwrap_or_else(|| {
433 #[cfg(debug_assertions)]
434 eprintln!(
435 "Warning: Trade price conversion failed at index {}: {}",
436 i, trade.price
437 );
438 f64::NAN
439 });
440 quantities_flat[i] = trade.quantity.to_f64().unwrap_or_else(|| {
441 #[cfg(debug_assertions)]
442 eprintln!(
443 "Warning: Trade quantity conversion failed at index {}: {}",
444 i, trade.quantity
445 );
446 f64::NAN
447 });
448 worker_context
449 .processing_timestamps
450 .push(trade.timestamp_exchange);
451 }
452
453 if simd_chunks > 0 {
455 for chunk in 0..simd_chunks {
456 let price_simd = worker_context.simd_prices[chunk];
457 let quantity_simd = worker_context.simd_quantities[chunk];
458
459 let zero_vec = f64x4::splat(0.0);
461 let price_valid = price_simd.cmp_gt(zero_vec);
462 let quantity_valid = quantity_simd.cmp_gt(zero_vec);
463 let all_valid = price_valid & quantity_valid;
464
465 if all_valid.all() {
467 for i in 0..4 {
469 let trade_index = chunk * 4 + i;
470 if trade_index < trades.len() {
471 worker_context.serialization_buffer.clear();
473 if let Ok(serialized) =
474 TradeSerializer::serialize_trade(trades[trade_index])
475 && (storage_manager.write_trade(trades[trade_index]).await).is_ok()
476 {
477 total_bytes += serialized.len() as u64;
478 }
479 }
480 }
481 }
482 }
483 }
484
485 for trade in trades.iter().skip(simd_chunks * 4) {
487 worker_context.serialization_buffer.clear();
488 if let Ok(serialized) = TradeSerializer::serialize_trade(trade)
489 && (storage_manager.write_trade(trade).await).is_ok()
490 {
491 total_bytes += serialized.len() as u64;
492 }
493 }
494
495 Ok(total_bytes)
496 }
497
498 async fn process_orderbook_batch_simd(
500 orderbooks: &[&OrderBookRecord],
501 worker_context: &mut ZerocopyWorkerContext,
502 storage_manager: &StorageManager,
503 _buffer_pool: &Arc<RwLock<ZerocopyBufferPool>>,
504 ) -> Result<u64> {
505 if orderbooks.is_empty() {
506 return Ok(0);
507 }
508
509 let mut total_bytes = 0u64;
510
511 for orderbook in orderbooks {
513 worker_context.serialization_buffer.clear();
514 if let Ok(serialized) = OrderBookSerializer::serialize_orderbook(orderbook)
515 && (storage_manager.write_orderbook(orderbook).await).is_ok()
516 {
517 total_bytes += serialized.len() as u64;
518 }
519 }
520
521 Ok(total_bytes)
522 }
523}
524
525impl Drop for ZerocopyDataPipeline {
526 fn drop(&mut self) {
527 if let Some(shutdown_sender) = self.shutdown_sender.take() {
529 let _ = shutdown_sender.try_send(());
530 }
531
532 for handle in &self.worker_handles {
534 handle.abort();
535 }
536 }
537}
538
#[cfg(test)]
mod tests {
    use super::*;
    use crate::monitor::config::storage::StorageConfig;

    use tempfile::TempDir;

    /// Creates a storage manager rooted in a fresh temporary directory.
    ///
    /// The `TempDir` is returned too, so the caller keeps the directory alive
    /// for as long as the manager is in use.
    async fn temp_storage_manager() -> (TempDir, Arc<StorageManager>) {
        let temp_dir = TempDir::new().unwrap();
        let storage_config = StorageConfig {
            market_data_path: temp_dir.path().join("market_data"),
            ..Default::default()
        };
        let storage_manager = Arc::new(StorageManager::new(storage_config).await.unwrap());
        (temp_dir, storage_manager)
    }

    #[tokio::test]
    async fn test_zerocopy_pipeline_creation() {
        let (_temp_dir, storage_manager) = temp_storage_manager().await;

        let config = ZerocopyPipelineConfig::default();
        let pipeline = ZerocopyDataPipeline::new(config, storage_manager);

        assert_eq!(pipeline.worker_handles.len(), 0);
        assert_eq!(pipeline.get_stats().events_processed, 0);
    }

    #[tokio::test]
    async fn test_reset_stats_clears_counters() {
        let (_temp_dir, storage_manager) = temp_storage_manager().await;
        let pipeline =
            ZerocopyDataPipeline::new(ZerocopyPipelineConfig::default(), storage_manager);

        pipeline.stats.write().events_processed = 42;
        assert_eq!(pipeline.get_stats().events_processed, 42);

        pipeline.reset_stats();
        assert_eq!(pipeline.get_stats().events_processed, 0);
    }

    #[tokio::test]
    async fn test_start_then_stop_joins_idle_workers() {
        let (_temp_dir, storage_manager) = temp_storage_manager().await;
        let config = ZerocopyPipelineConfig {
            worker_count: 2,
            ..Default::default()
        };
        let mut pipeline = ZerocopyDataPipeline::new(config, storage_manager);

        // Keep the event sender alive so workers exit via the shutdown signal,
        // not because the event channel closed.
        let (_event_sender, event_receiver) = flume::unbounded::<MarketDataEvent>();
        assert!(pipeline.start(event_receiver).await.is_ok());
        assert_eq!(pipeline.worker_handles.len(), 2);

        assert!(pipeline.stop().await.is_ok());
        assert!(pipeline.worker_handles.is_empty());
        assert!(pipeline.shutdown_sender.is_none());
    }

    #[test]
    fn test_zerocopy_buffer_pool() {
        let config = ZerocopyPipelineConfig::default();
        let mut pool = ZerocopyBufferPool::new(&config);
        let full = config.buffer_pool_size;

        let buffer_index = pool.get_serialization_buffer().unwrap();
        assert_eq!(pool.available_serialization.len(), full - 1);
        pool.return_serialization_buffer(buffer_index);
        assert_eq!(pool.available_serialization.len(), full);

        let compression_index = pool.get_compression_buffer().unwrap();
        pool.compression_buffers[compression_index].extend_from_slice(b"payload");
        assert_eq!(pool.available_compression.len(), full - 1);
        pool.return_compression_buffer(compression_index);
        assert_eq!(pool.available_compression.len(), full);
        assert!(pool.compression_buffers[compression_index].is_empty());

        let simd_index = pool.get_simd_buffer().unwrap();
        assert_eq!(pool.available_simd.len(), full - 1);
        pool.return_simd_buffer(simd_index);
        assert_eq!(pool.available_simd.len(), full);
    }

    #[test]
    fn test_worker_context_reset() {
        let config = ZerocopyPipelineConfig::default();
        let mut context = ZerocopyWorkerContext::new(&config);

        context.serialization_buffer.extend_from_slice(b"test");
        context.compression_buffer.extend_from_slice(b"test");
        context.processing_timestamps.push(123456789);

        let original_capacity = context.serialization_buffer.capacity();
        context.reset();

        assert_eq!(context.serialization_buffer.len(), 0);
        assert_eq!(context.serialization_buffer.capacity(), original_capacity);
        assert!(context.compression_buffer.is_empty());
        assert_eq!(context.processing_timestamps.len(), 0);
    }
}