rusty_bin/monitor/
zerocopy_pipeline.rs

1//! Zero-copy optimized data pipeline for high-frequency monitoring
2//!
3//! This module provides zerocopy optimizations for the monitoring system to eliminate
4//! allocations in critical paths and achieve sub-microsecond latency.
5
6use crate::monitor::collector::{MarketDataEvent, Result};
7use crate::monitor::schema::{OrderBookRecord, OrderBookSerializer, TradeRecord, TradeSerializer};
8use crate::monitor::storage::{CompressionMode, manager::StorageManager};
9use flume::{Receiver, Sender};
10use parking_lot::RwLock;
11use quanta::Instant as QuantaInstant;
12use rust_decimal::prelude::ToPrimitive;
13use simd_aligned::VecSimd;
14use std::sync::Arc;
15use tokio::task::JoinHandle;
16use wide::{CmpGt, f64x4};
17
18/// Zero-copy optimized pipeline configuration
19#[derive(Debug, Clone)]
20pub struct ZerocopyPipelineConfig {
21    /// Pre-allocated buffer pool size
22    pub buffer_pool_size: usize,
23    /// Buffer size for each worker
24    pub worker_buffer_size: usize,
25    /// SIMD-aligned batch size for processing
26    pub simd_batch_size: usize,
27    /// Number of worker tasks for processing events
28    pub worker_count: usize,
29    /// Compression mode for storage
30    pub compression_mode: CompressionMode,
31    /// Pre-allocated serialization buffer size
32    pub serialization_buffer_size: usize,
33}
34
35impl Default for ZerocopyPipelineConfig {
36    fn default() -> Self {
37        Self {
38            buffer_pool_size: 64,
39            worker_buffer_size: 64 * 1024, // 64KB per worker
40            simd_batch_size: 32,           // Process 32 events at once using SIMD
41            worker_count: 4,
42            compression_mode: CompressionMode::Realtime,
43            serialization_buffer_size: 128 * 1024, // 128KB serialization buffer
44        }
45    }
46}
47
/// Counters and gauges describing what the pipeline has done so far.
///
/// A snapshot is obtained with `ZerocopyDataPipeline::get_stats`; every field
/// starts at zero (`Default`).
#[derive(Clone, Debug, Default)]
pub struct ZerocopyPipelineStats {
    /// Events handed to batch processing, including status/error events that
    /// are skipped rather than stored.
    pub events_processed: u64,
    /// Trade events seen in processed batches.
    pub trades_processed: u64,
    /// Order book events seen in processed batches.
    pub orderbooks_processed: u64,
    /// Sum of the serialized sizes of records that were written successfully.
    pub bytes_serialized: u64,
    /// Average compression ratio achieved (compressed_size / original_size).
    // NOTE(review): never written by code visible in this file — confirm who
    // maintains it.
    pub compression_ratio: f64,
    /// Smoothed per-batch processing latency in nanoseconds (each new batch
    /// contributes one tenth of the value).
    pub avg_latency_nanos: u64,
    /// Number of batches processed.
    pub simd_batches_processed: u64,
    /// Incremented once per processed batch.
    pub buffer_reuse_count: u64,
    /// Trades plus order books in batches whose processing step returned `Ok`.
    pub zero_copy_operations: u64,
}
70
71/// Pre-allocated buffer pool for zero-copy operations
72#[derive(Debug)]
73struct ZerocopyBufferPool {
74    serialization_buffers: Vec<Vec<u8>>,
75    compression_buffers: Vec<Vec<u8>>,
76    simd_buffers: Vec<VecSimd<f64x4>>,
77    available_serialization: Vec<usize>,
78    available_compression: Vec<usize>,
79    available_simd: Vec<usize>,
80}
81
82impl ZerocopyBufferPool {
83    fn new(config: &ZerocopyPipelineConfig) -> Self {
84        let pool_size = config.buffer_pool_size;
85
86        // Pre-allocate serialization buffers
87        let mut serialization_buffers = Vec::with_capacity(pool_size);
88        let mut available_serialization = Vec::with_capacity(pool_size);
89        for i in 0..pool_size {
90            serialization_buffers.push(Vec::with_capacity(config.serialization_buffer_size));
91            available_serialization.push(i);
92        }
93
94        // Pre-allocate compression buffers
95        let mut compression_buffers = Vec::with_capacity(pool_size);
96        let mut available_compression = Vec::with_capacity(pool_size);
97        for i in 0..pool_size {
98            compression_buffers.push(Vec::with_capacity(config.worker_buffer_size));
99            available_compression.push(i);
100        }
101
102        // Pre-allocate SIMD-aligned buffers
103        let mut simd_buffers = Vec::with_capacity(pool_size);
104        let mut available_simd = Vec::with_capacity(pool_size);
105        for i in 0..pool_size {
106            simd_buffers.push(VecSimd::<f64x4>::with(0.0, config.simd_batch_size / 4));
107            available_simd.push(i);
108        }
109
110        Self {
111            serialization_buffers,
112            compression_buffers,
113            simd_buffers,
114            available_serialization,
115            available_compression,
116            available_simd,
117        }
118    }
119
120    fn get_serialization_buffer(&mut self) -> Option<usize> {
121        self.available_serialization.pop()
122    }
123
124    fn return_serialization_buffer(&mut self, index: usize) {
125        // Clear buffer but keep capacity
126        self.serialization_buffers[index].clear();
127        self.available_serialization.push(index);
128    }
129
130    fn get_compression_buffer(&mut self) -> Option<usize> {
131        self.available_compression.pop()
132    }
133
134    fn return_compression_buffer(&mut self, index: usize) {
135        // Clear buffer but keep capacity
136        self.compression_buffers[index].clear();
137        self.available_compression.push(index);
138    }
139
140    fn get_simd_buffer(&mut self) -> Option<usize> {
141        self.available_simd.pop()
142    }
143
144    fn return_simd_buffer(&mut self, index: usize) {
145        // Reset SIMD buffer
146        for i in 0..self.simd_buffers[index].len() {
147            self.simd_buffers[index][i] = f64x4::splat(0.0);
148        }
149        self.available_simd.push(index);
150    }
151}
152
153/// Worker context with pre-allocated buffers
154#[derive(Debug)]
155struct ZerocopyWorkerContext {
156    serialization_buffer: Vec<u8>,
157    compression_buffer: Vec<u8>,
158    simd_prices: VecSimd<f64x4>,
159    simd_quantities: VecSimd<f64x4>,
160    processing_timestamps: Vec<u64>,
161}
162
163impl ZerocopyWorkerContext {
164    fn new(config: &ZerocopyPipelineConfig) -> Self {
165        Self {
166            serialization_buffer: Vec::with_capacity(config.serialization_buffer_size),
167            compression_buffer: Vec::with_capacity(config.worker_buffer_size),
168            simd_prices: VecSimd::<f64x4>::with(0.0, config.simd_batch_size / 4),
169            simd_quantities: VecSimd::<f64x4>::with(0.0, config.simd_batch_size / 4),
170            processing_timestamps: Vec::with_capacity(config.simd_batch_size),
171        }
172    }
173
174    fn reset(&mut self) {
175        self.serialization_buffer.clear();
176        self.compression_buffer.clear();
177        self.processing_timestamps.clear();
178
179        // Reset SIMD buffers to zero
180        for i in 0..self.simd_prices.len() {
181            self.simd_prices[i] = f64x4::splat(0.0);
182            self.simd_quantities[i] = f64x4::splat(0.0);
183        }
184    }
185}
186
187/// Zero-copy optimized data pipeline
188#[derive(Debug)]
189pub struct ZerocopyDataPipeline {
190    config: ZerocopyPipelineConfig,
191    storage_manager: Arc<StorageManager>,
192    buffer_pool: Arc<RwLock<ZerocopyBufferPool>>,
193    worker_handles: Vec<JoinHandle<()>>,
194    stats: Arc<RwLock<ZerocopyPipelineStats>>,
195    shutdown_sender: Option<Sender<()>>,
196}
197
198impl ZerocopyDataPipeline {
199    /// Create a new zero-copy optimized data pipeline
200    #[must_use]
201    pub fn new(config: ZerocopyPipelineConfig, storage_manager: Arc<StorageManager>) -> Self {
202        let buffer_pool = Arc::new(RwLock::new(ZerocopyBufferPool::new(&config)));
203
204        Self {
205            config,
206            storage_manager,
207            buffer_pool,
208            worker_handles: Vec::new(),
209            stats: Arc::new(RwLock::new(ZerocopyPipelineStats::default())),
210            shutdown_sender: None,
211        }
212    }
213
214    /// Start the zero-copy pipeline processing
215    pub async fn start(&mut self, event_receiver: Receiver<MarketDataEvent>) -> Result<()> {
216        log::info!(
217            "Starting zero-copy data pipeline with {} workers",
218            self.config.worker_count
219        );
220
221        let (shutdown_sender, shutdown_receiver) = flume::unbounded();
222        self.shutdown_sender = Some(shutdown_sender);
223
224        // Start worker tasks with pre-allocated contexts
225        for worker_id in 0..self.config.worker_count {
226            let worker_handle = self
227                .spawn_zerocopy_worker(worker_id, event_receiver.clone(), shutdown_receiver.clone())
228                .await;
229            self.worker_handles.push(worker_handle);
230        }
231
232        log::info!("Zero-copy data pipeline started successfully");
233        Ok(())
234    }
235
236    /// Stop the zero-copy pipeline processing
237    pub async fn stop(&mut self) -> Result<()> {
238        log::info!("Stopping zero-copy data pipeline");
239
240        // Send shutdown signal
241        if let Some(shutdown_sender) = self.shutdown_sender.take() {
242            let _ = shutdown_sender.send(());
243        }
244
245        // Wait for all workers to finish
246        for handle in self.worker_handles.drain(..) {
247            handle.abort();
248            let _ = handle.await;
249        }
250
251        log::info!("Zero-copy data pipeline stopped");
252        Ok(())
253    }
254
255    /// Get current zero-copy pipeline statistics
256    pub fn get_stats(&self) -> ZerocopyPipelineStats {
257        self.stats.read().clone()
258    }
259
260    /// Reset pipeline statistics
261    pub fn reset_stats(&self) {
262        *self.stats.write() = ZerocopyPipelineStats::default();
263    }
264
265    /// Spawn a zero-copy worker task
266    async fn spawn_zerocopy_worker(
267        &self,
268        worker_id: usize,
269        event_receiver: Receiver<MarketDataEvent>,
270        shutdown_receiver: Receiver<()>,
271    ) -> JoinHandle<()> {
272        let storage_manager = self.storage_manager.clone();
273        let buffer_pool = self.buffer_pool.clone();
274        let stats = self.stats.clone();
275        let config = self.config.clone();
276
277        tokio::spawn(async move {
278            log::info!("Starting zero-copy pipeline worker {worker_id}");
279
280            let mut worker_context = ZerocopyWorkerContext::new(&config);
281            let mut event_batch = Vec::with_capacity(config.simd_batch_size);
282
283            loop {
284                tokio::select! {
285                    // Check for shutdown signal
286                    _ = shutdown_receiver.recv_async() => {
287                        log::info!("Zero-copy worker {worker_id} received shutdown signal");
288                        break;
289                    }
290
291                    // Receive events and process in SIMD batches
292                    event_result = event_receiver.recv_async() => {
293                        match event_result {
294                            Ok(event) => {
295                                event_batch.push(event);
296
297                                // Process batch when it reaches SIMD size
298                                if event_batch.len() >= config.simd_batch_size {
299                                    Self::process_zerocopy_batch(
300                                        &event_batch,
301                                        &mut worker_context,
302                                        &storage_manager,
303                                        &buffer_pool,
304                                        &stats,
305                                        &config,
306                                    ).await;
307                                    event_batch.clear();
308                                }
309                            }
310                            Err(_) => {
311                                log::info!("Zero-copy worker {worker_id} event channel closed");
312                                break;
313                            }
314                        }
315                    }
316                }
317            }
318
319            // Process remaining events in batch
320            if !event_batch.is_empty() {
321                Self::process_zerocopy_batch(
322                    &event_batch,
323                    &mut worker_context,
324                    &storage_manager,
325                    &buffer_pool,
326                    &stats,
327                    &config,
328                )
329                .await;
330            }
331
332            log::info!("Zero-copy pipeline worker {worker_id} stopped");
333        })
334    }
335
336    /// Process a batch of events using zero-copy patterns
337    async fn process_zerocopy_batch(
338        batch: &[MarketDataEvent],
339        worker_context: &mut ZerocopyWorkerContext,
340        storage_manager: &StorageManager,
341        buffer_pool: &Arc<RwLock<ZerocopyBufferPool>>,
342        stats: &Arc<RwLock<ZerocopyPipelineStats>>,
343        config: &ZerocopyPipelineConfig,
344    ) {
345        let start_time = QuantaInstant::now();
346        worker_context.reset();
347
348        // Separate trades and orderbooks for batch processing
349        let mut trades = Vec::new();
350        let mut orderbooks = Vec::new();
351
352        for event in batch {
353            match event {
354                MarketDataEvent::Trade(trade) => trades.push(trade),
355                MarketDataEvent::OrderBook(orderbook) => orderbooks.push(orderbook),
356                _ => {} // Skip status/error events
357            }
358        }
359
360        let mut total_bytes_serialized = 0u64;
361        let mut zero_copy_operations = 0u64;
362
363        // Process trades using SIMD batching
364        if !trades.is_empty()
365            && let Ok(bytes) = Self::process_trade_batch_simd(
366                &trades,
367                worker_context,
368                storage_manager,
369                buffer_pool,
370            )
371            .await
372        {
373            total_bytes_serialized += bytes;
374            zero_copy_operations += trades.len() as u64;
375        }
376
377        // Process orderbooks using SIMD batching
378        if !orderbooks.is_empty()
379            && let Ok(bytes) = Self::process_orderbook_batch_simd(
380                &orderbooks,
381                worker_context,
382                storage_manager,
383                buffer_pool,
384            )
385            .await
386        {
387            total_bytes_serialized += bytes;
388            zero_copy_operations += orderbooks.len() as u64;
389        }
390
391        // Update statistics
392        let processing_time_nanos = start_time.elapsed().as_nanos() as u64;
393        {
394            let mut stats_guard = stats.write();
395            stats_guard.events_processed += batch.len() as u64;
396            stats_guard.trades_processed += trades.len() as u64;
397            stats_guard.orderbooks_processed += orderbooks.len() as u64;
398            stats_guard.bytes_serialized += total_bytes_serialized;
399            stats_guard.avg_latency_nanos =
400                (stats_guard.avg_latency_nanos * 9 + processing_time_nanos) / 10;
401            stats_guard.simd_batches_processed += 1;
402            stats_guard.zero_copy_operations += zero_copy_operations;
403            stats_guard.buffer_reuse_count += 1;
404        }
405
406        log::debug!(
407            "Processed zero-copy batch of {} events in {}ns",
408            batch.len(),
409            processing_time_nanos
410        );
411    }
412
413    /// Process trade batch using SIMD operations
414    async fn process_trade_batch_simd(
415        trades: &[&TradeRecord],
416        worker_context: &mut ZerocopyWorkerContext,
417        storage_manager: &StorageManager,
418        _buffer_pool: &Arc<RwLock<ZerocopyBufferPool>>,
419    ) -> Result<u64> {
420        if trades.is_empty() {
421            return Ok(0);
422        }
423
424        let mut total_bytes = 0u64;
425
426        // Fill SIMD buffers with trade data for vectorized processing
427        let simd_chunks = trades.len() / 4;
428        let prices_flat = worker_context.simd_prices.flat_mut();
429        let quantities_flat = worker_context.simd_quantities.flat_mut();
430
431        for (i, trade) in trades.iter().enumerate().take(simd_chunks * 4) {
432            prices_flat[i] = trade.price.to_f64().unwrap_or_else(|| {
433                #[cfg(debug_assertions)]
434                eprintln!(
435                    "Warning: Trade price conversion failed at index {}: {}",
436                    i, trade.price
437                );
438                f64::NAN
439            });
440            quantities_flat[i] = trade.quantity.to_f64().unwrap_or_else(|| {
441                #[cfg(debug_assertions)]
442                eprintln!(
443                    "Warning: Trade quantity conversion failed at index {}: {}",
444                    i, trade.quantity
445                );
446                f64::NAN
447            });
448            worker_context
449                .processing_timestamps
450                .push(trade.timestamp_exchange);
451        }
452
453        // Use SIMD operations for validation and processing
454        if simd_chunks > 0 {
455            for chunk in 0..simd_chunks {
456                let price_simd = worker_context.simd_prices[chunk];
457                let quantity_simd = worker_context.simd_quantities[chunk];
458
459                // SIMD validation: check for valid prices and quantities
460                let zero_vec = f64x4::splat(0.0);
461                let price_valid = price_simd.cmp_gt(zero_vec);
462                let quantity_valid = quantity_simd.cmp_gt(zero_vec);
463                let all_valid = price_valid & quantity_valid;
464
465                // Process valid trades in the SIMD chunk
466                if all_valid.all() {
467                    // All trades in this chunk are valid, process them
468                    for i in 0..4 {
469                        let trade_index = chunk * 4 + i;
470                        if trade_index < trades.len() {
471                            // Serialize using zero-copy patterns
472                            worker_context.serialization_buffer.clear();
473                            if let Ok(serialized) =
474                                TradeSerializer::serialize_trade(trades[trade_index])
475                                && (storage_manager.write_trade(trades[trade_index]).await).is_ok()
476                            {
477                                total_bytes += serialized.len() as u64;
478                            }
479                        }
480                    }
481                }
482            }
483        }
484
485        // Process remaining trades that don't fit into SIMD chunks
486        for trade in trades.iter().skip(simd_chunks * 4) {
487            worker_context.serialization_buffer.clear();
488            if let Ok(serialized) = TradeSerializer::serialize_trade(trade)
489                && (storage_manager.write_trade(trade).await).is_ok()
490            {
491                total_bytes += serialized.len() as u64;
492            }
493        }
494
495        Ok(total_bytes)
496    }
497
498    /// Process orderbook batch using SIMD operations
499    async fn process_orderbook_batch_simd(
500        orderbooks: &[&OrderBookRecord],
501        worker_context: &mut ZerocopyWorkerContext,
502        storage_manager: &StorageManager,
503        _buffer_pool: &Arc<RwLock<ZerocopyBufferPool>>,
504    ) -> Result<u64> {
505        if orderbooks.is_empty() {
506            return Ok(0);
507        }
508
509        let mut total_bytes = 0u64;
510
511        // Process orderbooks with efficient serialization
512        for orderbook in orderbooks {
513            worker_context.serialization_buffer.clear();
514            if let Ok(serialized) = OrderBookSerializer::serialize_orderbook(orderbook)
515                && (storage_manager.write_orderbook(orderbook).await).is_ok()
516            {
517                total_bytes += serialized.len() as u64;
518            }
519        }
520
521        Ok(total_bytes)
522    }
523}
524
525impl Drop for ZerocopyDataPipeline {
526    fn drop(&mut self) {
527        // Send shutdown signal if still available
528        if let Some(shutdown_sender) = self.shutdown_sender.take() {
529            let _ = shutdown_sender.try_send(());
530        }
531
532        // Abort all worker handles
533        for handle in &self.worker_handles {
534            handle.abort();
535        }
536    }
537}
538
#[cfg(test)]
mod tests {
    use super::*;
    use crate::monitor::config::storage::StorageConfig;

