rusty_bin/monitor/
optimized_collection_manager.rs

1//! Optimized collection manager using lock-free data structures
2//!
3//! This module provides a high-performance collection manager that leverages
4//! lock-free data structures for minimal latency in high-frequency trading environments.
5
6use crate::monitor::{
7    collector::{DataType, ExchangeClient, MarketDataEvent, Result},
8    lockfree_buffer_pool::{LockFreeBufferPool, LockFreeBufferPoolConfig},
9    lockfree_exchange_manager::{ExchangeManagerStats, LockFreeExchangeManager},
10    lockfree_stats::{LockFreeStatsCollector, StatsSnapshot},
11    storage::manager::StorageManager,
12    zerocopy_pipeline::{ZerocopyDataPipeline, ZerocopyPipelineConfig},
13};
14use flume::Sender;
15use quanta::{Clock, Instant as QuantaInstant};
16use smartstring::alias::String as SmartString;
17use std::sync::Arc;
18use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
19use std::time::Duration;
20use tokio::sync::oneshot;
21use tokio::task::JoinHandle;
22
23/// Configuration for the optimized collection manager
24#[derive(Debug, Clone)]
25pub struct OptimizedCollectionConfig {
26    /// Exchange connection timeout in milliseconds
27    ///
28    /// Controls how long to wait for an exchange connection to be established
29    /// before timing out. Default is 30 seconds (30000ms).
30    pub connection_timeout_ms: u64,
31    /// Health check interval in milliseconds
32    ///
33    /// Determines how frequently the system performs health checks on exchange
34    /// connections to ensure they remain active. Default is 30 seconds (30000ms).
35    pub health_check_interval_ms: u64,
36    /// Retry interval for failed connections in milliseconds
37    ///
38    /// Specifies the delay between retry attempts when reconnecting to
39    /// exchanges after connection failures. Default is 60 seconds (60000ms).
40    pub retry_interval_ms: u64,
41    /// Maximum number of retry attempts
42    ///
43    /// Limits the number of times the system will attempt to reconnect to
44    /// an exchange before marking it as permanently failed. Default is 10.
45    pub max_retry_attempts: usize,
46    /// Buffer pool configuration for lock-free operations
47    ///
48    /// Configures the lock-free buffer pool used for high-performance
49    /// memory management during market data processing.
50    pub buffer_pool_config: LockFreeBufferPoolConfig,
51    /// Zero-copy pipeline configuration
52    ///
53    /// Configures the zero-copy data pipeline for ultra-low-latency
54    /// processing of market data streams without unnecessary allocations.
55    pub pipeline_config: ZerocopyPipelineConfig,
56    /// Enable automatic cleanup of old statistics
57    ///
58    /// When enabled, the system will periodically clean up old statistical
59    /// data to prevent memory bloat. Default is true.
60    pub enable_stats_cleanup: bool,
61    /// Statistics cleanup interval in milliseconds
62    ///
63    /// Determines how frequently the system cleans up old statistical data
64    /// when automatic cleanup is enabled. Default is 5 minutes (300000ms).
65    pub stats_cleanup_interval_ms: u64,
66}
67
68/// Default configuration values for the optimized collection manager
69///
70/// Provides sensible defaults for high-frequency trading environments,
71/// with conservative timeouts and aggressive performance optimizations.
72impl Default for OptimizedCollectionConfig {
73    fn default() -> Self {
74        Self {
75            connection_timeout_ms: 30000,    // 30 seconds
76            health_check_interval_ms: 30000, // 30 seconds
77            retry_interval_ms: 60000,        // 60 seconds
78            max_retry_attempts: 10,
79            buffer_pool_config: LockFreeBufferPoolConfig::default(),
80            pipeline_config: ZerocopyPipelineConfig::default(),
81            enable_stats_cleanup: true,
82            stats_cleanup_interval_ms: 300000, // 5 minutes
83        }
84    }
85}
86
87/// Optimized collection manager with lock-free optimizations
88pub struct OptimizedCollectionManager {
89    /// Configuration
90    config: OptimizedCollectionConfig,
91
92    /// Lock-free exchange manager
93    exchange_manager: Arc<LockFreeExchangeManager>,
94
95    /// Lock-free statistics collector
96    stats_collector: Arc<LockFreeStatsCollector>,
97
98    /// Lock-free buffer pool
99    buffer_pool: Arc<LockFreeBufferPool>,
100
101    /// Zero-copy data pipeline
102    zerocopy_pipeline: Option<ZerocopyDataPipeline>,
103
104    /// Storage manager
105    storage_manager: Arc<StorageManager>,
106
107    /// Management task handles
108    management_handles: Vec<JoinHandle<()>>,
109
110    /// Shutdown coordination
111    shutdown_sender: Option<oneshot::Sender<()>>,
112    shutdown_requested: Arc<AtomicBool>,
113
114    /// High-resolution timing
115    clock: Clock,
116    start_time: QuantaInstant,
117
118    /// Atomic counters for ultra-fast metrics
119    events_processed_total: Arc<AtomicU64>,
120    last_health_check: Arc<AtomicU64>,
121    last_stats_cleanup: Arc<AtomicU64>,
122}
123
124impl OptimizedCollectionManager {
125    /// Create a new optimized collection manager
126    pub async fn new(
127        config: OptimizedCollectionConfig,
128        storage_manager: Arc<StorageManager>,
129    ) -> Result<Self> {
130        let exchange_manager = Arc::new(LockFreeExchangeManager::new());
131        let stats_collector = Arc::new(LockFreeStatsCollector::new());
132        let buffer_pool = Arc::new(LockFreeBufferPool::new(config.buffer_pool_config.clone()));
133
134        let clock = Clock::new();
135        let start_time = clock.now();
136
137        Ok(Self {
138            config,
139            exchange_manager,
140            stats_collector,
141            buffer_pool,
142            zerocopy_pipeline: None,
143            storage_manager,
144            management_handles: Vec::new(),
145            shutdown_sender: None,
146            shutdown_requested: Arc::new(AtomicBool::new(false)),
147            clock,
148            start_time,
149            events_processed_total: Arc::new(AtomicU64::new(0)),
150            last_health_check: Arc::new(AtomicU64::new(0)),
151            last_stats_cleanup: Arc::new(AtomicU64::new(0)),
152        })
153    }
154
155    /// Start optimized collection with lock-free coordination
156    pub async fn start_collection(
157        &mut self,
158        exchange_filter: Option<Vec<SmartString>>,
159        symbol_filter: Option<Vec<SmartString>>,
160        event_sender: Sender<MarketDataEvent>,
161    ) -> Result<()> {
162        log::info!("Starting optimized collection manager with lock-free optimizations");
163
164        // Determine exchanges and symbols
165        let exchanges = exchange_filter.unwrap_or_else(|| vec!["binance".into()]);
166        let symbols = symbol_filter.unwrap_or_else(|| {
167            vec![
168                "BTCUSDT".into(),
169                "ETHUSDT".into(),
170                "ADAUSDT".into(),
171                "SOLUSDT".into(),
172            ]
173        });
174
175        // Initialize zero-copy pipeline
176        let mut pipeline = ZerocopyDataPipeline::new(
177            self.config.pipeline_config.clone(),
178            self.storage_manager.clone(),
179        );
180
181        // Create processing channels with optimal buffer sizes
182        let (processing_sender, processing_receiver) =
183            flume::bounded(self.config.pipeline_config.worker_buffer_size);
184
185        // Create internal event routing channels
186        let (internal_sender, internal_receiver) =
187            flume::bounded(self.config.pipeline_config.worker_buffer_size);
188
189        // Start the zero-copy pipeline
190        pipeline.start(processing_receiver).await?;
191        self.zerocopy_pipeline = Some(pipeline);
192
193        // Start management tasks
194        self.start_management_tasks(internal_receiver, processing_sender.clone())
195            .await?;
196
197        // Define data types to collect
198        let data_types = vec![DataType::Trades, DataType::OrderBook];
199
200        // Initialize exchanges with parallel connection attempts
201        let mut connection_futures = Vec::new();
202        for exchange_name in exchanges {
203            let manager = self.exchange_manager.clone();
204            let stats = self.stats_collector.clone();
205            let exchange_name_clone = exchange_name.clone();
206            let symbols_clone = symbols.clone();
207            let data_types_clone = data_types.clone();
208            let processing_sender_clone = processing_sender.clone();
209
210            let future = async move {
211                // Create ExchangeClient for market data collection (not trading)
212                let mut exchange_client = ExchangeClient::new(exchange_name_clone.clone());
213
214                // Connect to exchange
215                match exchange_client.connect().await {
216                    Ok(()) => {
217                        log::info!("Successfully connected to exchange: {exchange_name_clone}");
218
219                        // Add exchange to manager
220                        if let Err(e) = manager
221                            .add_exchange(exchange_name_clone.clone(), exchange_client)
222                            .await
223                        {
224                            log::error!(
225                                "Failed to add exchange {exchange_name_clone} to manager: {e}"
226                            );
227                            stats.record_connection_error();
228                            return Err(Box::new(e) as Box<dyn std::error::Error + Send + Sync>);
229                        }
230
231                        // Start collection for symbols
232                        if let Err(e) = manager
233                            .start_collection(
234                                &exchange_name_clone,
235                                symbols_clone,
236                                data_types_clone,
237                                processing_sender_clone,
238                            )
239                            .await
240                        {
241                            log::error!(
242                                "Failed to start collection for exchange {exchange_name_clone}: {e}"
243                            );
244                            stats.record_connection_error();
245                            return Err(Box::new(e) as Box<dyn std::error::Error + Send + Sync>);
246                        }
247
248                        log::info!(
249                            "Successfully started collection for exchange: {exchange_name_clone}"
250                        );
251                    }
252                    Err(e) => {
253                        log::error!("Failed to connect to exchange {exchange_name_clone}: {e}");
254                        stats.record_connection_error();
255                        return Err(Box::new(e) as Box<dyn std::error::Error + Send + Sync>);
256                    }
257                }
258
259                Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
260            };
261
262            connection_futures.push(tokio::spawn(future));
263        }
264
265        // Wait for all connections to complete
266        for future in connection_futures {
267            if let Err(e) = future
268                .await
269                .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?
270            {
271                log::error!("Failed to initialize exchange connection: {e}");
272                self.stats_collector.record_connection_error();
273            }
274        }
275
276        log::info!("Optimized collection manager started successfully");
277        Ok(())
278    }
279
280    /// Start management tasks using lock-free coordination
281    async fn start_management_tasks(
282        &mut self,
283        event_receiver: flume::Receiver<MarketDataEvent>,
284        processing_sender: Sender<MarketDataEvent>,
285    ) -> Result<()> {
286        let (shutdown_sender, mut shutdown_receiver) = oneshot::channel();
287        self.shutdown_sender = Some(shutdown_sender);
288
289        // Event routing task (high-frequency, lock-free)
290        {
291            let stats = self.stats_collector.clone();
292            let exchange_manager = self.exchange_manager.clone();
293            let shutdown_requested = self.shutdown_requested.clone();
294            let events_processed = self.events_processed_total.clone();
295
296            let routing_handle = tokio::spawn(async move {
297                let mut event_batch = Vec::with_capacity(1000);
298                let mut last_batch_time = QuantaInstant::now();
299                const BATCH_TIMEOUT_NANOS: u64 = 1_000_000; // 1ms
300
301                loop {
302                    tokio::select! {
303                        // Receive events
304                        event_result = event_receiver.recv_async() => {
305                            match event_result {
306                                Ok(event) => {
307                                    // Record statistics (lock-free)
308                                    match &event {
309                                        MarketDataEvent::Trade(_) => {
310                                            stats.record_trades(1);
311                                            stats.record_event();
312                                        }
313                                        MarketDataEvent::OrderBook(_) => {
314                                            stats.record_orderbooks(1);
315                                            stats.record_event();
316                                        }
317                                        MarketDataEvent::ConnectionStatus { exchange, connected, .. } => {
318                                            if *connected {
319                                                log::debug!("Exchange {exchange} connected");
320                                            } else {
321                                                exchange_manager.record_error(exchange);
322                                            }
323                                        }
324                                        MarketDataEvent::Error { exchange, .. } => {
325                                            exchange_manager.record_error(exchange);
326                                            stats.record_parsing_error();
327                                        }
328                                    }
329
330                                    event_batch.push(event);
331                                    events_processed.fetch_add(1, Ordering::Relaxed);
332
333                                    // Send batch when full or timeout reached
334                                    let should_send_batch = event_batch.len() >= 1000 ||
335                                        last_batch_time.elapsed().as_nanos() as u64 >= BATCH_TIMEOUT_NANOS;
336
337                                    if should_send_batch && !event_batch.is_empty() {
338                                        for event in event_batch.drain(..) {
339                                            if processing_sender.send(event).is_err() {
340                                                log::warn!("Processing channel closed");
341                                                return;
342                                            }
343                                        }
344                                        last_batch_time = QuantaInstant::now();
345                                        stats.record_buffer_reuse();
346                                    }
347                                }
348                                Err(_) => {
349                                    log::info!("Event channel closed, stopping routing task");
350                                    break;
351                                }
352                            }
353                        }
354
355                        // Check for shutdown
356                        _ = &mut shutdown_receiver => {
357                            log::info!("Event routing task received shutdown signal");
358                            break;
359                        }
360                    }
361
362                    // Break if shutdown requested
363                    if shutdown_requested.load(Ordering::Acquire) {
364                        break;
365                    }
366                }
367
368                // Send remaining events
369                for event in event_batch {
370                    let _ = processing_sender.send(event);
371                }
372
373                log::info!("Event routing task stopped");
374            });
375
376            self.management_handles.push(routing_handle);
377        }
378
379        // Health check task (periodic, lock-free coordination)
380        {
381            let exchange_manager = self.exchange_manager.clone();
382            let stats = self.stats_collector.clone();
383            let shutdown_requested = self.shutdown_requested.clone();
384            let last_health_check = self.last_health_check.clone();
385            let health_check_interval = Duration::from_millis(self.config.health_check_interval_ms);
386
387            let health_handle = tokio::spawn(async move {
388                let mut interval = tokio::time::interval(health_check_interval);
389
390                loop {
391                    tokio::select! {
392                        _ = interval.tick() => {
393                            let start_time = QuantaInstant::now();
394                            let health_status = exchange_manager.health_check().await;
395                            let health_check_latency = start_time.elapsed().as_nanos() as u64;
396
397                            // Update statistics (lock-free)
398                            stats.update_latency(health_check_latency);
399                            last_health_check.store(start_time.elapsed().as_nanos() as u64, Ordering::Relaxed);
400
401                            // Log unhealthy exchanges
402                            for (exchange, healthy) in health_status {
403                                if !healthy {
404                                    log::warn!("Exchange {exchange} health check failed");
405                                }
406                            }
407                        }
408                    }
409
410                    if shutdown_requested.load(Ordering::Acquire) {
411                        break;
412                    }
413                }
414
415                log::info!("Health check task stopped");
416            });
417
418            self.management_handles.push(health_handle);
419        }
420
421        // Statistics cleanup task (periodic, low frequency)
422        if self.config.enable_stats_cleanup {
423            let stats = self.stats_collector.clone();
424            let shutdown_requested = self.shutdown_requested.clone();
425            let last_cleanup = self.last_stats_cleanup.clone();
426            let cleanup_interval = Duration::from_millis(self.config.stats_cleanup_interval_ms);
427
428            let cleanup_handle = tokio::spawn(async move {
429                let mut interval = tokio::time::interval(cleanup_interval);
430
431                loop {
432                    tokio::select! {
433                        _ = interval.tick() => {
434                            // Perform periodic maintenance (e.g., log statistics)
435                            let snapshot = stats.get_snapshot();
436                            last_cleanup.store(
437                                std::time::SystemTime::now()
438                                    .duration_since(std::time::UNIX_EPOCH)
439                                    .unwrap_or_default()
440                                    .as_nanos() as u64,
441                                Ordering::Relaxed
442                            );
443
444                            log::info!(
445                                "Collection stats: {} events/sec, {} avg latency (ns), {} trades, {} orderbooks",
446                                snapshot.events_per_second,
447                                snapshot.avg_latency_nanos,
448                                snapshot.trades_processed,
449                                snapshot.orderbooks_processed
450                            );
451                        }
452                    }
453
454                    if shutdown_requested.load(Ordering::Acquire) {
455                        break;
456                    }
457                }
458
459                log::info!("Statistics cleanup task stopped");
460            });
461
462            self.management_handles.push(cleanup_handle);
463        }
464
465        Ok(())
466    }
467
468    /// Get comprehensive statistics (mostly lock-free reads)
469    pub fn get_comprehensive_stats(&self) -> ComprehensiveStats {
470        let stats_snapshot = self.stats_collector.get_snapshot();
471        let exchange_stats = self.exchange_manager.get_statistics();
472        let buffer_stats = self.buffer_pool.get_stats();
473        let buffer_counts = self.buffer_pool.get_available_counts();
474
475        let pipeline_stats = self
476            .zerocopy_pipeline
477            .as_ref()
478            .map(|p| p.get_stats())
479            .unwrap_or_default();
480
481        let uptime_nanos = self.start_time.elapsed().as_nanos() as u64;
482
483        ComprehensiveStats {
484            stats_snapshot,
485            exchange_stats,
486            buffer_stats,
487            buffer_counts,
488            pipeline_stats,
489            uptime_nanos,
490            events_processed_total: self.events_processed_total.load(Ordering::Relaxed),
491            last_health_check_nanos: self.last_health_check.load(Ordering::Relaxed),
492            last_stats_cleanup_nanos: self.last_stats_cleanup.load(Ordering::Relaxed),
493        }
494    }
495
496    /// Record a high-frequency event (ultra-fast path)
497    #[inline(always)]
498    pub fn record_hf_event(&self, exchange: &str, bytes: u64, latency_nanos: u64) {
499        // All lock-free operations
500        self.stats_collector.record_event();
501        self.stats_collector.update_latency(latency_nanos);
502        self.exchange_manager.record_message(exchange, bytes);
503        self.events_processed_total.fetch_add(1, Ordering::Relaxed);
504    }
505
506    /// Request shutdown (lock-free)
507    pub fn request_shutdown(&self) {
508        self.shutdown_requested.store(true, Ordering::Release);
509        self.exchange_manager.request_shutdown();
510    }
511
512    /// Check if shutdown was requested (lock-free)
513    pub fn is_shutdown_requested(&self) -> bool {
514        self.shutdown_requested.load(Ordering::Acquire)
515    }
516
517    /// Stop all collection operations
518    pub async fn stop_all(&mut self) -> Result<()> {
519        log::info!("Stopping optimized collection manager");
520
521        // Request shutdown
522        self.request_shutdown();
523
524        // Send shutdown signal to management tasks
525        if let Some(sender) = self.shutdown_sender.take() {
526            let _ = sender.send(());
527        }
528
529        // Stop zerocopy pipeline
530        if let Some(mut pipeline) = self.zerocopy_pipeline.take() {
531            pipeline.stop().await?;
532        }
533
534        // Wait for management tasks to finish
535        for handle in self.management_handles.drain(..) {
536            handle.abort();
537            let _ = handle.await;
538        }
539
540        // Shutdown exchange manager
541        self.exchange_manager.shutdown().await?;
542
543        log::info!("Optimized collection manager stopped");
544        Ok(())
545    }
546}
547
548/// Comprehensive statistics combining all subsystems
549#[derive(Debug, Clone)]
550pub struct ComprehensiveStats {
551    /// Lock-free statistics snapshot containing event counts and latency metrics
552    ///
553    /// Provides a consistent view of the system's processing statistics
554    /// including events per second, average latency, and throughput metrics.
555    pub stats_snapshot: StatsSnapshot,
556    /// Exchange manager statistics for connection health and performance
557    ///
558    /// Contains detailed statistics about exchange connections, including
559    /// connection counts, error rates, and exchange-specific metrics.
560    pub exchange_stats: ExchangeManagerStats,
561    /// Lock-free buffer pool statistics for memory management efficiency
562    ///
563    /// Tracks buffer pool hits, misses, and allocation patterns to measure
564    /// memory management efficiency in high-frequency scenarios.
565    pub buffer_stats: crate::monitor::lockfree_buffer_pool::BufferPoolStats,
566    /// Current buffer availability counts across all buffer types
567    ///
568    /// Provides real-time counts of available buffers for serialization,
569    /// compression, and SIMD operations to monitor resource utilization.
570    pub buffer_counts: crate::monitor::lockfree_buffer_pool::BufferCounts,
571    /// Zero-copy pipeline statistics for ultra-low-latency processing
572    ///
573    /// Tracks zero-copy operations, memory mappings, and pipeline efficiency
574    /// metrics to optimize for minimal allocation overhead.
575    pub pipeline_stats: crate::monitor::zerocopy_pipeline::ZerocopyPipelineStats,
576    /// System uptime in nanoseconds since manager initialization
577    ///
578    /// High-resolution uptime measurement used for calculating throughput
579    /// and efficiency metrics over the system's operational lifetime.
580    pub uptime_nanos: u64,
581    /// Total number of events processed since system start
582    ///
583    /// Atomic counter tracking all market data events processed by the
584    /// collection manager, used for throughput calculations.
585    pub events_processed_total: u64,
586    /// Timestamp of the last health check operation in nanoseconds
587    ///
588    /// Records when the most recent health check was performed to monitor
589    /// the frequency and timing of system health assessments.
590    pub last_health_check_nanos: u64,
591    /// Timestamp of the last statistics cleanup operation in nanoseconds
592    ///
593    /// Tracks when the system last performed cleanup of old statistical
594    /// data to ensure efficient memory usage over time.
595    pub last_stats_cleanup_nanos: u64,
596}
597
598impl ComprehensiveStats {
599    /// Calculate overall system efficiency metrics
600    pub fn calculate_efficiency_metrics(&self) -> EfficiencyMetrics {
601        let uptime_seconds = self.uptime_nanos as f64 / 1_000_000_000.0;
602
603        let overall_throughput = if uptime_seconds > 0.0 {
604            self.events_processed_total as f64 / uptime_seconds
605        } else {
606            0.0
607        };
608
609        let memory_efficiency = if self.buffer_counts.total_capacity > 0 {
610            (self.buffer_stats.pool_hits as f64)
611                / ((self.buffer_stats.pool_hits + self.buffer_stats.pool_misses) as f64).max(1.0)
612        } else {
613            0.0
614        };
615
616        let processing_efficiency = if self.pipeline_stats.events_processed > 0 {
617            (self.pipeline_stats.zero_copy_operations as f64)
618                / (self.pipeline_stats.events_processed as f64)
619        } else {
620            0.0
621        };
622
623        EfficiencyMetrics {
624            overall_throughput_eps: overall_throughput,
625            memory_efficiency_ratio: memory_efficiency,
626            processing_efficiency_ratio: processing_efficiency,
627            avg_latency_microseconds: self.stats_snapshot.avg_latency_nanos as f64 / 1000.0,
628            buffer_utilization_ratio: {
629                let total_available = self.buffer_counts.serialization_available
630                    + self.buffer_counts.compression_available
631                    + self.buffer_counts.simd_available;
632                let total_capacity = self.buffer_counts.total_capacity * 3; // 3 buffer types
633                if total_capacity > 0 {
634                    1.0 - (total_available as f64 / total_capacity as f64)
635                } else {
636                    0.0
637                }
638            },
639        }
640    }
641}
642
/// Derived efficiency indicators produced by
/// `ComprehensiveStats::calculate_efficiency_metrics`.
#[derive(Debug, Clone)]
pub struct EfficiencyMetrics {
    /// Events processed per second of uptime.
    pub overall_throughput_eps: f64,
    /// Buffer pool hit rate: hits divided by hits plus misses.
    pub memory_efficiency_ratio: f64,
    /// Zero-copy operations divided by events processed by the pipeline.
    pub processing_efficiency_ratio: f64,
    /// Average latency in microseconds (the nanosecond average divided
    /// by 1000).
    pub avg_latency_microseconds: f64,
    /// Share of buffers in use: one minus the fraction of buffers that are
    /// currently available across all buffer types.
    pub buffer_utilization_ratio: f64,
}
672
673// Thread safety
674unsafe impl Send for OptimizedCollectionManager {}
675unsafe impl Sync for OptimizedCollectionManager {}
676
#[cfg(test)]
mod tests {
    use super::*;
    use crate::monitor::config::storage::StorageConfig;
    use tempfile::TempDir;

