rusty_feeder/common/
circular_buffer.rs

1use std::mem::MaybeUninit;
2use std::ops::{Index, IndexMut};
3use std::ptr;
4
5/// Cache-friendly fixed-size circular buffer optimized for HFT applications
6/// Uses a contiguous memory layout and avoids heap allocations
7#[repr(align(64))] // Cache-line alignment for optimal CPU cache behavior
8pub struct CircularBuffer<T, const N: usize> {
9    /// Storage for elements (fixed size)
10    buffer: [MaybeUninit<T>; N],
11
12    /// Current read position
13    read_pos: usize,
14
15    /// Current write position
16    write_pos: usize,
17
18    /// Current number of elements
19    size: usize,
20}
21
22impl<T, const N: usize> CircularBuffer<T, N> {
23    /// Create a new empty circular buffer with fixed capacity
24    #[must_use]
25    pub const fn new() -> Self {
26        // Safe way to initialize an array of MaybeUninit
27        let buffer = unsafe {
28            // MaybeUninit doesn't need initialization
29            MaybeUninit::<[MaybeUninit<T>; N]>::uninit().assume_init()
30        };
31
32        Self {
33            buffer,
34            read_pos: 0,
35            write_pos: 0,
36            size: 0,
37        }
38    }
39
40    /// Returns the capacity of the buffer
41    #[inline(always)]
42    pub const fn capacity(&self) -> usize {
43        N
44    }
45
46    /// Returns the current number of elements in the buffer
47    #[inline(always)]
48    pub const fn len(&self) -> usize {
49        self.size
50    }
51
52    /// Returns true if the buffer is empty
53    #[inline(always)]
54    pub const fn is_empty(&self) -> bool {
55        self.size == 0
56    }
57
58    /// Returns true if the buffer is full
59    #[inline(always)]
60    pub const fn is_full(&self) -> bool {
61        self.size == N
62    }
63
64    /// Add an element to the buffer
65    /// Returns true if successful, false if buffer is full
66    #[inline]
67    pub fn push(&mut self, item: T) -> bool {
68        if self.is_full() {
69            return false;
70        }
71
72        // Write to the current write position
73        unsafe {
74            ptr::write(self.buffer[self.write_pos].as_mut_ptr(), item);
75        }
76
77        // Update write position and size
78        self.write_pos = (self.write_pos + 1) % N;
79        self.size += 1;
80
81        true
82    }
83
84    /// Remove and return the oldest element from the buffer
85    /// Returns None if the buffer is empty
86    #[inline]
87    pub const fn pop(&mut self) -> Option<T> {
88        if self.is_empty() {
89            return None;
90        }
91
92        // Read from the current read position
93        let item = unsafe { ptr::read(self.buffer[self.read_pos].as_ptr()) };
94
95        // Update read position and size
96        self.read_pos = (self.read_pos + 1) % N;
97        self.size -= 1;
98
99        Some(item)
100    }
101
102    /// Peek at the oldest element without removing it
103    /// Returns None if the buffer is empty
104    #[inline]
105    pub const fn peek(&self) -> Option<&T> {
106        if self.is_empty() {
107            return None;
108        }
109
110        // Return a reference to the element at read position
111        Some(unsafe { &*self.buffer[self.read_pos].as_ptr() })
112    }
113
114    /// Get a reference to an element at a specific index relative to the read position
115    /// Returns None if the index is out of bounds
116    #[inline]
117    pub const fn get(&self, index: usize) -> Option<&T> {
118        if index >= self.size {
119            return None;
120        }
121
122        let pos = (self.read_pos + index) % N;
123        Some(unsafe { &*self.buffer[pos].as_ptr() })
124    }
125
126    /// Get a mutable reference to an element at a specific index relative to the read position
127    /// Returns None if the index is out of bounds
128    #[inline]
129    pub fn get_mut(&mut self, index: usize) -> Option<&mut T> {
130        if index >= self.size {
131            return None;
132        }
133
134        let pos = (self.read_pos + index) % N;
135        Some(unsafe { &mut *self.buffer[pos].as_mut_ptr() })
136    }
137
138    /// Clear the buffer, dropping all elements
139    #[inline]
140    pub fn clear(&mut self) {
141        // Drop all elements
142        while self.pop().is_some() {}
143    }
144
145    /// Execute a function on each element in the buffer
146    #[inline]
147    pub fn for_each<F>(&self, mut f: F)
148    where
149        F: FnMut(&T),
150    {
151        let mut pos = self.read_pos;
152        for _ in 0..self.size {
153            unsafe {
154                f(&*self.buffer[pos].as_ptr());
155            }
156            pos = (pos + 1) % N;
157        }
158    }
159
160    /// Execute a function on each element in the buffer, allowing mutation
161    #[inline]
162    pub fn for_each_mut<F>(&mut self, mut f: F)
163    where
164        F: FnMut(&mut T),
165    {
166        let mut pos = self.read_pos;
167        for _ in 0..self.size {
168            unsafe {
169                f(&mut *self.buffer[pos].as_mut_ptr());
170            }
171            pos = (pos + 1) % N;
172        }
173    }
174
175    /// Convert the buffer to a Vec, consuming the buffer
176    pub fn to_vec(&self) -> Vec<T>
177    where
178        T: Clone,
179    {
180        let mut result = Vec::with_capacity(self.size);
181        self.for_each(|item| {
182            result.push(item.clone());
183        });
184        result
185    }
186
187    /// Overwrite the oldest element if the buffer is full
188    /// Always returns true (as it will always add the item)
189    #[inline]
190    pub fn push_overwrite(&mut self, item: T) -> bool {
191        if self.is_full() {
192            // Drop the oldest element
193            let _ = self.pop();
194        }
195
196        self.push(item)
197    }
198}
199
200impl<T, const N: usize> Index<usize> for CircularBuffer<T, N> {
201    type Output = T;
202
203    #[inline]
204    fn index(&self, index: usize) -> &Self::Output {
205        self.get(index).expect("Index out of bounds")
206    }
207}
208
209impl<T, const N: usize> IndexMut<usize> for CircularBuffer<T, N> {
210    #[inline]
211    fn index_mut(&mut self, index: usize) -> &mut Self::Output {
212        self.get_mut(index).expect("Index out of bounds")
213    }
214}
215
216impl<T, const N: usize> Drop for CircularBuffer<T, N> {
217    fn drop(&mut self) {
218        // Drop all elements
219        self.clear();
220    }
221}
222
223impl<T: Clone, const N: usize> Clone for CircularBuffer<T, N> {
224    fn clone(&self) -> Self {
225        let mut new_buffer = Self::new();
226
227        // Clone all elements into the new buffer
228        self.for_each(|item| {
229            let _ = new_buffer.push(item.clone());
230        });
231
232        new_buffer
233    }
234}
235
236// Only implement Default if T implements Default
237impl<T, const N: usize> Default for CircularBuffer<T, N> {
238    fn default() -> Self {
239        Self::new()
240    }
241}
242
243#[cfg(test)]
244mod tests {
245    use super::*;
246
247    #[test]
248    fn test_push_pop() {
249        let mut buffer = CircularBuffer::<i32, 5>::new();
250
251        // Push elements
252        assert!(buffer.push(1));
253        assert!(buffer.push(2));
254        assert!(buffer.push(3));
255
256        // Check size
257        assert_eq!(buffer.len(), 3);
258        assert!(!buffer.is_empty());
259        assert!(!buffer.is_full());
260
261        // Pop elements
262        assert_eq!(buffer.pop(), Some(1));
263        assert_eq!(buffer.pop(), Some(2));
264        assert_eq!(buffer.pop(), Some(3));
265
266        // Buffer should be empty now
267        assert_eq!(buffer.len(), 0);
268        assert!(buffer.is_empty());
269        assert!(!buffer.is_full());
270
271        // Popping from an empty buffer should return None
272        assert_eq!(buffer.pop(), None);
273    }
274
275    #[test]
276    fn test_full_buffer() {
277        let mut buffer = CircularBuffer::<i32, 3>::new();
278
279        // Fill the buffer
280        assert!(buffer.push(1));
281        assert!(buffer.push(2));
282        assert!(buffer.push(3));
283
284        // Buffer should be full
285        assert_eq!(buffer.len(), 3);
286        assert!(buffer.is_full());
287
288        // Pushing to a full buffer should fail
289        assert!(!buffer.push(4));
290
291        // Buffer contents should remain unchanged
292        assert_eq!(buffer.pop(), Some(1));
293        assert_eq!(buffer.pop(), Some(2));
294        assert_eq!(buffer.pop(), Some(3));
295    }
296
297    #[test]
298    fn test_circular_behavior() {
299        let mut buffer = CircularBuffer::<i32, 3>::new();
300
301        // Fill the buffer
302        assert!(buffer.push(1));
303        assert!(buffer.push(2));
304        assert!(buffer.push(3));
305
306        // Remove first element
307        assert_eq!(buffer.pop(), Some(1));
308
309        // Add a new element
310        assert!(buffer.push(4));
311
312        // Check the content
313        assert_eq!(buffer.pop(), Some(2));
314        assert_eq!(buffer.pop(), Some(3));
315        assert_eq!(buffer.pop(), Some(4));
316    }
317
318    #[test]
319    fn test_peek_and_get() {
320        let mut buffer = CircularBuffer::<i32, 5>::new();
321
322        // Push elements
323        buffer.push(10);
324        buffer.push(20);
325        buffer.push(30);
326
327        // Test peek
328        assert_eq!(buffer.peek(), Some(&10));
329
330        // Test get
331        assert_eq!(buffer.get(0), Some(&10));
332        assert_eq!(buffer.get(1), Some(&20));
333        assert_eq!(buffer.get(2), Some(&30));
334        assert_eq!(buffer.get(3), None); // Out of bounds
335
336        // Test get_mut
337        if let Some(value) = buffer.get_mut(1) {
338            *value = 25;
339        }
340
341        assert_eq!(buffer.get(1), Some(&25));
342    }
343
344    #[test]
345    fn test_index_operator() {
346        let mut buffer = CircularBuffer::<i32, 5>::new();
347
348        buffer.push(10);
349        buffer.push(20);
350        buffer.push(30);
351
352        // Test index operator
353        assert_eq!(buffer[0], 10);
354        assert_eq!(buffer[1], 20);
355        assert_eq!(buffer[2], 30);
356
357        // Test mutable index operator
358        buffer[1] = 25;
359        assert_eq!(buffer[1], 25);
360    }
361
362    #[test]
363    fn test_clear() {
364        let mut buffer = CircularBuffer::<i32, 5>::new();
365
366        buffer.push(10);
367        buffer.push(20);
368        buffer.push(30);
369
370        buffer.clear();
371
372        assert_eq!(buffer.len(), 0);
373        assert!(buffer.is_empty());
374        assert_eq!(buffer.pop(), None);
375    }
376
377    #[test]
378    fn test_for_each() {
379        let mut buffer = CircularBuffer::<i32, 5>::new();
380
381        buffer.push(1);
382        buffer.push(2);
383        buffer.push(3);
384
385        let mut sum = 0;
386        buffer.for_each(|&x| sum += x);
387
388        assert_eq!(sum, 6);
389    }
390
391    #[test]
392    fn test_for_each_mut() {
393        let mut buffer = CircularBuffer::<i32, 5>::new();
394
395        buffer.push(1);
396        buffer.push(2);
397        buffer.push(3);
398
399        buffer.for_each_mut(|x| *x *= 2);
400
401        assert_eq!(buffer[0], 2);
402        assert_eq!(buffer[1], 4);
403        assert_eq!(buffer[2], 6);
404    }
405
406    #[test]
407    fn test_to_vec() {
408        let mut buffer = CircularBuffer::<i32, 5>::new();
409
410        buffer.push(1);
411        buffer.push(2);
412        buffer.push(3);
413
414        let vec = buffer.to_vec();
415
416        assert_eq!(vec, vec![1, 2, 3]);
417    }
418
419    #[test]
420    fn test_push_overwrite() {
421        let mut buffer = CircularBuffer::<i32, 3>::new();
422
423        // Fill the buffer
424        buffer.push(1);
425        buffer.push(2);
426        buffer.push(3);
427
428        // Overwrite the oldest element
429        assert!(buffer.push_overwrite(4));
430
431        // Check the content (1 should be overwritten)
432        assert_eq!(buffer.pop(), Some(2));
433        assert_eq!(buffer.pop(), Some(3));
434        assert_eq!(buffer.pop(), Some(4));
435    }
436}