use crate::monitor::{
    collector::{DataType, ExchangeClient, MarketDataEvent, Result},
    lockfree_buffer_pool::{LockFreeBufferPool, LockFreeBufferPoolConfig},
    lockfree_exchange_manager::{ExchangeManagerStats, LockFreeExchangeManager},
    lockfree_stats::{LockFreeStatsCollector, StatsSnapshot},
    storage::manager::StorageManager,
    zerocopy_pipeline::{ZerocopyDataPipeline, ZerocopyPipelineConfig},
};
use flume::Sender;
use quanta::{Clock, Instant as QuantaInstant};
use smartstring::alias::String as SmartString;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::time::Duration;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;

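/// Configuration for [`OptimizedCollectionManager`].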
#[derive(Debug, Clone)]
pub struct OptimizedCollectionConfig {
    /// Timeout for establishing an exchange connection, in milliseconds.
    pub connection_timeout_ms: u64,
    /// Interval between exchange health checks, in milliseconds.
    pub health_check_interval_ms: u64,
    /// Delay between reconnection attempts, in milliseconds.
    pub retry_interval_ms: u64,
    /// Maximum number of reconnection attempts before giving up.
    pub max_retry_attempts: usize,
    /// Configuration for the lock-free buffer pool.
    pub buffer_pool_config: LockFreeBufferPoolConfig,
    /// Configuration for the zero-copy processing pipeline.
    pub pipeline_config: ZerocopyPipelineConfig,
    /// Whether the periodic statistics reporting task runs.
    pub enable_stats_cleanup: bool,
    /// Interval between statistics reporting runs, in milliseconds.
    pub stats_cleanup_interval_ms: u64,
}

impl Default for OptimizedCollectionConfig {
    fn default() -> Self {
        Self {
            connection_timeout_ms: 30_000,
            health_check_interval_ms: 30_000,
            retry_interval_ms: 60_000,
            max_retry_attempts: 10,
            buffer_pool_config: LockFreeBufferPoolConfig::default(),
            pipeline_config: ZerocopyPipelineConfig::default(),
            enable_stats_cleanup: true,
            stats_cleanup_interval_ms: 300_000, // 5 minutes
        }
    }
}

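/// Lock-free, high-throughput market data collection manager.
///
/// Owns the exchange manager, stats collector, buffer pool, and zero-copy
/// pipeline, and supervises background tasks for event routing, health
/// checks, and periodic statistics reporting.
///
/// # Example
///
/// A minimal usage sketch (not compiled here; assumes a default
/// `StorageConfig` is valid for the target environment):
///
/// ```ignore
/// let storage = Arc::new(StorageManager::new(StorageConfig::default()).await?);
/// let mut manager =
///     OptimizedCollectionManager::new(OptimizedCollectionConfig::default(), storage).await?;
///
/// // Collect default exchanges/symbols; the sender is currently unused.
/// let (tx, _rx) = flume::bounded(1024);
/// manager.start_collection(None, None, tx).await?;
///
/// println!("{:?}", manager.get_comprehensive_stats().calculate_efficiency_metrics());
/// manager.stop_all().await?;
/// ```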
pub struct OptimizedCollectionManager {
    /// Manager configuration.
    config: OptimizedCollectionConfig,
    /// Lock-free registry of connected exchange clients.
    exchange_manager: Arc<LockFreeExchangeManager>,
    /// Lock-free statistics collector shared with background tasks.
    stats_collector: Arc<LockFreeStatsCollector>,
    /// Lock-free pool of serialization/compression/SIMD buffers.
    buffer_pool: Arc<LockFreeBufferPool>,
    /// Zero-copy processing pipeline; `None` until collection starts.
    zerocopy_pipeline: Option<ZerocopyDataPipeline>,
    /// Storage backend for collected market data.
    storage_manager: Arc<StorageManager>,
    /// Handles for the spawned management tasks (routing, health, stats).
    management_handles: Vec<JoinHandle<()>>,
    /// One-shot signal used to wake the event routing task on shutdown.
    shutdown_sender: Option<oneshot::Sender<()>>,
    /// Cooperative shutdown flag checked by all background tasks.
    shutdown_requested: Arc<AtomicBool>,
    /// High-resolution clock and manager start time for uptime tracking.
    clock: Clock,
    start_time: QuantaInstant,
    /// Shared counters: total events, last health check, last stats run.
    events_processed_total: Arc<AtomicU64>,
    last_health_check: Arc<AtomicU64>,
    last_stats_cleanup: Arc<AtomicU64>,
}

impl OptimizedCollectionManager {
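    /// Creates a manager with fresh lock-free components.
    ///
    /// Nothing is connected or spawned until `start_collection` is called.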
    pub async fn new(
        config: OptimizedCollectionConfig,
        storage_manager: Arc<StorageManager>,
    ) -> Result<Self> {
        let exchange_manager = Arc::new(LockFreeExchangeManager::new());
        let stats_collector = Arc::new(LockFreeStatsCollector::new());
        let buffer_pool = Arc::new(LockFreeBufferPool::new(config.buffer_pool_config.clone()));

        let clock = Clock::new();
        let start_time = clock.now();

        Ok(Self {
            config,
            exchange_manager,
            stats_collector,
            buffer_pool,
            zerocopy_pipeline: None,
            storage_manager,
            management_handles: Vec::new(),
            shutdown_sender: None,
            shutdown_requested: Arc::new(AtomicBool::new(false)),
            clock,
            start_time,
            events_processed_total: Arc::new(AtomicU64::new(0)),
            last_health_check: Arc::new(AtomicU64::new(0)),
            last_stats_cleanup: Arc::new(AtomicU64::new(0)),
        })
    }

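    /// Connects to the requested exchanges and starts collecting market data.
    ///
    /// With `None` filters this defaults to Binance and a small set of major
    /// USDT pairs. Exchanges are connected concurrently; a failure on one is
    /// logged and counted without stopping the others.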
    pub async fn start_collection(
        &mut self,
        exchange_filter: Option<Vec<SmartString>>,
        symbol_filter: Option<Vec<SmartString>>,
        // Reserved for external event fan-out; not consumed internally yet.
        _event_sender: Sender<MarketDataEvent>,
    ) -> Result<()> {
        log::info!("Starting optimized collection manager with lock-free optimizations");

        let exchanges = exchange_filter.unwrap_or_else(|| vec!["binance".into()]);
        let symbols = symbol_filter.unwrap_or_else(|| {
            vec![
                "BTCUSDT".into(),
                "ETHUSDT".into(),
                "ADAUSDT".into(),
                "SOLUSDT".into(),
            ]
        });

        let mut pipeline = ZerocopyDataPipeline::new(
            self.config.pipeline_config.clone(),
            self.storage_manager.clone(),
        );

        // Channel feeding the zero-copy processing pipeline.
        let (processing_sender, processing_receiver) =
            flume::bounded(self.config.pipeline_config.worker_buffer_size);

        // Internal channel between exchange producers and the routing task,
        // which accounts for each event before forwarding it to the pipeline.
        let (internal_sender, internal_receiver) =
            flume::bounded(self.config.pipeline_config.worker_buffer_size);

        pipeline.start(processing_receiver).await?;
        self.zerocopy_pipeline = Some(pipeline);

        self.start_management_tasks(internal_receiver, processing_sender.clone())
            .await?;

        let data_types = vec![DataType::Trades, DataType::OrderBook];

        // Connect to each exchange concurrently; failures are recorded but
        // do not prevent other exchanges from starting.
        let mut connection_futures = Vec::new();
        for exchange_name in exchanges {
            let manager = self.exchange_manager.clone();
            let stats = self.stats_collector.clone();
            let exchange_name_clone = exchange_name.clone();
            let symbols_clone = symbols.clone();
            let data_types_clone = data_types.clone();
            // Exchanges publish into the internal channel so the routing task
            // can account for every event before it reaches the pipeline.
            let event_sender_clone = internal_sender.clone();

            let future = async move {
                let mut exchange_client = ExchangeClient::new(exchange_name_clone.clone());

                match exchange_client.connect().await {
                    Ok(()) => {
                        log::info!("Successfully connected to exchange: {exchange_name_clone}");

                        if let Err(e) = manager
                            .add_exchange(exchange_name_clone.clone(), exchange_client)
                            .await
                        {
                            log::error!(
                                "Failed to add exchange {exchange_name_clone} to manager: {e}"
                            );
                            stats.record_connection_error();
                            return Err(Box::new(e) as Box<dyn std::error::Error + Send + Sync>);
                        }

                        if let Err(e) = manager
                            .start_collection(
                                &exchange_name_clone,
                                symbols_clone,
                                data_types_clone,
                                event_sender_clone,
                            )
                            .await
                        {
                            log::error!(
                                "Failed to start collection for exchange {exchange_name_clone}: {e}"
                            );
                            stats.record_connection_error();
                            return Err(Box::new(e) as Box<dyn std::error::Error + Send + Sync>);
                        }

                        log::info!(
                            "Successfully started collection for exchange: {exchange_name_clone}"
                        );
                    }
                    Err(e) => {
                        log::error!("Failed to connect to exchange {exchange_name_clone}: {e}");
                        stats.record_connection_error();
                        return Err(Box::new(e) as Box<dyn std::error::Error + Send + Sync>);
                    }
                }

                Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
            };

            connection_futures.push(tokio::spawn(future));
        }

        // `?` propagates a panicked or cancelled connection task; ordinary
        // per-exchange failures are logged and counted instead.
        for future in connection_futures {
            if let Err(e) = future
                .await
                .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?
            {
                log::error!("Failed to initialize exchange connection: {e}");
                self.stats_collector.record_connection_error();
            }
        }

        log::info!("Optimized collection manager started successfully");
        Ok(())
    }

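    /// Spawns the background management tasks: the event routing/batching
    /// task, the periodic health check task, and (if enabled) the periodic
    /// statistics reporting task.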
    async fn start_management_tasks(
        &mut self,
        event_receiver: flume::Receiver<MarketDataEvent>,
        processing_sender: Sender<MarketDataEvent>,
    ) -> Result<()> {
        let (shutdown_sender, mut shutdown_receiver) = oneshot::channel();
        self.shutdown_sender = Some(shutdown_sender);

        // Event routing task: accounts for every event, batches events, and
        // forwards batches to the processing pipeline.
        {
            let stats = self.stats_collector.clone();
            let exchange_manager = self.exchange_manager.clone();
            let shutdown_requested = self.shutdown_requested.clone();
            let events_processed = self.events_processed_total.clone();

            let routing_handle = tokio::spawn(async move {
                const MAX_BATCH: usize = 1000;
                const BATCH_TIMEOUT_NANOS: u64 = 1_000_000; // 1 ms

                let mut event_batch = Vec::with_capacity(MAX_BATCH);
                let mut last_batch_time = QuantaInstant::now();

                loop {
                    tokio::select! {
                        event_result = event_receiver.recv_async() => {
                            match event_result {
                                Ok(event) => {
                                    // Per-event accounting before forwarding.
                                    match &event {
                                        MarketDataEvent::Trade(_) => {
                                            stats.record_trades(1);
                                            stats.record_event();
                                        }
                                        MarketDataEvent::OrderBook(_) => {
                                            stats.record_orderbooks(1);
                                            stats.record_event();
                                        }
                                        MarketDataEvent::ConnectionStatus { exchange, connected, .. } => {
                                            if *connected {
                                                log::debug!("Exchange {exchange} connected");
                                            } else {
                                                exchange_manager.record_error(exchange);
                                            }
                                        }
                                        MarketDataEvent::Error { exchange, .. } => {
                                            exchange_manager.record_error(exchange);
                                            stats.record_parsing_error();
                                        }
                                    }

                                    event_batch.push(event);
                                    events_processed.fetch_add(1, Ordering::Relaxed);

                                    // Flush when the batch is full or the timeout elapsed.
                                    let should_send_batch = event_batch.len() >= MAX_BATCH ||
                                        last_batch_time.elapsed().as_nanos() as u64 >= BATCH_TIMEOUT_NANOS;

                                    if should_send_batch && !event_batch.is_empty() {
                                        for event in event_batch.drain(..) {
                                            if processing_sender.send(event).is_err() {
                                                log::warn!("Processing channel closed");
                                                return;
                                            }
                                        }
                                        last_batch_time = QuantaInstant::now();
                                        stats.record_buffer_reuse();
                                    }
                                }
                                Err(_) => {
                                    log::info!("Event channel closed, stopping routing task");
                                    break;
                                }
                            }
                        }

                        _ = &mut shutdown_receiver => {
                            log::info!("Event routing task received shutdown signal");
                            break;
                        }
                    }

                    if shutdown_requested.load(Ordering::Acquire) {
                        break;
                    }
                }

                // Flush any remaining events before exiting.
                for event in event_batch {
                    let _ = processing_sender.send(event);
                }

                log::info!("Event routing task stopped");
            });

            self.management_handles.push(routing_handle);
        }

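        // Health check task: periodically polls every exchange and records
        // the check latency into the stats collector.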
        {
            let exchange_manager = self.exchange_manager.clone();
            let stats = self.stats_collector.clone();
            let shutdown_requested = self.shutdown_requested.clone();
            let last_health_check = self.last_health_check.clone();
            let health_check_interval = Duration::from_millis(self.config.health_check_interval_ms);

            let health_handle = tokio::spawn(async move {
                let mut interval = tokio::time::interval(health_check_interval);

                loop {
                    tokio::select! {
                        _ = interval.tick() => {
                            let start_time = QuantaInstant::now();
                            let health_status = exchange_manager.health_check().await;
                            let health_check_latency = start_time.elapsed().as_nanos() as u64;

                            stats.update_latency(health_check_latency);
                            // Record when this check ran (epoch nanos),
                            // mirroring `last_stats_cleanup` below.
                            last_health_check.store(
                                std::time::SystemTime::now()
                                    .duration_since(std::time::UNIX_EPOCH)
                                    .unwrap_or_default()
                                    .as_nanos() as u64,
                                Ordering::Relaxed,
                            );

                            for (exchange, healthy) in health_status {
                                if !healthy {
                                    log::warn!("Exchange {exchange} health check failed");
                                }
                            }
                        }
                    }

                    if shutdown_requested.load(Ordering::Acquire) {
                        break;
                    }
                }

                log::info!("Health check task stopped");
            });

            self.management_handles.push(health_handle);
        }

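        // Optional statistics task: periodically snapshots and logs
        // collection statistics.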
        if self.config.enable_stats_cleanup {
            let stats = self.stats_collector.clone();
            let shutdown_requested = self.shutdown_requested.clone();
            let last_cleanup = self.last_stats_cleanup.clone();
            let cleanup_interval = Duration::from_millis(self.config.stats_cleanup_interval_ms);

            let cleanup_handle = tokio::spawn(async move {
                let mut interval = tokio::time::interval(cleanup_interval);

                loop {
                    tokio::select! {
                        _ = interval.tick() => {
                            let snapshot = stats.get_snapshot();
                            last_cleanup.store(
                                std::time::SystemTime::now()
                                    .duration_since(std::time::UNIX_EPOCH)
                                    .unwrap_or_default()
                                    .as_nanos() as u64,
                                Ordering::Relaxed,
                            );

                            log::info!(
                                "Collection stats: {} events/sec, {} avg latency (ns), {} trades, {} orderbooks",
                                snapshot.events_per_second,
                                snapshot.avg_latency_nanos,
                                snapshot.trades_processed,
                                snapshot.orderbooks_processed
                            );
                        }
                    }

                    if shutdown_requested.load(Ordering::Acquire) {
                        break;
                    }
                }

                log::info!("Statistics cleanup task stopped");
            });

            self.management_handles.push(cleanup_handle);
        }

        Ok(())
    }

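    /// Collects a point-in-time snapshot of statistics from every
    /// subsystem: stats collector, exchange manager, buffer pool, and the
    /// pipeline (when running).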
    pub fn get_comprehensive_stats(&self) -> ComprehensiveStats {
        let stats_snapshot = self.stats_collector.get_snapshot();
        let exchange_stats = self.exchange_manager.get_statistics();
        let buffer_stats = self.buffer_pool.get_stats();
        let buffer_counts = self.buffer_pool.get_available_counts();

        let pipeline_stats = self
            .zerocopy_pipeline
            .as_ref()
            .map(|p| p.get_stats())
            .unwrap_or_default();

        let uptime_nanos = self.start_time.elapsed().as_nanos() as u64;

        ComprehensiveStats {
            stats_snapshot,
            exchange_stats,
            buffer_stats,
            buffer_counts,
            pipeline_stats,
            uptime_nanos,
            events_processed_total: self.events_processed_total.load(Ordering::Relaxed),
            last_health_check_nanos: self.last_health_check.load(Ordering::Relaxed),
            last_stats_cleanup_nanos: self.last_stats_cleanup.load(Ordering::Relaxed),
        }
    }

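    /// Records a single high-frequency event on the hot path; always
    /// inlined and backed by the lock-free collectors.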
    #[inline(always)]
    pub fn record_hf_event(&self, exchange: &str, bytes: u64, latency_nanos: u64) {
        self.stats_collector.record_event();
        self.stats_collector.update_latency(latency_nanos);
        self.exchange_manager.record_message(exchange, bytes);
        self.events_processed_total.fetch_add(1, Ordering::Relaxed);
    }

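    /// Signals all background tasks and the exchange manager to shut down
    /// cooperatively; returns immediately without waiting.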
    pub fn request_shutdown(&self) {
        self.shutdown_requested.store(true, Ordering::Release);
        self.exchange_manager.request_shutdown();
    }

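    /// Returns `true` once shutdown has been requested.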
    pub fn is_shutdown_requested(&self) -> bool {
        self.shutdown_requested.load(Ordering::Acquire)
    }

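    /// Stops the pipeline, aborts the management tasks, and shuts down the
    /// exchange manager. Safe to call after `request_shutdown`.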
    pub async fn stop_all(&mut self) -> Result<()> {
        log::info!("Stopping optimized collection manager");

        self.request_shutdown();

        // Wake the routing task so it can flush its final batch.
        if let Some(sender) = self.shutdown_sender.take() {
            let _ = sender.send(());
        }

        if let Some(mut pipeline) = self.zerocopy_pipeline.take() {
            pipeline.stop().await?;
        }

        // Abort and reap the management tasks.
        for handle in self.management_handles.drain(..) {
            handle.abort();
            let _ = handle.await;
        }

        self.exchange_manager.shutdown().await?;

        log::info!("Optimized collection manager stopped");
        Ok(())
    }
}

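/// Point-in-time view of all collection subsystems, produced by
/// [`OptimizedCollectionManager::get_comprehensive_stats`].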
#[derive(Debug, Clone)]
pub struct ComprehensiveStats {
    /// Snapshot from the lock-free stats collector.
    pub stats_snapshot: StatsSnapshot,
    /// Aggregate statistics from the exchange manager.
    pub exchange_stats: ExchangeManagerStats,
    /// Buffer pool hit/miss statistics.
    pub buffer_stats: crate::monitor::lockfree_buffer_pool::BufferPoolStats,
    /// Currently available buffer counts per pool.
    pub buffer_counts: crate::monitor::lockfree_buffer_pool::BufferCounts,
    /// Zero-copy pipeline statistics (default if the pipeline is stopped).
    pub pipeline_stats: crate::monitor::zerocopy_pipeline::ZerocopyPipelineStats,
    /// Manager uptime in nanoseconds.
    pub uptime_nanos: u64,
    /// Total events processed since startup.
    pub events_processed_total: u64,
    /// Timestamp (epoch nanos) of the most recent health check.
    pub last_health_check_nanos: u64,
    /// Timestamp (epoch nanos) of the most recent statistics run.
    pub last_stats_cleanup_nanos: u64,
}

impl ComprehensiveStats {
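    /// Derives ratio-style efficiency metrics from the raw counters.
    ///
    /// Throughput is events per second over the full uptime; memory
    /// efficiency is the buffer-pool hit rate; processing efficiency is the
    /// share of zero-copy operations among processed events.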
    pub fn calculate_efficiency_metrics(&self) -> EfficiencyMetrics {
        let uptime_seconds = self.uptime_nanos as f64 / 1_000_000_000.0;

        let overall_throughput = if uptime_seconds > 0.0 {
            self.events_processed_total as f64 / uptime_seconds
        } else {
            0.0
        };

        // Pool hit rate: hits / (hits + misses), guarded against division by zero.
        let memory_efficiency = if self.buffer_counts.total_capacity > 0 {
            (self.buffer_stats.pool_hits as f64)
                / ((self.buffer_stats.pool_hits + self.buffer_stats.pool_misses) as f64).max(1.0)
        } else {
            0.0
        };

        let processing_efficiency = if self.pipeline_stats.events_processed > 0 {
            (self.pipeline_stats.zero_copy_operations as f64)
                / (self.pipeline_stats.events_processed as f64)
        } else {
            0.0
        };

        EfficiencyMetrics {
            overall_throughput_eps: overall_throughput,
            memory_efficiency_ratio: memory_efficiency,
            processing_efficiency_ratio: processing_efficiency,
            avg_latency_microseconds: self.stats_snapshot.avg_latency_nanos as f64 / 1000.0,
            buffer_utilization_ratio: {
                let total_available = self.buffer_counts.serialization_available
                    + self.buffer_counts.compression_available
                    + self.buffer_counts.simd_available;
                // Assumes `total_capacity` is per-pool; three pools in total.
                let total_capacity = self.buffer_counts.total_capacity * 3;
                if total_capacity > 0 {
                    1.0 - (total_available as f64 / total_capacity as f64)
                } else {
                    0.0
                }
            },
        }
    }
}

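/// Derived efficiency ratios; see
/// [`ComprehensiveStats::calculate_efficiency_metrics`].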
#[derive(Debug, Clone)]
pub struct EfficiencyMetrics {
    /// Overall throughput in events per second.
    pub overall_throughput_eps: f64,
    /// Buffer-pool hit rate in [0, 1].
    pub memory_efficiency_ratio: f64,
    /// Fraction of events handled via zero-copy operations, in [0, 1].
    pub processing_efficiency_ratio: f64,
    /// Average processing latency in microseconds.
    pub avg_latency_microseconds: f64,
    /// Fraction of pooled buffers currently in use, in [0, 1].
    pub buffer_utilization_ratio: f64,
}

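// SAFETY: asserted manually. These impls are sound only if every field is
// safe to share across threads; they are presumably needed because one of
// the inner pipeline types is not automatically `Send`/`Sync`.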
unsafe impl Send for OptimizedCollectionManager {}
unsafe impl Sync for OptimizedCollectionManager {}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::monitor::config::storage::StorageConfig;
    use tempfile::TempDir;

    #[tokio::test]
    async fn test_optimized_collection_manager_creation() {
        let temp_dir = TempDir::new().unwrap();
        let storage_config = StorageConfig {
            market_data_path: temp_dir.path().join("market_data"),
            ..Default::default()
        };
        let storage_manager = Arc::new(StorageManager::new(storage_config).await.unwrap());

        let config = OptimizedCollectionConfig::default();
        let manager = OptimizedCollectionManager::new(config, storage_manager)
            .await
            .unwrap();

        assert!(!manager.is_shutdown_requested());
        assert_eq!(manager.events_processed_total.load(Ordering::Relaxed), 0);
    }

    #[tokio::test]
    async fn test_optimized_collection_manager_stats() {
        let temp_dir = TempDir::new().unwrap();
        let storage_config = StorageConfig {
            market_data_path: temp_dir.path().join("market_data"),
            ..Default::default()
        };
        let storage_manager = Arc::new(StorageManager::new(storage_config).await.unwrap());

        let config = OptimizedCollectionConfig::default();
        let manager = OptimizedCollectionManager::new(config, storage_manager)
            .await
            .unwrap();

        manager.record_hf_event("test_exchange", 1024, 500);
        manager.record_hf_event("test_exchange", 2048, 750);

        let stats = manager.get_comprehensive_stats();
        assert_eq!(stats.events_processed_total, 2);
        assert!(stats.uptime_nanos > 0);

        let efficiency = stats.calculate_efficiency_metrics();
        assert!(efficiency.overall_throughput_eps >= 0.0);
        assert!(efficiency.avg_latency_microseconds >= 0.0);
    }

    #[tokio::test]
    async fn test_optimized_collection_manager_shutdown() {
        let temp_dir = TempDir::new().unwrap();
        let storage_config = StorageConfig {
            market_data_path: temp_dir.path().join("market_data"),
            ..Default::default()
        };
        let storage_manager = Arc::new(StorageManager::new(storage_config).await.unwrap());

        let config = OptimizedCollectionConfig::default();
        let mut manager = OptimizedCollectionManager::new(config, storage_manager)
            .await
            .unwrap();

        manager.request_shutdown();
        assert!(manager.is_shutdown_requested());

        manager.stop_all().await.unwrap();
    }
}