rusty_engine/monitoring/ring_buffer.rs

//! Lock-free ring buffer implementation for zero-latency metric recording
//!
//! This module provides a single-producer, multiple-consumer (SPMC) ring buffer
//! optimized for high-frequency metric recording with minimal overhead.
//!
//! # Architecture Overview
//!
//! The ring buffer uses atomic operations to avoid locks in the hot path, ensuring
//! minimal impact on trading system performance. It supports a single producer with
//! multiple concurrent consumers.
//!
//! **Key Design Features:**
//! - Zero-allocation in hot paths (all memory pre-allocated)
//! - Lock-free operations using compare-and-swap
//! - Cache-friendly, contiguous data layout
//! - Overflow tracking for capacity monitoring
//!
//! # Capacity Requirements and Planning
//!
//! ## Power of 2 Requirement
//!
//! **MANDATORY**: Capacity must be a power of 2 (2, 4, 8, 16, 32, 64, 128, ...).
//! This enables:
//! - Bitwise AND operations (`& capacity_mask`) instead of expensive modulo
//! - ~10-20% performance improvement in hot paths
//! - A regular slot layout that tiles cache lines evenly for power-of-two item sizes
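//!
//! Because the capacity is a power of two, masking with `capacity - 1` is exactly
//! equivalent to the modulo it replaces, as this small check illustrates:
//!
//! ```rust
//! let capacity = 8usize;
//! let mask = capacity - 1;
//! for i in 0..32 {
//!     assert_eq!(i & mask, i % capacity); // holds only for power-of-2 capacities
//! }
//! ```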
//!
//! ## Effective Capacity
//!
//! **Important**: Effective capacity is `capacity - 1` due to the full/empty distinction.
//! The buffer reserves one slot to differentiate between full and empty states.
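//!
//! A minimal illustration (the `rusty_engine` crate path is assumed here):
//!
//! ```rust
//! use rusty_engine::monitoring::ring_buffer::RingBuffer;
//!
//! let rb: RingBuffer<u32> = RingBuffer::new(4);
//! assert!(rb.push(1) && rb.push(2) && rb.push(3)); // three of four slots usable
//! assert!(!rb.push(4)); // fourth push reports full: effective capacity is 4 - 1
//! ```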
//!
//! ## Capacity Planning Formula
//!
//! ```text
//! Required_Capacity = Peak_Throughput_Per_Second × Max_Consumer_Delay_Seconds × Safety_Factor
//!
//! Where:
//! - Peak_Throughput_Per_Second: Maximum messages/second during bursts
//! - Max_Consumer_Delay_Seconds: Worst-case time between drain operations
//! - Safety_Factor: 2-5x buffer (3x recommended for production)
//! ```
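//!
//! A back-of-the-envelope helper for this formula might look like the following
//! sketch (`plan_capacity` is hypothetical, not part of this module):
//!
//! ```rust
//! fn plan_capacity(peak_per_sec: f64, max_delay_secs: f64, safety_factor: f64) -> usize {
//!     let required = (peak_per_sec * max_delay_secs * safety_factor).ceil() as usize;
//!     // Round up to the power of 2 that the buffer requires (minimum 2).
//!     required.next_power_of_two().max(2)
//! }
//!
//! // 50K msgs/sec, 1ms worst-case drain delay, 4x safety: 200 -> 256
//! assert_eq!(plan_capacity(50_000.0, 0.001, 4.0), 256);
//! ```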
//!
//! ## HFT Capacity Guidelines by Use Case
//!
//! | Use Case | Frequency | Recommended Capacity | Effective Items | Memory (8-byte items) | Cache Level |
//! |----------|-----------|---------------------|-----------------|-----------------------|-------------|
//! | Trade execution | Sub-μs | 64-128 | 63-127 | ~0.5-1KB | L1 |
//! | Order book updates | μs | 512-1024 | 511-1023 | ~4-8KB | L1/L2 |
//! | Risk calculations | ms | 2048-4096 | 2047-4095 | ~16-32KB | L2 |
//! | Strategy signals | s | 8192-16384 | 8191-16383 | ~64-128KB | L2/L3 |
//! | Historical logging | min+ | 32768+ | 32767+ | ~256KB+ | Memory |
//!
//! ## Performance Characteristics by Capacity
//!
//! - **64-256 items**: Ultra-low latency (~10-50ns), fits in L1 cache (32-64KB)
//! - **512-2048 items**: High performance (~50-100ns), excellent L2 utilization
//! - **4096-8192 items**: Balanced (~100-200ns), L3-cache friendly
//! - **16384+ items**: High capacity (~200ns+), may cause cache misses
//!
//! ## Memory Usage Calculation
//!
//! ```text
//! Total Memory = capacity × size_of::<T>() + overhead (~64 bytes)
//!
//! Examples:
//! - capacity=64, T=u64: ~576 bytes (fits in a handful of cache lines)
//! - capacity=1024, T=u64: ~8KB (good L1 cache utilization)
//! - capacity=8192, T=u64: ~64KB (fits comfortably in L2 cache)
//! ```
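//!
//! The same arithmetic in code (the ~64-byte overhead matches the estimate above):
//!
//! ```rust
//! use std::mem::size_of;
//!
//! let capacity = 1024usize;
//! let approx_total = capacity * size_of::<u64>() + 64; // slots + control fields
//! assert_eq!(approx_total, 8 * 1024 + 64);
//! ```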
//!
//! # Overflow Monitoring and Capacity Adjustment
//!
//! ## Overflow Implications
//!
//! Each overflow represents:
//! - **Lost metric data** that could impact trading decisions
//! - **Potential performance degradation** due to buffer pressure
//! - **Need for capacity adjustment** or consumer optimization
//!
//! ## Production Monitoring Guidelines
//!
//! - **Zero overflows**: Optimal capacity; consider reducing if over-provisioned
//! - **Rare overflows (< 0.1%)**: Acceptable for non-critical metrics
//! - **Regular overflows (> 1%)**: Increase capacity or optimize consumers
//! - **Frequent overflows (> 10%)**: Critical issue, system overload
//!
//! ## Capacity Adjustment Strategy
//!
//! Based on overflow rate per minute (see the sketch after this list):
//! - **> 100 overflows/min**: Double capacity immediately
//! - **10-100 overflows/min**: Increase capacity by 50%
//! - **1-10 overflows/min**: Monitor trends, minor adjustment
//! - **0 overflows/min**: Current capacity sufficient
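//!
//! A hypothetical helper applying these thresholds (`adjust_capacity` is a sketch,
//! not part of this module):
//!
//! ```rust
//! fn adjust_capacity(current: usize, overflows_per_min: usize) -> usize {
//!     let target = match overflows_per_min {
//!         0 => current,                      // current capacity sufficient
//!         1..=10 => current,                 // monitor trends before resizing
//!         11..=100 => current + current / 2, // increase by 50%
//!         _ => current * 2,                  // double immediately
//!     };
//!     target.next_power_of_two() // preserve the power-of-2 invariant
//! }
//!
//! assert_eq!(adjust_capacity(1024, 250), 2048);
//! assert_eq!(adjust_capacity(1024, 50), 2048); // 1536 rounds up to 2048
//! assert_eq!(adjust_capacity(1024, 5), 1024);
//! ```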
//!
//! # Usage Examples
//!
//! ## Basic Usage
//!
//! ```rust
//! use rusty_engine::monitoring::ring_buffer::SharedRingBuffer;
//!
//! // Create buffer for order book updates
//! let buffer: SharedRingBuffer<u64> = SharedRingBuffer::new(1024);
//!
//! // Producer: Push metrics
//! let metric_data = 42u64;
//! if !buffer.push(metric_data) {
//!     eprintln!("Buffer overflow! Consider increasing capacity");
//! }
//!
//! // Consumer: Drain in batches
//! for metric in buffer.drain(64) {
//!     // process each metric here
//!     let _ = metric;
//! }
//! ```
//!
//! ## Capacity Planning Examples
//!
//! ```rust
//! use rusty_engine::monitoring::ring_buffer::SharedRingBuffer;
//!
//! // Example 1: High-frequency order book (50K updates/sec, 1ms drain)
//! // Required: 50,000 × 0.001 = 50; Safety: 4x = 200; Next power of 2: 256
//! let order_book: SharedRingBuffer<u64> = SharedRingBuffer::new(256);
//!
//! // Example 2: Strategy signals (1K signals/sec, 10ms drain)
//! // Required: 1,000 × 0.01 = 10; Safety: 5x = 50; Next power of 2: 64
//! let strategy: SharedRingBuffer<u64> = SharedRingBuffer::new(64);
//!
//! // Example 3: Risk metrics (10K metrics/sec, 5ms drain)
//! // Required: 10,000 × 0.005 = 50; Safety: 5x = 250; Next power of 2: 512
//! let risk: SharedRingBuffer<u64> = SharedRingBuffer::new(512);
//! ```
//!
//! # Performance Optimization Guidelines
//!
//! ## Multi-Consumer Efficiency
//!
//! - Use `drain(batch_size)` instead of repeated `pop()` calls (see the sketch below)
//! - Recommended batch sizes: 16-64 items per drain operation
//! - Each consumer competes for items (no broadcasting)
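//!
//! A consumer loop following these guidelines might look like this sketch
//! (`handle_metric` is a placeholder, and the crate path is assumed):
//!
//! ```rust,no_run
//! use rusty_engine::monitoring::ring_buffer::SharedRingBuffer;
//!
//! fn handle_metric(_metric: u64) { /* ... */ }
//!
//! let buffer: SharedRingBuffer<u64> = SharedRingBuffer::new(1024);
//! loop {
//!     // Drain up to 64 items per pass instead of popping one at a time
//!     let batch = buffer.drain(64);
//!     if batch.is_empty() {
//!         std::thread::yield_now(); // nothing available; back off briefly
//!         continue;
//!     }
//!     for metric in batch {
//!         handle_metric(metric);
//!     }
//! }
//! ```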
//!
//! ## Overflow Handling Strategy
//!
//! When buffer overflows:
//! - Data is lost (ring buffer does not resize)
//! - Monitor `overflow_count()` for capacity planning
//! - Consider: larger capacity, faster consumers, or multiple buffers

use std::cell::UnsafeCell;
use std::mem::MaybeUninit;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

/// A lock-free ring buffer for metric storage
///
/// This implementation uses atomic operations to avoid locks in the hot path,
/// ensuring minimal impact on trading system performance.
///
/// Note: items still in the buffer when it is dropped are leaked (their
/// destructors never run), because the storage is `MaybeUninit`.
pub struct RingBuffer<T: Clone> {
    /// The buffer storage
    buffer: Vec<UnsafeCell<MaybeUninit<T>>>,
    /// Write position (only modified by the single producer)
    write_pos: AtomicUsize,
    /// Read position (can be modified by multiple consumers)
    read_pos: AtomicUsize,
    /// Capacity mask (capacity - 1, for fast modulo)
    capacity_mask: usize,
    /// Total capacity
    capacity: usize,
    /// Overflow counter
    overflow_count: AtomicUsize,
}

unsafe impl<T: Clone + Send> Send for RingBuffer<T> {}
unsafe impl<T: Clone + Send> Sync for RingBuffer<T> {}

impl<T: Clone> RingBuffer<T> {
    /// Create a new ring buffer with the specified capacity
    ///
    /// # Arguments
    ///
    /// * `capacity` - Buffer capacity, **must be a power of 2** (2, 4, 8, 16, 32, 64, 128, ...)
    ///
    /// # Returns
    ///
    /// A new `RingBuffer` with an effective capacity of `capacity - 1` items.
    ///
    /// # Panics
    ///
    /// Panics if capacity is not a power of 2 or is zero.
    ///
    /// # Example
    ///
    /// ```rust
    /// # use rusty_engine::monitoring::ring_buffer::RingBuffer;
    /// let buffer: RingBuffer<u64> = RingBuffer::new(1024);  // Can hold 1023 items
    /// ```
    #[must_use]
    pub fn new(capacity: usize) -> Self {
        assert!(capacity.is_power_of_two(), "Capacity must be a power of 2");

        let mut buffer = Vec::with_capacity(capacity);
        for _ in 0..capacity {
            buffer.push(UnsafeCell::new(MaybeUninit::uninit()));
        }

        Self {
            buffer,
            write_pos: AtomicUsize::new(0),
            read_pos: AtomicUsize::new(0),
            capacity_mask: capacity - 1,
            capacity,
            overflow_count: AtomicUsize::new(0),
        }
    }

    /// Push an item into the ring buffer
    ///
    /// Returns `true` if the item was successfully pushed, `false` if the buffer was full.
    /// When full, the overflow counter is incremented and the new item is discarded.
    ///
    /// # Arguments
    ///
    /// * `item` - The item to push into the buffer
    ///
    /// # Returns
    ///
    /// `true` if successful, `false` if buffer was full (overflow)
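    ///
    /// # Example
    ///
    /// A full buffer rejects the push and records an overflow:
    ///
    /// ```rust
    /// # use rusty_engine::monitoring::ring_buffer::RingBuffer;
    /// let rb: RingBuffer<u64> = RingBuffer::new(4);
    /// assert!(rb.push(7) && rb.push(8) && rb.push(9)); // 3 = capacity - 1 items fit
    /// assert!(!rb.push(10)); // full: push fails and the overflow counter increments
    /// assert_eq!(rb.overflow_count(), 1);
    /// ```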
    #[inline(always)]
    pub fn push(&self, item: T) -> bool {
        let write_pos = self.write_pos.load(Ordering::Acquire);
        let read_pos = self.read_pos.load(Ordering::Acquire);

        // Check if buffer is full
        let next_write_pos = (write_pos + 1) & self.capacity_mask;
        let is_full = next_write_pos == read_pos;

        if is_full {
            // Buffer is full, increment overflow counter
            self.overflow_count.fetch_add(1, Ordering::Relaxed);
            // Optionally, we could force-advance the read position here
            // to make room, but that would add complexity
            return false;
        }

        // Write the item
        unsafe {
            let slot_ptr = self.buffer[write_pos].get();
            (*slot_ptr).write(item);
        }

        // Update write position
        self.write_pos.store(next_write_pos, Ordering::Release);

        true
    }

    /// Pop an item from the ring buffer
    ///
    /// # Returns
    ///
    /// `Some(item)` if an item was available, `None` if the buffer was empty
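    ///
    /// # Example
    ///
    /// Items come out in FIFO order:
    ///
    /// ```rust
    /// # use rusty_engine::monitoring::ring_buffer::RingBuffer;
    /// let rb: RingBuffer<u64> = RingBuffer::new(4);
    /// assert_eq!(rb.pop(), None); // empty buffer
    /// rb.push(1);
    /// rb.push(2);
    /// assert_eq!(rb.pop(), Some(1));
    /// assert_eq!(rb.pop(), Some(2));
    /// ```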
    pub fn pop(&self) -> Option<T> {
        loop {
            let read_pos = self.read_pos.load(Ordering::Acquire);
            let write_pos = self.write_pos.load(Ordering::Acquire);

            // Check if buffer is empty
            if read_pos == write_pos {
                return None;
            }

            // Copy the item out *before* claiming the slot: once `read_pos`
            // advances past it, the producer is free to overwrite this slot,
            // so reading after a successful CAS would race with the producer.
            let item = unsafe { (*self.buffer[read_pos].get()).assume_init_read() };

            // Try to advance read position
            let next_read_pos = (read_pos + 1) & self.capacity_mask;

            // Use compare_exchange to ensure we're the only consumer claiming this slot
            match self.read_pos.compare_exchange(
                read_pos,
                next_read_pos,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => {
                    // Successfully claimed this slot
                    return Some(item);
                }
                Err(_) => {
                    // Another consumer beat us; discard our bitwise copy without
                    // running its destructor, then retry
                    std::mem::forget(item);
                }
            }
        }
    }

    /// Drain up to `max_items` from the buffer
    ///
    /// More efficient than repeatedly calling `pop()` for batch operations.
    ///
    /// # Arguments
    ///
    /// * `max_items` - Maximum number of items to drain
    ///
    /// # Returns
    ///
    /// Vector of drained items (may be fewer than `max_items`)
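    ///
    /// # Example
    ///
    /// Asking for more items than are buffered returns only what is available:
    ///
    /// ```rust
    /// # use rusty_engine::monitoring::ring_buffer::RingBuffer;
    /// let rb: RingBuffer<u64> = RingBuffer::new(8);
    /// for i in 0..5 {
    ///     rb.push(i);
    /// }
    /// let batch = rb.drain(64);
    /// assert_eq!(batch, vec![0, 1, 2, 3, 4]);
    /// assert!(rb.is_empty());
    /// ```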
    pub fn drain(&self, max_items: usize) -> Vec<T> {
        let mut items = Vec::with_capacity(max_items.min(self.capacity));

        for _ in 0..max_items {
            match self.pop() {
                Some(item) => items.push(item),
                None => break,
            }
        }

        items
    }

    /// Returns the current number of items in the buffer
    pub fn len(&self) -> usize {
        let write_pos = self.write_pos.load(Ordering::Acquire);
        let read_pos = self.read_pos.load(Ordering::Acquire);

        if write_pos >= read_pos {
            write_pos - read_pos
        } else {
            self.capacity - read_pos + write_pos
        }
    }

    /// Returns `true` if the buffer contains no items
    pub fn is_empty(&self) -> bool {
        self.write_pos.load(Ordering::Acquire) == self.read_pos.load(Ordering::Acquire)
    }

    /// Returns the total number of overflow events
    pub fn overflow_count(&self) -> usize {
        self.overflow_count.load(Ordering::Relaxed)
    }

    /// Resets the overflow counter to zero
    pub fn reset_overflow_count(&self) {
        self.overflow_count.store(0, Ordering::Relaxed);
    }
}

/// A thread-safe wrapper around `RingBuffer` for shared access
///
/// Cloning a `SharedRingBuffer` is cheap: all clones share the same underlying
/// buffer through an `Arc`.
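///
/// # Example
///
/// Clones can be moved to other threads while sharing one buffer:
///
/// ```rust
/// # use rusty_engine::monitoring::ring_buffer::SharedRingBuffer;
/// use std::thread;
///
/// let buffer: SharedRingBuffer<u64> = SharedRingBuffer::new(64);
/// let producer = buffer.clone();
///
/// let handle = thread::spawn(move || {
///     producer.push(42);
/// });
/// handle.join().unwrap();
/// assert_eq!(buffer.pop(), Some(42));
/// ```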
pub struct SharedRingBuffer<T: Clone> {
    inner: Arc<RingBuffer<T>>,
}

impl<T: Clone> SharedRingBuffer<T> {
    /// Creates a new shared ring buffer with the specified capacity
    ///
    /// This is the primary interface for creating ring buffers in the HFT system.
    /// The buffer is wrapped in an `Arc` for efficient sharing between threads.
    ///
    /// # Arguments
    ///
    /// * `capacity` - Buffer capacity, **must be a power of 2** (2, 4, 8, 16, 32, 64, 128, ...)
    ///
    /// # Returns
    ///
    /// A `SharedRingBuffer` with an effective capacity of `capacity - 1` items.
    ///
    /// # Panics
    ///
    /// Panics if capacity is not a power of 2 or is zero.
    ///
    /// # Example
    ///
    /// ```rust
    /// # use rusty_engine::monitoring::ring_buffer::SharedRingBuffer;
    /// let buffer: SharedRingBuffer<u64> = SharedRingBuffer::new(1024);  // For high-frequency data
    /// ```
    #[must_use]
    pub fn new(capacity: usize) -> Self {
        Self {
            inner: Arc::new(RingBuffer::new(capacity)),
        }
    }

    /// Push an item into the ring buffer
    ///
    /// # Arguments
    ///
    /// * `item` - The item to push
    ///
    /// # Returns
    ///
    /// `true` if successful, `false` if buffer was full
    pub fn push(&self, item: T) -> bool {
        self.inner.push(item)
    }

    /// Pop an item from the ring buffer
    ///
    /// # Returns
    ///
    /// `Some(item)` if available, `None` if buffer was empty
    pub fn pop(&self) -> Option<T> {
        self.inner.pop()
    }

    /// Drain up to `max_items` from the ring buffer
    ///
    /// More efficient than repeatedly calling `pop()` for batch operations.
    ///
    /// # Arguments
    ///
    /// * `max_items` - Maximum number of items to drain
    ///
    /// # Returns
    ///
    /// Vector of drained items (may be fewer than requested)
    pub fn drain(&self, max_items: usize) -> Vec<T> {
        self.inner.drain(max_items)
    }

    /// Returns the current number of items in the buffer
    pub fn len(&self) -> usize {
        self.inner.len()
    }

    /// Returns `true` if the buffer contains no items
    pub fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }

    /// Returns the number of times the ring buffer has overflowed
    ///
    /// An overflow occurs when `push()` is called on a full buffer, resulting in data loss.
    /// This counter is critical for monitoring buffer saturation and capacity planning.
    ///
    /// # Returns
    ///
    /// The total number of overflow events since buffer creation or last reset.
    ///
    /// # Example
    ///
    /// ```rust
    /// # use rusty_engine::monitoring::ring_buffer::SharedRingBuffer;
    /// let buffer: SharedRingBuffer<u64> = SharedRingBuffer::new(64);
    ///
    /// // Monitor overflow rate
    /// let initial = buffer.overflow_count();
    /// // ... time passes ...
    /// let overflows = buffer.overflow_count() - initial;
    /// if overflows > 0 {
    ///     eprintln!("Warning: {} overflows detected", overflows);
    /// }
    /// ```
    pub fn overflow_count(&self) -> usize {
        self.inner.overflow_count()
    }
}

impl<T: Clone> Clone for SharedRingBuffer<T> {
    fn clone(&self) -> Self {
        Self {
            inner: Arc::clone(&self.inner),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    #[test]
    fn test_ring_buffer_basic() {
        let rb = RingBuffer::new(4);

        // Test empty buffer
        assert!(rb.is_empty());
        assert_eq!(rb.len(), 0);
        assert!(rb.pop().is_none());

        // Test push and pop
        assert!(rb.push(1));
        assert!(rb.push(2));
        assert_eq!(rb.len(), 2);

        assert_eq!(rb.pop(), Some(1));
        assert_eq!(rb.pop(), Some(2));
        assert!(rb.pop().is_none());
    }

    #[test]
    fn test_ring_buffer_wrap_around() {
        let rb = RingBuffer::new(4);

        // CRITICAL TEST: Verify overflow counter starts at 0
        assert_eq!(rb.overflow_count(), 0, "Overflow counter should start at 0");

        // Fill buffer (capacity 4 can hold 3 items due to full/empty distinction)
        assert!(rb.push(1));
        assert!(rb.push(2));
        assert!(rb.push(3));

        // CRITICAL: No overflow should occur when buffer is exactly full but not overflowing
        assert_eq!(
            rb.overflow_count(),
            0,
            "Overflow counter should remain 0 when buffer is full but not overflowing"
        );

        // Pop some items to make space
        assert_eq!(rb.pop(), Some(1));
        assert_eq!(rb.pop(), Some(2));

        // CRITICAL: Verify overflow counter remains 0 after popping
        assert_eq!(
            rb.overflow_count(),
            0,
            "Overflow counter should remain 0 after popping items"
        );

        // Push more items (wrap around) - we now have 1 item (3) so we can push 2 more
        // This tests the wrap-around behavior where write_pos wraps back to 0
        assert!(rb.push(4));
        assert_eq!(
            rb.overflow_count(),
            0,
            "Overflow counter should remain 0 during wrap-around (push 4)"
        );

        assert!(rb.push(5));
        assert_eq!(
            rb.overflow_count(),
            0,
            "Overflow counter should remain 0 during wrap-around (push 5)"
        );

        // Pop one item to make space for one more
        assert_eq!(rb.pop(), Some(3));
        assert_eq!(
            rb.overflow_count(),
            0,
            "Overflow counter should remain 0 after popping during wrap-around"
        );

        // Now we can push one more item (another wrap-around operation)
        assert!(rb.push(6));
        assert_eq!(
            rb.overflow_count(),
            0,
            "Overflow counter should remain 0 during second wrap-around (push 6)"
        );

        // Verify remaining order and final overflow counter state
        assert_eq!(rb.pop(), Some(4));
        assert_eq!(rb.pop(), Some(5));
        assert_eq!(rb.pop(), Some(6));
        assert!(rb.pop().is_none());

        // FINAL CRITICAL VERIFICATION: Overflow counter should still be 0
        // This confirms that wrap-around operations are NOT overflow conditions
        assert_eq!(
            rb.overflow_count(),
            0,
            "Overflow counter should remain 0 throughout normal wrap-around operations"
        );
    }

    #[test]
    fn test_ring_buffer_overflow() {
        let rb = RingBuffer::new(4);

        // Fill buffer completely (capacity - 1 items)
        assert!(rb.push(1));
        assert!(rb.push(2));
        assert!(rb.push(3));

        // This should fail (buffer full)
        assert!(!rb.push(4));
        assert_eq!(rb.overflow_count(), 1);
    }

    #[test]
    fn test_ring_buffer_concurrent() {
        let rb = Arc::new(RingBuffer::new(1024));
        let rb_clone = Arc::clone(&rb);

        // Producer thread
        let producer = thread::spawn(move || {
            for i in 0..1000 {
                while !rb_clone.push(i) {
                    // Buffer full, yield
                    thread::yield_now();
                }
            }
        });

        // Consumer thread
        let consumer = thread::spawn(move || {
            let mut items = Vec::new();
            while items.len() < 1000 {
                if let Some(item) = rb.pop() {
                    items.push(item);
                } else {
                    thread::yield_now();
                }
            }
            items
        });

        producer.join().unwrap();
        let items = consumer.join().unwrap();

        // Verify we got all items in order
        assert_eq!(items.len(), 1000);
        for (i, &item) in items.iter().enumerate() {
            assert_eq!(item, i);
        }
    }

    #[test]
    fn test_shared_ring_buffer() {
        let srb = SharedRingBuffer::new(8);
        let srb_clone = srb.clone();

        assert!(srb.push(42));
        assert_eq!(srb_clone.pop(), Some(42));
    }

    #[test]
    #[should_panic(expected = "Capacity must be a power of 2")]
    fn test_non_power_of_two_capacity() {
        let _rb = RingBuffer::<i32>::new(10);
    }
}