rusty_common/websocket/batch.rs

//! Batch processing support for WebSocket messages
//!
//! Provides efficient batch processing of WebSocket messages to improve throughput
//! in high-frequency trading scenarios.

use parking_lot::RwLock;
use quanta::Clock;
use std::sync::Arc;

/// Batch processing metrics
#[derive(Debug, Clone, Default)]
pub struct BatchProcessingMetrics {
    /// Maximum number of messages processed in a single batch
    pub max_batch_size: u32,

    /// Average number of messages processed in a batch
    pub avg_batch_size: f64,

    /// Total number of batches processed
    pub total_batches: u64,

    /// Total number of messages processed
    pub total_messages: u64,

    /// Average processing time per batch in nanoseconds
    pub avg_batch_processing_time_nanoseconds: u64,

    /// Maximum processing time for a batch in nanoseconds
    pub max_batch_processing_time_nanoseconds: u64,
}

/// Batch processor for WebSocket messages
#[derive(Debug)]
pub struct BatchProcessor {
    /// Batch metrics
    metrics: Arc<RwLock<BatchProcessingMetrics>>,

    /// Clock for timing
    clock: Clock,

    /// Default batch size
    batch_size: usize,
}

impl BatchProcessor {
    /// Create a new batch processor
    #[must_use]
    pub fn new(batch_size: usize) -> Self {
        Self {
            metrics: Arc::new(RwLock::new(BatchProcessingMetrics::default())),
            clock: Clock::new(),
            batch_size,
        }
    }

    /// Update batch metrics after processing
    pub fn update_metrics(&self, batch_size: usize, processing_time_ns: u64) {
        let mut metrics = self.metrics.write();
        metrics.total_batches += 1;
        metrics.total_messages += batch_size as u64;

        // Update max batch size if this batch was larger
        if batch_size as u32 > metrics.max_batch_size {
            metrics.max_batch_size = batch_size as u32;
        }

        // Update average batch size
        metrics.avg_batch_size = (metrics.avg_batch_size * (metrics.total_batches - 1) as f64
            + batch_size as f64)
            / metrics.total_batches as f64;

        // Update processing time metrics
        if processing_time_ns > metrics.max_batch_processing_time_nanoseconds {
            metrics.max_batch_processing_time_nanoseconds = processing_time_ns;
        }

        // Update average processing time
        metrics.avg_batch_processing_time_nanoseconds =
            (metrics.avg_batch_processing_time_nanoseconds * (metrics.total_batches - 1)
                + processing_time_ns)
                / metrics.total_batches;
    }

    /// Get current batch metrics
    #[must_use]
    pub fn get_metrics(&self) -> BatchProcessingMetrics {
        self.metrics.read().clone()
    }

    /// Get the configured batch size
    #[must_use]
    pub const fn batch_size(&self) -> usize {
        self.batch_size
    }

    /// Get the clock instance
    #[must_use]
    pub const fn clock(&self) -> &Clock {
        &self.clock
    }
}
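
// Illustrative usage sketch (example values only, not part of the production API):
// drive the processor with its own `quanta` clock, record two batches, and check
// the rolling metrics.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn records_batch_metrics() {
        let processor = BatchProcessor::new(64);

        // Simulate processing two batches of different sizes.
        for &size in &[32_usize, 64] {
            let start = processor.clock().now();
            // ... processing of `size` messages would happen here ...
            let elapsed_ns = processor.clock().now().duration_since(start).as_nanos() as u64;
            processor.update_metrics(size, elapsed_ns);
        }

        let metrics = processor.get_metrics();
        assert_eq!(metrics.total_batches, 2);
        assert_eq!(metrics.total_messages, 96);
        assert_eq!(metrics.max_batch_size, 64);
        assert!((metrics.avg_batch_size - 48.0).abs() < f64::EPSILON);
    }
}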