    use tempfile::TempDir;

    /// A freshly constructed pipeline has no workers and zeroed statistics.
    #[tokio::test]
    async fn test_zerocopy_pipeline_creation() {
        let scratch_dir = TempDir::new().unwrap();
        let storage_config = StorageConfig {
            market_data_path: scratch_dir.path().join("market_data"),
            ..Default::default()
        };
        let storage = StorageManager::new(storage_config).await.unwrap();

        let pipeline =
            ZerocopyDataPipeline::new(ZerocopyPipelineConfig::default(), Arc::new(storage));

        assert_eq!(pipeline.worker_handles.len(), 0);
        assert_eq!(pipeline.get_stats().events_processed, 0);
    }

    /// Checking a buffer out shrinks the free list by one; returning it
    /// restores the original size.
    #[tokio::test]
    async fn test_zerocopy_buffer_pool() {
        let config = ZerocopyPipelineConfig::default();
        let full = config.buffer_pool_size;
        let mut pool = ZerocopyBufferPool::new(&config);

        // Serialization buffers
        let serialization_index = pool.get_serialization_buffer().unwrap();
        assert_eq!(pool.available_serialization.len(), full - 1);
        pool.return_serialization_buffer(serialization_index);
        assert_eq!(pool.available_serialization.len(), full);

        // SIMD buffers
        let simd_index = pool.get_simd_buffer().unwrap();
        assert_eq!(pool.available_simd.len(), full - 1);
        pool.return_simd_buffer(simd_index);
        assert_eq!(pool.available_simd.len(), full);
    }

    /// `reset` empties the buffers without giving up their allocations.
    #[test]
    fn test_worker_context_reset() {
        let config = ZerocopyPipelineConfig::default();
        let mut context = ZerocopyWorkerContext::new(&config);

        // Dirty the context first.
        context.serialization_buffer.extend_from_slice(b"test");
        context.processing_timestamps.push(123456789);
        let capacity_before = context.serialization_buffer.capacity();

        context.reset();

        assert_eq!(context.serialization_buffer.len(), 0);
        assert_eq!(context.serialization_buffer.capacity(), capacity_before);
        assert_eq!(context.processing_timestamps.len(), 0);
    }
}