    /// Builds a manager with the default configuration on top of a storage
    /// manager rooted in a fresh temporary directory.
    ///
    /// The `TempDir` is returned as well so that the directory outlives the
    /// manager for the duration of the test.
    async fn build_manager() -> (TempDir, OptimizedCollectionManager) {
        let temp_dir = TempDir::new().unwrap();
        let storage_config = StorageConfig {
            market_data_path: temp_dir.path().join("market_data"),
            ..Default::default()
        };
        let storage_manager = Arc::new(StorageManager::new(storage_config).await.unwrap());

        let manager =
            OptimizedCollectionManager::new(OptimizedCollectionConfig::default(), storage_manager)
                .await
                .unwrap();

        (temp_dir, manager)
    }

    /// A freshly built manager is idle: no shutdown requested, no events.
    #[tokio::test]
    async fn test_optimized_collection_manager_creation() {
        let (_temp_dir, manager) = build_manager().await;

        assert!(!manager.is_shutdown_requested());
        assert_eq!(manager.events_processed_total.load(Ordering::Relaxed), 0);
    }

    /// Recorded events show up in the comprehensive statistics and the
    /// derived metrics are non-negative.
    #[tokio::test]
    async fn test_optimized_collection_manager_stats() {
        let (_temp_dir, manager) = build_manager().await;

        // Test high-frequency event recording
        manager.record_hf_event("test_exchange", 1024, 500);
        manager.record_hf_event("test_exchange", 2048, 750);

        let stats = manager.get_comprehensive_stats();
        assert_eq!(stats.events_processed_total, 2);
        assert!(stats.uptime_nanos > 0);

        let efficiency = stats.calculate_efficiency_metrics();
        assert!(efficiency.overall_throughput_eps >= 0.0);
        assert!(efficiency.avg_latency_microseconds >= 0.0);
    }

    /// Requesting shutdown sets the flag, and `stop_all` succeeds on a
    /// manager that was never started.
    #[tokio::test]
    async fn test_optimized_collection_manager_shutdown() {
        let (_temp_dir, mut manager) = build_manager().await;

        // Test shutdown request
        manager.request_shutdown();
        assert!(manager.is_shutdown_requested());

        // Test stop_all
        manager.stop_all().await.unwrap();
    }
}