rusty_bin/monitor/collector/pipeline.rs

use crate::monitor::collector::{CollectionError, MarketDataEvent, Result};
use crate::monitor::schema::{OrderBookRecord, TradeRecord};
use crate::monitor::storage::{CompressionMode, manager::StorageManager, roller::FileRoller};
use flume::{Receiver, Sender};
use parking_lot::RwLock;
use quanta::Instant as QuantaInstant;
use rusty_common::collections::FxHashMap;
use smartstring::alias::String as SmartString;
use std::sync::Arc;
use tokio::task::JoinHandle;
use tokio::time::{Duration, Instant};

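/// Configuration options for the market data collection pipeline.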
#[derive(Debug, Clone)]
pub struct PipelineConfig {
    /// Maximum number of events buffered before a batch is flushed.
    pub batch_size: usize,
    /// Maximum time in milliseconds to wait before flushing a partial batch.
    pub batch_timeout_ms: u64,
    /// Number of worker tasks consuming events in parallel.
    pub worker_count: usize,
    /// Compression mode for data written to storage.
    pub compression_mode: CompressionMode,
    /// Interval in seconds between file rotation checks.
    pub rotation_check_interval_seconds: u64,
}

impl Default for PipelineConfig {
    fn default() -> Self {
        Self {
            batch_size: 1000,
            batch_timeout_ms: 100,
            worker_count: 4,
            compression_mode: CompressionMode::None,
            rotation_check_interval_seconds: 300,
        }
    }
}

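/// Runtime statistics accumulated by the pipeline workers and the rotation watcher.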
#[derive(Debug, Clone, Default)]
pub struct PipelineStats {
    /// Total number of events pulled from the event channel.
    pub events_processed: u64,
    /// Number of trade records successfully written to storage.
    pub trades_processed: u64,
    /// Number of order book records successfully written to storage.
    pub orderbooks_processed: u64,
    /// Number of batch processing errors encountered.
    pub errors_count: u64,
    /// Approximate number of bytes written to storage.
    pub bytes_written: u64,
    /// Exponentially weighted moving average of batch processing latency in nanoseconds.
    pub avg_latency_nanos: u64,
    /// Timestamp in nanoseconds of the most recent batch completion, if any.
    pub last_processed_time: Option<u64>,
    /// Number of files rotated by the rotation watcher.
    pub files_rotated: u64,
    /// Number of failed rotation checks.
    pub rotation_errors: u64,
}

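/// Batching pipeline that drains `MarketDataEvent`s from a flume channel, groups them by
/// exchange and symbol, and persists them through a [`StorageManager`], while a background
/// task periodically rotates and cleans up data files.
///
/// A minimal usage sketch (illustrative only; assumes an initialized `StorageManager` and a
/// flume receiver of `MarketDataEvent`s are already available):
///
/// ```ignore
/// let mut pipeline = DataPipeline::new(PipelineConfig::default(), storage_manager);
/// pipeline.start(event_receiver).await?;
/// // ... collectors feed events into the sending side of the channel ...
/// pipeline.stop().await?;
/// ```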
#[derive(Debug)]
pub struct DataPipeline {
    /// Pipeline configuration.
    config: PipelineConfig,
    /// Shared storage backend used for all writes.
    storage_manager: Arc<StorageManager>,
    /// File roller, consumed when the rotation watcher is started.
    file_roller: Option<FileRoller>,
    /// Handles of the spawned worker tasks.
    worker_handles: Vec<JoinHandle<()>>,
    /// Handle of the file rotation watcher task, if running.
    rotation_handle: Option<JoinHandle<()>>,
    /// Shared pipeline statistics.
    stats: Arc<RwLock<PipelineStats>>,
    /// Sender used to signal shutdown to background tasks.
    shutdown_sender: Option<Sender<()>>,
}

impl DataPipeline {
    /// Create a new pipeline backed by the given storage manager.
    #[must_use]
    pub fn new(config: PipelineConfig, storage_manager: Arc<StorageManager>) -> Self {
        // Build the file roller from the storage configuration.
        let file_roller = FileRoller::new(
            storage_manager.config.market_data_path.clone(),
            storage_manager.config.max_file_size_mb,
            storage_manager.config.max_records_per_file,
            storage_manager.config.retention_days,
        );

        Self {
            config,
            storage_manager,
            file_roller: Some(file_roller),
            worker_handles: Vec::new(),
            rotation_handle: None,
            stats: Arc::new(RwLock::new(PipelineStats::default())),
            shutdown_sender: None,
        }
    }

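    /// Start the worker tasks and the file rotation watcher.
    ///
    /// Events received on `event_receiver` are batched and written to storage until the
    /// pipeline is stopped or the channel is closed.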
    pub async fn start(&mut self, event_receiver: Receiver<MarketDataEvent>) -> Result<()> {
        log::info!(
            "Starting data pipeline with {} workers",
            self.config.worker_count
        );

        let (shutdown_sender, shutdown_receiver) = flume::unbounded();
        self.shutdown_sender = Some(shutdown_sender);

        // A failed rotation watcher start is logged but does not abort pipeline startup.
        if let Err(e) = self.start_file_rotation_watcher(&shutdown_receiver) {
            log::error!("Failed to start file rotation watcher: {e}");
        }

        // Spawn the configured number of worker tasks, all sharing the same event channel.
        for worker_id in 0..self.config.worker_count {
            let worker_handle = self
                .spawn_worker(worker_id, event_receiver.clone(), shutdown_receiver.clone())
                .await;
            self.worker_handles.push(worker_handle);
        }

        log::info!("Data pipeline started successfully");
        Ok(())
    }

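    /// Stop the pipeline, signalling and then aborting the rotation watcher and all workers.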
    pub async fn stop(&mut self) -> Result<()> {
        log::info!("Stopping data pipeline");

        // Signal shutdown; dropping the sender also disconnects the channel for all receivers.
        if let Some(shutdown_sender) = self.shutdown_sender.take() {
            let _ = shutdown_sender.send(());
        }

        // Stop the file rotation watcher.
        if let Some(handle) = self.rotation_handle.take() {
            handle.abort();
            let _ = handle.await;
        }

        // Stop all worker tasks.
        for handle in self.worker_handles.drain(..) {
            handle.abort();
            let _ = handle.await;
        }

        log::info!("Data pipeline stopped");
        Ok(())
    }

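    /// Spawn the background task that periodically runs file rotation checks and cleans up
    /// old files past the retention window.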
    fn start_file_rotation_watcher(&mut self, shutdown_receiver: &Receiver<()>) -> Result<()> {
        let file_roller = self
            .file_roller
            .take()
            .ok_or(CollectionError::DataProcessing(
                "File roller not found".to_string(),
            ))?;

        let stats = self.stats.clone();
        let shutdown_receiver = shutdown_receiver.clone();
        let check_interval = Duration::from_secs(self.config.rotation_check_interval_seconds);

        let handle = tokio::spawn(async move {
            log::info!(
                "Starting file rotation watcher (check interval: {}s)",
                check_interval.as_secs()
            );

            let mut interval = tokio::time::interval(check_interval);
            interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

            loop {
                tokio::select! {
                    _ = shutdown_receiver.recv_async() => {
                        log::info!("File rotation watcher received shutdown signal");
                        break;
                    }

                    _ = interval.tick() => {
                        // Run the periodic rotation check.
                        let rotated_files = match file_roller.execute_rotation_check().await {
                            Ok(files) => files,
                            Err(e) => {
                                log::error!("File rotation check failed: {e}");

                                let mut stats_guard = stats.write();
                                stats_guard.rotation_errors += 1;
                                continue;
                            }
                        };

                        if !rotated_files.is_empty() {
                            let count = rotated_files.len() as u64;
                            log::info!("Rotated {count} files: {rotated_files:?}");

                            let mut stats_guard = stats.write();
                            stats_guard.files_rotated += count;
                        }

                        // Clean up old files whenever the rotated-file count is a multiple of 10.
                        if stats.read().files_rotated.is_multiple_of(10) {
                            match file_roller.cleanup_old_files().await {
                                Ok(deleted_count) => {
                                    if deleted_count > 0 {
                                        log::info!("Cleaned up {deleted_count} old files");
                                    }
                                }
                                Err(e) => {
                                    log::error!("File cleanup failed: {e}");
                                }
                            }
                        }
                    }
                }
            }

            log::info!("File rotation watcher stopped");
        });

        self.rotation_handle = Some(handle);

        Ok(())
    }

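    /// Return a snapshot of the current pipeline statistics.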
    pub fn get_stats(&self) -> PipelineStats {
        self.stats.read().clone()
    }

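    /// Reset all pipeline statistics to their default (zeroed) values.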
    pub fn reset_stats(&self) {
        *self.stats.write() = PipelineStats::default();
    }

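    /// Spawn a single worker task that batches incoming events and flushes them either when
    /// the batch is full or when the batch timeout elapses.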
    async fn spawn_worker(
        &self,
        worker_id: usize,
        event_receiver: Receiver<MarketDataEvent>,
        shutdown_receiver: Receiver<()>,
    ) -> JoinHandle<()> {
        let storage_manager = self.storage_manager.clone();
        let stats = self.stats.clone();
        let config = self.config.clone();

        tokio::spawn(async move {
            log::info!("Starting pipeline worker {worker_id}");

            let mut batch = Vec::with_capacity(config.batch_size);
            let mut last_flush = Instant::now();
            let batch_timeout = Duration::from_millis(config.batch_timeout_ms);

            loop {
                tokio::select! {
                    _ = shutdown_receiver.recv_async() => {
                        log::info!("Worker {worker_id} received shutdown signal");
                        break;
                    }

                    // Flush a partial batch once the batch timeout has elapsed.
                    _ = tokio::time::sleep_until(last_flush + batch_timeout) => {
                        if !batch.is_empty() {
                            Self::process_batch(&batch, &storage_manager, &stats, &config).await;
                            batch.clear();
                        }
                        // Reset the timer even when the batch is empty so this branch does not
                        // fire continuously while the pipeline is idle.
                        last_flush = Instant::now();
                    }

                    event_result = event_receiver.recv_async() => {
                        match event_result {
                            Ok(event) => {
                                batch.push(event);

                                // Flush as soon as the batch reaches the configured size.
                                if batch.len() >= config.batch_size {
                                    Self::process_batch(&batch, &storage_manager, &stats, &config).await;
                                    batch.clear();
                                    last_flush = Instant::now();
                                }
                            }
                            Err(_) => {
                                log::info!("Worker {worker_id} event channel closed");
                                break;
                            }
                        }
                    }
                }
            }

            // Flush any remaining buffered events before the worker exits.
            if !batch.is_empty() {
                Self::process_batch(&batch, &storage_manager, &stats, &config).await;
            }

            log::info!("Pipeline worker {worker_id} stopped");
        })
    }

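    /// Group a batch of events by `(exchange, symbol)` and write each group to storage,
    /// updating the shared statistics with counts, byte totals, and processing latency.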
    async fn process_batch(
        batch: &[MarketDataEvent],
        storage_manager: &StorageManager,
        stats: &Arc<RwLock<PipelineStats>>,
        config: &PipelineConfig,
    ) {
        let start_time = QuantaInstant::now();

        // Group incoming events by (exchange, symbol) so each group is written as one batch.
        let mut trade_groups: FxHashMap<(SmartString, SmartString), Vec<&TradeRecord>> =
            FxHashMap::default();
        let mut orderbook_groups: FxHashMap<(SmartString, SmartString), Vec<&OrderBookRecord>> =
            FxHashMap::default();

        for event in batch {
            match event {
                MarketDataEvent::Trade(trade) => {
                    let key = (trade.exchange.clone(), trade.symbol.clone());
                    trade_groups.entry(key).or_default().push(trade);
                }
                MarketDataEvent::OrderBook(orderbook) => {
                    let key = (orderbook.exchange.clone(), orderbook.symbol.clone());
                    orderbook_groups.entry(key).or_default().push(orderbook);
                }
                MarketDataEvent::ConnectionStatus { .. } | MarketDataEvent::Error { .. } => {
                    log::debug!("Received status/error event: {event:?}");
                }
            }
        }

        let mut total_bytes_written = 0u64;
        let mut trades_processed = 0u64;
        let mut orderbooks_processed = 0u64;
        let mut errors_count = 0u64;

        for ((exchange, symbol), trades) in trade_groups {
            let trade_count = trades.len() as u64;
            match Self::process_trade_batch(&exchange, &symbol, trades, storage_manager, config)
                .await
            {
                Ok(bytes_written) => {
                    total_bytes_written += bytes_written;
                    trades_processed += trade_count;
                }
                Err(e) => {
                    log::error!("Failed to process trade batch for {exchange}:{symbol}: {e}");
                    errors_count += 1;
                }
            }
        }

        for ((exchange, symbol), orderbooks) in orderbook_groups {
            let orderbook_count = orderbooks.len() as u64;
            match Self::process_orderbook_batch(
                &exchange,
                &symbol,
                orderbooks,
                storage_manager,
                config,
            )
            .await
            {
                Ok(bytes_written) => {
                    total_bytes_written += bytes_written;
                    orderbooks_processed += orderbook_count;
                }
                Err(e) => {
                    log::error!("Failed to process orderbook batch for {exchange}:{symbol}: {e}");
                    errors_count += 1;
                }
            }
        }

        let processing_time_nanos = start_time.elapsed().as_nanos() as u64;
        // Fold this batch into the shared stats; latency uses an exponentially weighted moving average.
        {
            let mut stats_guard = stats.write();
            stats_guard.events_processed += batch.len() as u64;
            stats_guard.trades_processed += trades_processed;
            stats_guard.orderbooks_processed += orderbooks_processed;
            stats_guard.errors_count += errors_count;
            stats_guard.bytes_written += total_bytes_written;
            stats_guard.avg_latency_nanos =
                (stats_guard.avg_latency_nanos * 9 + processing_time_nanos) / 10;
            stats_guard.last_processed_time = Some(crate::monitor::schema::timestamp::now_nanos());
        }

        log::debug!(
            "Processed batch of {} events in {}μs",
            batch.len(),
            processing_time_nanos / 1000
        );
    }

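    /// Write a group of trade records for a single exchange/symbol pair to storage, returning
    /// the approximate number of bytes written.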
    async fn process_trade_batch(
        _exchange: &str,
        _symbol: &str,
        trades: Vec<&TradeRecord>,
        storage_manager: &StorageManager,
        _config: &PipelineConfig,
    ) -> Result<u64> {
        if trades.is_empty() {
            return Ok(0);
        }

        let mut bytes_written = 0u64;

        for trade in trades {
            storage_manager.write_trade(trade).await.map_err(|e| {
                CollectionError::DataProcessing(format!("Storage write failed: {e}"))
            })?;

            // Approximate the written size by the in-memory record size.
            bytes_written += std::mem::size_of::<TradeRecord>() as u64;
        }

        Ok(bytes_written)
    }

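    /// Write a group of order book records for a single exchange/symbol pair to storage,
    /// returning the approximate number of bytes written.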
    async fn process_orderbook_batch(
        _exchange: &str,
        _symbol: &str,
        orderbooks: Vec<&OrderBookRecord>,
        storage_manager: &StorageManager,
        _config: &PipelineConfig,
    ) -> Result<u64> {
        if orderbooks.is_empty() {
            return Ok(0);
        }

        let mut bytes_written = 0u64;

        for orderbook in orderbooks {
            storage_manager
                .write_orderbook(orderbook)
                .await
                .map_err(|e| {
                    CollectionError::DataProcessing(format!("Storage write failed: {e}"))
                })?;

            // Approximate the written size by the in-memory record size.
            bytes_written += std::mem::size_of::<OrderBookRecord>() as u64;
        }

        Ok(bytes_written)
    }
}

impl Drop for DataPipeline {
    fn drop(&mut self) {
        // Best-effort shutdown signal in case the pipeline was never stopped explicitly.
        if let Some(shutdown_sender) = self.shutdown_sender.take() {
            let _ = shutdown_sender.try_send(());
        }

        // Abort any workers that are still running.
        for handle in &self.worker_handles {
            handle.abort();
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::monitor::config::storage::StorageConfig;
    use tempfile::TempDir;

    #[tokio::test]
    async fn test_pipeline_creation() {
        let temp_dir = TempDir::new().unwrap();
        let storage_config = StorageConfig {
            market_data_path: temp_dir.path().join("market_data"),
            ..Default::default()
        };
        let storage_manager = Arc::new(StorageManager::new(storage_config).await.unwrap());

        let config = PipelineConfig::default();
        let pipeline = DataPipeline::new(config, storage_manager);

        assert_eq!(pipeline.worker_handles.len(), 0);
        assert_eq!(pipeline.get_stats().events_processed, 0);
    }

    #[tokio::test]
    async fn test_pipeline_stats() {
        let temp_dir = TempDir::new().unwrap();
        let storage_config = StorageConfig {
            market_data_path: temp_dir.path().join("market_data"),
            ..Default::default()
        };
        let storage_manager = Arc::new(StorageManager::new(storage_config).await.unwrap());

        let config = PipelineConfig::default();
        let pipeline = DataPipeline::new(config, storage_manager);

        let initial_stats = pipeline.get_stats();
        assert_eq!(initial_stats.events_processed, 0);
        assert_eq!(initial_stats.trades_processed, 0);
        assert_eq!(initial_stats.orderbooks_processed, 0);

        pipeline.reset_stats();
        let reset_stats = pipeline.get_stats();
        assert_eq!(reset_stats.events_processed, 0);
    }
}