rusty_bin/monitor/collector/pipeline.rs

//! Data pipeline for processing market data events and sending them to storage
//!
//! This module handles the flow of market data from the collection system
//! to the storage system with FlatBuffers serialization and compression.
//!
//! Why isn't this architected like the original pipeline?
//! Because this design optimizes for performance, not for scalability.
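//!
//! A minimal usage sketch (imports are omitted; the storage path and channel
//! wiring below are illustrative assumptions, not prescribed setup):
//!
//! ```ignore
//! let storage_config = StorageConfig {
//!     market_data_path: "/tmp/market_data".into(),
//!     ..Default::default()
//! };
//! let storage_manager = Arc::new(StorageManager::new(storage_config).await?);
//!
//! let (event_sender, event_receiver) = flume::unbounded::<MarketDataEvent>();
//! let mut pipeline = DataPipeline::new(PipelineConfig::default(), storage_manager);
//! pipeline.start(event_receiver).await?;
//!
//! // ... forward MarketDataEvent values through `event_sender` ...
//!
//! pipeline.stop().await?;
//! ```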

use crate::monitor::collector::{CollectionError, MarketDataEvent, Result};
use crate::monitor::schema::{OrderBookRecord, TradeRecord};
use crate::monitor::storage::{CompressionMode, manager::StorageManager, roller::FileRoller};
use flume::{Receiver, Sender};
use parking_lot::RwLock;
use quanta::Instant as QuantaInstant;
use rusty_common::collections::FxHashMap;
use smartstring::alias::String as SmartString;
use std::sync::Arc;
use tokio::task::JoinHandle;
use tokio::time::{Duration, Instant};

/// Configuration for the data pipeline
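///
/// Defaults can be overridden selectively with struct-update syntax; the values
/// below are illustrative, not tuned recommendations:
///
/// ```ignore
/// let config = PipelineConfig {
///     batch_size: 5_000,
///     worker_count: 8,
///     ..PipelineConfig::default()
/// };
/// ```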
#[derive(Debug, Clone)]
pub struct PipelineConfig {
    /// Buffer size for batching events before processing
    pub batch_size: usize,
    /// Maximum time to wait before flushing a batch
    pub batch_timeout_ms: u64,
    /// Number of worker tasks for processing events
    pub worker_count: usize,
    /// Compression mode for storage
    pub compression_mode: CompressionMode,
    /// File rotation check interval in seconds
    pub rotation_check_interval_seconds: u64,
}

impl Default for PipelineConfig {
    fn default() -> Self {
        Self {
            batch_size: 1000,
            batch_timeout_ms: 100,
            worker_count: 4,
            compression_mode: CompressionMode::None,
            rotation_check_interval_seconds: 300, // Check every 5 minutes
        }
    }
}

/// Statistics for pipeline processing
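///
/// Snapshots are obtained via [`DataPipeline::get_stats`]; a sketch of periodic
/// reporting (interval and log format are illustrative):
///
/// ```ignore
/// let stats = pipeline.get_stats();
/// log::info!(
///     "pipeline: {} events, {} errors, avg latency {} ns",
///     stats.events_processed,
///     stats.errors_count,
///     stats.avg_latency_nanos
/// );
/// ```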
#[derive(Debug, Clone, Default)]
pub struct PipelineStats {
    /// Total number of events processed
    pub events_processed: u64,
    /// Number of trades processed
    pub trades_processed: u64,
    /// Number of order books processed
    pub orderbooks_processed: u64,
    /// Number of errors encountered
    pub errors_count: u64,
    /// Total bytes written to storage
    pub bytes_written: u64,
    /// Average latency in nanoseconds
    pub avg_latency_nanos: u64,
    /// Timestamp of last processed event in nanoseconds
    pub last_processed_time: Option<u64>,
    /// Number of files rotated
    pub files_rotated: u64,
    /// Number of rotation errors
    pub rotation_errors: u64,
}

/// Data pipeline that processes market data events and stores them
#[derive(Debug)]
pub struct DataPipeline {
    /// Pipeline configuration
    config: PipelineConfig,
    /// Storage manager for persisting data
    storage_manager: Arc<StorageManager>,
    /// File roller for managing file rotation
    file_roller: Option<FileRoller>,
    /// Worker task handles
    worker_handles: Vec<JoinHandle<()>>,
    /// File rotation task handle
    rotation_handle: Option<JoinHandle<()>>,
    /// Pipeline statistics
    stats: Arc<RwLock<PipelineStats>>,
    /// Shutdown signal sender
    shutdown_sender: Option<Sender<()>>,
}

impl DataPipeline {
    /// Create a new data pipeline
    #[must_use]
    pub fn new(config: PipelineConfig, storage_manager: Arc<StorageManager>) -> Self {
        // Create FileRoller with storage manager's config
        let file_roller = FileRoller::new(
            storage_manager.config.market_data_path.clone(),
            storage_manager.config.max_file_size_mb,
            storage_manager.config.max_records_per_file,
            storage_manager.config.retention_days,
        );

        Self {
            config,
            storage_manager,
            file_roller: Some(file_roller),
            worker_handles: Vec::new(),
            rotation_handle: None,
            stats: Arc::new(RwLock::new(PipelineStats::default())),
            shutdown_sender: None,
        }
    }

    /// Start the pipeline processing
    pub async fn start(&mut self, event_receiver: Receiver<MarketDataEvent>) -> Result<()> {
        log::info!(
            "Starting data pipeline with {} workers",
            self.config.worker_count
        );

        let (shutdown_sender, shutdown_receiver) = flume::unbounded();
        self.shutdown_sender = Some(shutdown_sender);

        // Start rotation watcher
        if let Err(e) = self.start_file_rotation_watcher(&shutdown_receiver) {
            log::error!("Failed to start file rotation watcher: {e}");
        }

        // Start worker tasks
        for worker_id in 0..self.config.worker_count {
            let worker_handle = self
                .spawn_worker(worker_id, event_receiver.clone(), shutdown_receiver.clone())
                .await;
            self.worker_handles.push(worker_handle);
        }

        log::info!("Data pipeline started successfully");
        Ok(())
    }

    /// Stop the pipeline processing
    pub async fn stop(&mut self) -> Result<()> {
        log::info!("Stopping data pipeline");

        // Send shutdown signal
        if let Some(shutdown_sender) = self.shutdown_sender.take() {
            let _ = shutdown_sender.send(());
        }

        // Wait for rotation task to finish
        if let Some(handle) = self.rotation_handle.take() {
            handle.abort();
            let _ = handle.await;
        }

        // Wait for all workers to finish
        for handle in self.worker_handles.drain(..) {
            handle.abort();
            let _ = handle.await;
        }

        log::info!("Data pipeline stopped");
        Ok(())
    }

    /// Start file rotation watcher that runs in background
    fn start_file_rotation_watcher(&mut self, shutdown_receiver: &Receiver<()>) -> Result<()> {
        let file_roller = self
            .file_roller
            .take()
            .ok_or_else(|| CollectionError::DataProcessing("File roller not found".to_string()))?;

        let stats = self.stats.clone();
        let shutdown_receiver = shutdown_receiver.clone();
        let check_interval = Duration::from_secs(self.config.rotation_check_interval_seconds);

        let handle = tokio::spawn(async move {
            log::info!(
                "Starting file rotation watcher (check interval: {}s)",
                check_interval.as_secs()
            );

            let mut interval = tokio::time::interval(check_interval);
            interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

            loop {
                tokio::select! {
                    // Check for shutdown signal
                    _ = shutdown_receiver.recv_async() => {
                        log::info!("File rotation watcher received shutdown signal");
                        break;
                    }

                    // Periodic rotation check
                    _ = interval.tick() => {
                        let rotated_files = match file_roller.execute_rotation_check().await {
                            Ok(files) => files,
                            Err(e) => {
                                log::error!("File rotation check failed: {e}");

                                // Update error stats
                                let mut stats_guard = stats.write();
                                stats_guard.rotation_errors += 1;
                                continue;
                            }
                        };

                        if !rotated_files.is_empty() {
                            let count = rotated_files.len() as u64;
                            log::info!("Rotated {count} files: {rotated_files:?}");

                            // Update stats
                            let mut stats_guard = stats.write();
                            stats_guard.files_rotated += count;
                        }

                        // Run cleanup when the total rotated-file count is a multiple of 10
                        if stats.read().files_rotated.is_multiple_of(10) {
                            match file_roller.cleanup_old_files().await {
                                Ok(deleted_count) => {
                                    if deleted_count > 0 {
                                        log::info!("Cleaned up {deleted_count} old files");
                                    }
                                }
                                Err(e) => {
                                    log::error!("File cleanup failed: {e}");
                                }
                            }
                        }

                    }
                }
            }

            log::info!("File rotation watcher stopped");
        });

        self.rotation_handle = Some(handle);

        Ok(())
    }

    /// Get current pipeline statistics
    pub fn get_stats(&self) -> PipelineStats {
        self.stats.read().clone()
    }

    /// Reset pipeline statistics
    pub fn reset_stats(&self) {
        *self.stats.write() = PipelineStats::default();
    }

    /// Spawn a worker task to process events
    async fn spawn_worker(
        &self,
        worker_id: usize,
        event_receiver: Receiver<MarketDataEvent>,
        shutdown_receiver: Receiver<()>,
    ) -> JoinHandle<()> {
        let storage_manager = self.storage_manager.clone();
        let stats = self.stats.clone();
        let config = self.config.clone();

        tokio::spawn(async move {
            log::info!("Starting pipeline worker {worker_id}");

            let mut batch = Vec::with_capacity(config.batch_size);
            let mut last_flush = Instant::now();
            let batch_timeout = Duration::from_millis(config.batch_timeout_ms);

            loop {
                tokio::select! {
                    // Check for shutdown signal
                    _ = shutdown_receiver.recv_async() => {
                        log::info!("Worker {worker_id} received shutdown signal");
                        break;
                    }

                    // Flush on timeout. The deadline is reset even when the batch is
                    // empty so this branch does not complete immediately on every
                    // loop iteration while the worker is idle.
                    _ = tokio::time::sleep_until(last_flush + batch_timeout) => {
                        if !batch.is_empty() {
                            Self::process_batch(&batch, &storage_manager, &stats, &config).await;
                            batch.clear();
                        }
                        last_flush = Instant::now();
                    }

                    // Receive events
                    event_result = event_receiver.recv_async() => {
                        match event_result {
                            Ok(event) => {
                                batch.push(event);

                                // Process batch if it's full
                                if batch.len() >= config.batch_size {
                                    Self::process_batch(&batch, &storage_manager, &stats, &config).await;
                                    batch.clear();
                                    last_flush = Instant::now();
                                }
                            }
                            Err(_) => {
                                log::info!("Worker {worker_id} event channel closed");
                                break;
                            }
                        }
                    }
                }
            }

            // Process remaining events in batch
            if !batch.is_empty() {
                Self::process_batch(&batch, &storage_manager, &stats, &config).await;
            }

            log::info!("Pipeline worker {worker_id} stopped");
        })
    }

    /// Process a batch of events
    async fn process_batch(
        batch: &[MarketDataEvent],
        storage_manager: &StorageManager,
        stats: &Arc<RwLock<PipelineStats>>,
        config: &PipelineConfig,
    ) {
        let start_time = QuantaInstant::now();

        // Group events by exchange and symbol for efficient processing
        let mut trade_groups: FxHashMap<(SmartString, SmartString), Vec<&TradeRecord>> =
            FxHashMap::default();
        let mut orderbook_groups: FxHashMap<(SmartString, SmartString), Vec<&OrderBookRecord>> =
            FxHashMap::default();

        for event in batch {
            match event {
                MarketDataEvent::Trade(trade) => {
                    let key = (trade.exchange.clone(), trade.symbol.clone());
                    trade_groups.entry(key).or_default().push(trade);
                }
                MarketDataEvent::OrderBook(orderbook) => {
                    let key = (orderbook.exchange.clone(), orderbook.symbol.clone());
                    orderbook_groups.entry(key).or_default().push(orderbook);
                }
                MarketDataEvent::ConnectionStatus { .. } | MarketDataEvent::Error { .. } => {
                    // Log status/error events but don't store them
                    log::debug!("Received status/error event: {event:?}");
                }
            }
        }

        let mut total_bytes_written = 0u64;
        let mut trades_processed = 0u64;
        let mut orderbooks_processed = 0u64;
        let mut errors_count = 0u64;

        // Process trade groups
        for ((exchange, symbol), trades) in trade_groups {
            let trade_count = trades.len() as u64;
            match Self::process_trade_batch(&exchange, &symbol, trades, storage_manager, config)
                .await
            {
                Ok(bytes_written) => {
                    total_bytes_written += bytes_written;
                    trades_processed += trade_count;
                }
                Err(e) => {
                    log::error!("Failed to process trade batch for {exchange}:{symbol}: {e}");
                    errors_count += 1;
                }
            }
        }

        // Process orderbook groups
        for ((exchange, symbol), orderbooks) in orderbook_groups {
            let orderbook_count = orderbooks.len() as u64;
            match Self::process_orderbook_batch(
                &exchange,
                &symbol,
                orderbooks,
                storage_manager,
                config,
            )
            .await
            {
                Ok(bytes_written) => {
                    total_bytes_written += bytes_written;
                    orderbooks_processed += orderbook_count;
                }
                Err(e) => {
                    log::error!("Failed to process orderbook batch for {exchange}:{symbol}: {e}");
                    errors_count += 1;
                }
            }
        }

        // Update statistics
        let processing_time_nanos = start_time.elapsed().as_nanos() as u64;
        {
            let mut stats_guard = stats.write();
            stats_guard.events_processed += batch.len() as u64;
            stats_guard.trades_processed += trades_processed;
            stats_guard.orderbooks_processed += orderbooks_processed;
            stats_guard.errors_count += errors_count;
            stats_guard.bytes_written += total_bytes_written;
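            // Exponential moving average of batch latency: 90% previous value, 10% new sample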
            stats_guard.avg_latency_nanos =
                (stats_guard.avg_latency_nanos * 9 + processing_time_nanos) / 10;
            stats_guard.last_processed_time = Some(crate::monitor::schema::timestamp::now_nanos());
        }

        log::debug!(
            "Processed batch of {} events in {}μs",
            batch.len(),
            processing_time_nanos / 1000
        );
    }

    /// Process a batch of trades for a specific exchange/symbol
    async fn process_trade_batch(
        _exchange: &str,
        _symbol: &str,
        trades: Vec<&TradeRecord>,
        storage_manager: &StorageManager,
        _config: &PipelineConfig,
    ) -> Result<u64> {
        if trades.is_empty() {
            return Ok(0);
        }

        let mut bytes_written = 0u64;

        // Write each trade to storage
        for trade in trades {
            storage_manager.write_trade(trade).await.map_err(|e| {
                CollectionError::DataProcessing(format!("Storage write failed: {e}"))
            })?;

            // Estimate bytes written (approximate)
            bytes_written += std::mem::size_of::<TradeRecord>() as u64;
        }

        Ok(bytes_written)
    }

    /// Process a batch of orderbooks for a specific exchange/symbol
    async fn process_orderbook_batch(
        _exchange: &str,
        _symbol: &str,
        orderbooks: Vec<&OrderBookRecord>,
        storage_manager: &StorageManager,
        _config: &PipelineConfig,
    ) -> Result<u64> {
        if orderbooks.is_empty() {
            return Ok(0);
        }

        let mut bytes_written = 0u64;

        // Write each orderbook to storage
        for orderbook in orderbooks {
            storage_manager
                .write_orderbook(orderbook)
                .await
                .map_err(|e| {
                    CollectionError::DataProcessing(format!("Storage write failed: {e}"))
                })?;

            // Estimate bytes written (approximate)
            bytes_written += std::mem::size_of::<OrderBookRecord>() as u64;
        }

        Ok(bytes_written)
    }
}

impl Drop for DataPipeline {
    fn drop(&mut self) {
        // Send shutdown signal if still available
        if let Some(shutdown_sender) = self.shutdown_sender.take() {
            let _ = shutdown_sender.try_send(());
        }

        // Abort the rotation task if it is still running
        if let Some(handle) = self.rotation_handle.take() {
            handle.abort();
        }

        // Abort all worker handles
        for handle in &self.worker_handles {
            handle.abort();
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::monitor::config::storage::StorageConfig;
    use tempfile::TempDir;

    #[tokio::test]
    async fn test_pipeline_creation() {
        let temp_dir = TempDir::new().unwrap();
        let storage_config = StorageConfig {
            market_data_path: temp_dir.path().join("market_data"),
            ..Default::default()
        };
        let storage_manager = Arc::new(StorageManager::new(storage_config).await.unwrap());

        let config = PipelineConfig::default();
        let pipeline = DataPipeline::new(config, storage_manager);

        assert_eq!(pipeline.worker_handles.len(), 0);
        assert_eq!(pipeline.get_stats().events_processed, 0);
    }

    #[tokio::test]
    async fn test_pipeline_stats() {
        let temp_dir = TempDir::new().unwrap();
        let storage_config = StorageConfig {
            market_data_path: temp_dir.path().join("market_data"),
            ..Default::default()
        };
        let storage_manager = Arc::new(StorageManager::new(storage_config).await.unwrap());

        let config = PipelineConfig::default();
        let pipeline = DataPipeline::new(config, storage_manager);

        let initial_stats = pipeline.get_stats();
        assert_eq!(initial_stats.events_processed, 0);
        assert_eq!(initial_stats.trades_processed, 0);
        assert_eq!(initial_stats.orderbooks_processed, 0);

        pipeline.reset_stats();
        let reset_stats = pipeline.get_stats();
        assert_eq!(reset_stats.events_processed, 0);
    }
}