rusty_common/websocket/
batch.rs1use parking_lot::RwLock;
7use quanta::Clock;
8use std::sync::Arc;
9
/// Aggregate statistics about the batches recorded so far.
///
/// Every field starts at zero via the derived [`Default`]. The struct is a
/// plain value type: cloning it yields an independent snapshot.
#[derive(Debug, Clone, Default)]
pub struct BatchProcessingMetrics {
    /// Largest batch size recorded so far.
    pub max_batch_size: u32,

    /// Running mean of the recorded batch sizes.
    pub avg_batch_size: f64,

    /// Number of batches recorded.
    pub total_batches: u64,

    /// Sum of the sizes of all recorded batches.
    pub total_messages: u64,

    /// Running mean of the per-batch processing time, in nanoseconds.
    pub avg_batch_processing_time_nanoseconds: u64,

    /// Longest per-batch processing time recorded, in nanoseconds.
    pub max_batch_processing_time_nanoseconds: u64,
}
31
32#[derive(Debug)]
34pub struct BatchProcessor {
35 metrics: Arc<RwLock<BatchProcessingMetrics>>,
37
38 clock: Clock,
40
41 batch_size: usize,
43}
44
45impl BatchProcessor {
46 #[must_use]
48 pub fn new(batch_size: usize) -> Self {
49 Self {
50 metrics: Arc::new(RwLock::new(BatchProcessingMetrics::default())),
51 clock: Clock::new(),
52 batch_size,
53 }
54 }
55
56 pub fn update_metrics(&self, batch_size: usize, processing_time_ns: u64) {
58 let mut metrics = self.metrics.write();
59 metrics.total_batches += 1;
60 metrics.total_messages += batch_size as u64;
61
62 if batch_size as u32 > metrics.max_batch_size {
64 metrics.max_batch_size = batch_size as u32;
65 }
66
67 metrics.avg_batch_size = (metrics.avg_batch_size * (metrics.total_batches - 1) as f64
69 + batch_size as f64)
70 / metrics.total_batches as f64;
71
72 if processing_time_ns > metrics.max_batch_processing_time_nanoseconds {
74 metrics.max_batch_processing_time_nanoseconds = processing_time_ns;
75 }
76
77 metrics.avg_batch_processing_time_nanoseconds =
79 (metrics.avg_batch_processing_time_nanoseconds * (metrics.total_batches - 1)
80 + processing_time_ns)
81 / metrics.total_batches;
82 }
83
84 pub fn get_metrics(&self) -> BatchProcessingMetrics {
86 self.metrics.read().clone()
87 }
88
89 pub const fn batch_size(&self) -> usize {
91 self.batch_size
92 }
93
94 pub const fn clock(&self) -> &Clock {
96 &self.clock
97 }
98}