use crossbeam_utils::CachePadded;
use simd_aligned::VecSimd;
use std::ptr;
use std::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
use wide::f64x4;

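/// Tuning parameters for [`LockFreeBufferPool`]: how many buffers of each
/// kind to pre-allocate and how large each kind should be.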
#[derive(Debug, Clone)]
pub struct LockFreeBufferPoolConfig {
    pub buffer_count: usize,
    pub serialization_buffer_size: usize,
    pub compression_buffer_size: usize,
    pub simd_buffer_elements: usize,
}

impl Default for LockFreeBufferPoolConfig {
    fn default() -> Self {
        Self {
            buffer_count: 256,
            serialization_buffer_size: 128 * 1024,
            compression_buffer_size: 64 * 1024,
            simd_buffer_elements: 64,
        }
    }
}

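/// Intrusive node used by [`LockFreeStack`]; aligned to a cache line
/// (64 bytes) so adjacent nodes do not share a line.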
#[repr(align(64))]
struct BufferNode<T> {
    buffer: T,
    next: AtomicPtr<BufferNode<T>>,
}

impl<T> BufferNode<T> {
    fn new(buffer: T) -> Box<Self> {
        Box::new(Self {
            buffer,
            next: AtomicPtr::new(ptr::null_mut()),
        })
    }
}

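/// A Treiber-style lock-free stack of buffers, manipulated only through
/// compare-and-swap on the `head` pointer.
///
/// Note: nodes are freed as soon as they are popped and no hazard-pointer or
/// epoch scheme is used, the usual caveat for a bare Treiber stack (a racing
/// `pop` may briefly dereference a node another thread has just reclaimed).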
#[repr(align(64))]
struct LockFreeStack<T> {
    head: CachePadded<AtomicPtr<BufferNode<T>>>,
    _phantom: std::marker::PhantomData<T>,
}

impl<T> LockFreeStack<T> {
    const fn new() -> Self {
        Self {
            head: CachePadded::new(AtomicPtr::new(ptr::null_mut())),
            _phantom: std::marker::PhantomData,
        }
    }

    fn push(&self, buffer: T) {
        let new_node = Box::into_raw(BufferNode::new(buffer));

        loop {
            let current_head = self.head.load(Ordering::Acquire);
            unsafe {
                (*new_node).next.store(current_head, Ordering::Relaxed);
            }

            match self.head.compare_exchange_weak(
                current_head,
                new_node,
                Ordering::Release,
                Ordering::Relaxed,
            ) {
                Ok(_) => break,
                Err(_) => continue,
            }
        }
    }

    fn pop(&self) -> Option<T> {
        loop {
            let current_head = self.head.load(Ordering::Acquire);
            if current_head.is_null() {
                return None;
            }

            let next = unsafe { (*current_head).next.load(Ordering::Acquire) };

            match self.head.compare_exchange_weak(
                current_head,
                next,
                Ordering::Release,
                Ordering::Relaxed,
            ) {
                Ok(_) => {
                    // The CAS succeeded, so this thread owns the node:
                    // reclaim the box and move the buffer out.
                    let buffer = unsafe {
                        let node = Box::from_raw(current_head);
                        node.buffer
                    };
                    return Some(buffer);
                }
                Err(_) => continue,
            }
        }
    }
}

impl<T> Drop for LockFreeStack<T> {
    fn drop(&mut self) {
        while self.pop().is_some() {}
    }
}

unsafe impl<T: Send> Send for LockFreeStack<T> {}
unsafe impl<T: Send> Sync for LockFreeStack<T> {}

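/// Lock-free pool of reusable serialization, compression, and SIMD buffers.
///
/// All buffers are pre-allocated in `new` and recycled through internal
/// Treiber stacks, so `get_*`/`return_*` never take a lock; hit/miss
/// statistics are kept in cache-padded atomic counters. A minimal usage
/// sketch (marked `ignore` because the import path depends on where this
/// module lives in the crate):
///
/// ```ignore
/// let pool = LockFreeBufferPool::new(LockFreeBufferPoolConfig::default());
/// let mut guard = pool.get_serialization_buffer_guard();
/// guard.as_mut().extend_from_slice(b"payload");
/// // The buffer is returned to the pool when `guard` is dropped.
/// ```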
pub struct LockFreeBufferPool {
    serialization_buffers: LockFreeStack<Vec<u8>>,
    compression_buffers: LockFreeStack<Vec<u8>>,
    simd_buffers: LockFreeStack<VecSimd<f64x4>>,

    allocations: CachePadded<AtomicUsize>,
    deallocations: CachePadded<AtomicUsize>,
    pool_hits: CachePadded<AtomicUsize>,
    pool_misses: CachePadded<AtomicUsize>,

    config: LockFreeBufferPoolConfig,
}

impl LockFreeBufferPool {
    pub fn new(config: LockFreeBufferPoolConfig) -> Self {
        let pool = Self {
            serialization_buffers: LockFreeStack::new(),
            compression_buffers: LockFreeStack::new(),
            simd_buffers: LockFreeStack::new(),
            allocations: CachePadded::new(AtomicUsize::new(0)),
            deallocations: CachePadded::new(AtomicUsize::new(0)),
            pool_hits: CachePadded::new(AtomicUsize::new(0)),
            pool_misses: CachePadded::new(AtomicUsize::new(0)),
            config,
        };

        pool.preallocate_buffers();
        pool
    }

    fn preallocate_buffers(&self) {
        for _ in 0..self.config.buffer_count {
            let buffer = Vec::with_capacity(self.config.serialization_buffer_size);
            self.serialization_buffers.push(buffer);
        }

        for _ in 0..self.config.buffer_count {
            let buffer = Vec::with_capacity(self.config.compression_buffer_size);
            self.compression_buffers.push(buffer);
        }

        for _ in 0..self.config.buffer_count {
            let buffer = VecSimd::<f64x4>::with(0.0, self.config.simd_buffer_elements);
            self.simd_buffers.push(buffer);
        }
    }

    #[inline(always)]
    pub fn get_serialization_buffer(&self) -> Vec<u8> {
        if let Some(mut buffer) = self.serialization_buffers.pop() {
            buffer.clear();
            self.pool_hits.fetch_add(1, Ordering::Relaxed);
            buffer
        } else {
            self.pool_misses.fetch_add(1, Ordering::Relaxed);
            self.allocations.fetch_add(1, Ordering::Relaxed);
            Vec::with_capacity(self.config.serialization_buffer_size)
        }
    }

    #[inline(always)]
    pub fn return_serialization_buffer(&self, buffer: Vec<u8>) {
        // Only recycle buffers whose capacity stays near the configured size;
        // anything else is simply dropped (and not counted as a deallocation).
        if buffer.capacity() >= self.config.serialization_buffer_size / 2
            && buffer.capacity() <= self.config.serialization_buffer_size * 2
        {
            self.serialization_buffers.push(buffer);
            self.deallocations.fetch_add(1, Ordering::Relaxed);
        }
    }

    #[inline(always)]
    pub fn get_compression_buffer(&self) -> Vec<u8> {
        if let Some(mut buffer) = self.compression_buffers.pop() {
            buffer.clear();
            self.pool_hits.fetch_add(1, Ordering::Relaxed);
            buffer
        } else {
            self.pool_misses.fetch_add(1, Ordering::Relaxed);
            self.allocations.fetch_add(1, Ordering::Relaxed);
            Vec::with_capacity(self.config.compression_buffer_size)
        }
    }

    #[inline(always)]
    pub fn return_compression_buffer(&self, buffer: Vec<u8>) {
        if buffer.capacity() >= self.config.compression_buffer_size / 2
            && buffer.capacity() <= self.config.compression_buffer_size * 2
        {
            self.compression_buffers.push(buffer);
            self.deallocations.fetch_add(1, Ordering::Relaxed);
        }
    }

    #[inline(always)]
    pub fn get_simd_buffer(&self) -> VecSimd<f64x4> {
        if let Some(mut buffer) = self.simd_buffers.pop() {
            buffer.fill(f64x4::ZERO);
            self.pool_hits.fetch_add(1, Ordering::Relaxed);
            buffer
        } else {
            self.pool_misses.fetch_add(1, Ordering::Relaxed);
            self.allocations.fetch_add(1, Ordering::Relaxed);
            VecSimd::<f64x4>::with(0.0, self.config.simd_buffer_elements)
        }
    }

    #[inline(always)]
    pub fn return_simd_buffer(&self, buffer: VecSimd<f64x4>) {
        self.simd_buffers.push(buffer);
        self.deallocations.fetch_add(1, Ordering::Relaxed);
    }

    pub fn get_stats(&self) -> BufferPoolStats {
        let pool_hits = self.pool_hits.load(Ordering::Relaxed);
        let pool_misses = self.pool_misses.load(Ordering::Relaxed);
        let total = pool_hits + pool_misses;

        BufferPoolStats {
            allocations: self.allocations.load(Ordering::Relaxed),
            deallocations: self.deallocations.load(Ordering::Relaxed),
            pool_hits,
            pool_misses,
            hit_rate: if total > 0 {
                pool_hits as f64 / total as f64
            } else {
                0.0
            },
        }
    }

    pub fn reset_stats(&self) {
        self.allocations.store(0, Ordering::Relaxed);
        self.deallocations.store(0, Ordering::Relaxed);
        self.pool_hits.store(0, Ordering::Relaxed);
        self.pool_misses.store(0, Ordering::Relaxed);
    }

    pub fn get_available_counts(&self) -> BufferCounts {
        // The stacks do not track their length, so availability is estimated
        // from the allocation/deallocation counters.
        let allocations = self.allocations.load(Ordering::Relaxed);
        let deallocations = self.deallocations.load(Ordering::Relaxed);
        let estimated_used = allocations.saturating_sub(deallocations);
        let estimated_available = self.config.buffer_count.saturating_sub(estimated_used);

        BufferCounts {
            serialization_available: estimated_available,
            compression_available: estimated_available,
            simd_available: estimated_available,
            total_capacity: self.config.buffer_count,
        }
    }
}

#[derive(Debug, Clone)]
pub struct BufferPoolStats {
    pub allocations: usize,
    pub deallocations: usize,
    pub pool_hits: usize,
    pub pool_misses: usize,
    pub hit_rate: f64,
}

#[derive(Debug, Clone)]
pub struct BufferCounts {
    pub serialization_available: usize,
    pub compression_available: usize,
    pub simd_available: usize,
    pub total_capacity: usize,
}

unsafe impl Send for LockFreeBufferPool {}
unsafe impl Sync for LockFreeBufferPool {}

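/// RAII wrapper around a pooled buffer: the buffer is handed back to its pool
/// via `return_fn` when the guard is dropped, or earlier through
/// [`BufferGuard::return_early`].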
pub struct BufferGuard<'a, T> {
    buffer: Option<T>,
    pool: &'a LockFreeBufferPool,
    return_fn: fn(&LockFreeBufferPool, T),
}

impl<'a, T> BufferGuard<'a, T> {
    fn new(buffer: T, pool: &'a LockFreeBufferPool, return_fn: fn(&LockFreeBufferPool, T)) -> Self {
        Self {
            buffer: Some(buffer),
            pool,
            return_fn,
        }
    }

    pub fn data_mut(&mut self) -> &mut T {
        self.buffer.as_mut().unwrap()
    }

    pub fn data(&self) -> &T {
        self.buffer.as_ref().unwrap()
    }

    pub fn return_early(mut self) {
        if let Some(buffer) = self.buffer.take() {
            (self.return_fn)(self.pool, buffer);
        }
    }
}

impl<'a, T> Drop for BufferGuard<'a, T> {
    fn drop(&mut self) {
        if let Some(buffer) = self.buffer.take() {
            (self.return_fn)(self.pool, buffer);
        }
    }
}

impl<'a, T> AsRef<T> for BufferGuard<'a, T> {
    fn as_ref(&self) -> &T {
        self.buffer.as_ref().unwrap()
    }
}

impl<'a, T> AsMut<T> for BufferGuard<'a, T> {
    fn as_mut(&mut self) -> &mut T {
        self.buffer.as_mut().unwrap()
    }
}

impl LockFreeBufferPool {
    pub fn get_serialization_buffer_guard(&self) -> BufferGuard<'_, Vec<u8>> {
        let buffer = self.get_serialization_buffer();
        BufferGuard::new(buffer, self, |pool, buf| {
            pool.return_serialization_buffer(buf)
        })
    }

    pub fn get_compression_buffer_guard(&self) -> BufferGuard<'_, Vec<u8>> {
        let buffer = self.get_compression_buffer();
        BufferGuard::new(buffer, self, |pool, buf| {
            pool.return_compression_buffer(buf)
        })
    }

    pub fn get_simd_buffer_guard(&self) -> BufferGuard<'_, VecSimd<f64x4>> {
        let buffer = self.get_simd_buffer();
        BufferGuard::new(buffer, self, |pool, buf| pool.return_simd_buffer(buf))
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;
    use std::time::Duration;

    #[test]
    fn test_lockfree_buffer_pool_basic_operations() {
        let config = LockFreeBufferPoolConfig::default();
        let pool = LockFreeBufferPool::new(config);

        let buffer1 = pool.get_serialization_buffer();
        assert!(buffer1.capacity() > 0);
        pool.return_serialization_buffer(buffer1);

        let buffer2 = pool.get_compression_buffer();
        assert!(buffer2.capacity() > 0);
        pool.return_compression_buffer(buffer2);

        let buffer3 = pool.get_simd_buffer();
        assert!(!buffer3.is_empty());
        pool.return_simd_buffer(buffer3);

        let stats = pool.get_stats();
        assert_eq!(stats.pool_hits, 3);
        assert_eq!(stats.deallocations, 3);
    }

    #[test]
    fn test_lockfree_buffer_pool_guard() {
        let config = LockFreeBufferPoolConfig::default();
        let pool = LockFreeBufferPool::new(config);

        {
            let mut guard = pool.get_serialization_buffer_guard();
            let buffer = guard.as_mut();
            buffer.extend_from_slice(b"test data");
            assert_eq!(buffer.len(), 9);
        }
        // Dropping the guard returns the buffer to the pool.

        let stats = pool.get_stats();
        assert_eq!(stats.deallocations, 1);
    }

    #[test]
    fn test_lockfree_buffer_pool_concurrent_access() {
        let config = LockFreeBufferPoolConfig {
            buffer_count: 100,
            ..Default::default()
        };
        let pool = Arc::new(LockFreeBufferPool::new(config));
        let mut handles = Vec::new();

        for _ in 0..10 {
            let pool_clone = Arc::clone(&pool);
            let handle = thread::spawn(move || {
                for _ in 0..100 {
                    let buffer = pool_clone.get_serialization_buffer();
                    thread::sleep(Duration::from_micros(1));
                    pool_clone.return_serialization_buffer(buffer);

                    let simd_buffer = pool_clone.get_simd_buffer();
                    thread::sleep(Duration::from_micros(1));
                    pool_clone.return_simd_buffer(simd_buffer);
                }
            });
            handles.push(handle);
        }

        for handle in handles {
            handle.join().unwrap();
        }

        let stats = pool.get_stats();
        // 10 threads * 100 iterations * 2 returned buffers each.
        assert_eq!(stats.deallocations, 2000);
        assert!(stats.hit_rate > 0.8);
    }

    #[test]
    fn test_lockfree_buffer_pool_memory_bounds() {
        let config = LockFreeBufferPoolConfig {
            buffer_count: 10,
            serialization_buffer_size: 1024,
            ..Default::default()
        };
        let pool = LockFreeBufferPool::new(config);

        // Request more buffers than the pool pre-allocated.
        let mut buffers = Vec::new();
        for _ in 0..20 {
            buffers.push(pool.get_serialization_buffer());
        }

        for buffer in buffers {
            pool.return_serialization_buffer(buffer);
        }

        let stats = pool.get_stats();
        assert!(stats.pool_misses > 0);
        assert!(stats.hit_rate < 1.0);
    }

    #[test]
    fn test_lockfree_buffer_pool_stats() {
        let config = LockFreeBufferPoolConfig::default();
        let pool = LockFreeBufferPool::new(config);

        pool.reset_stats();

        let buffer = pool.get_serialization_buffer();
        pool.return_serialization_buffer(buffer);

        let stats = pool.get_stats();
        assert_eq!(stats.pool_hits, 1);
        assert_eq!(stats.deallocations, 1);
        assert_eq!(stats.hit_rate, 1.0);

        let counts = pool.get_available_counts();
        assert_eq!(counts.total_capacity, 256);
    }

    #[test]
    fn test_buffer_guard_early_return() {
        let config = LockFreeBufferPoolConfig::default();
        let pool = LockFreeBufferPool::new(config);

        pool.reset_stats();

        let guard = pool.get_serialization_buffer_guard();
        guard.return_early();

        let stats = pool.get_stats();
        assert_eq!(stats.deallocations, 1);
    }
}