use parking_lot::{Mutex, RwLock};
use quanta::Instant as QuantaInstant;
use simd_aligned::VecSimd;
use std::alloc::{Layout, alloc, dealloc};
use std::ptr::NonNull;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use wide::f64x4;
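
// This module provides three pool flavors that share one RAII pattern:
// `ZerocopyMemoryPool` hands out raw (optionally 32-byte-aligned) byte
// buffers, `SimdMemoryPool` hands out `VecSimd<f64x4>` vectors, and
// `TypedMemoryPool<T>` hands out boxed `T: Default` values. Every handle
// returns its buffer to the owning pool when dropped.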

/// Configuration shared by the buffer pools in this module.
#[derive(Debug, Clone)]
pub struct ZerocopyPoolConfig {
    /// Number of buffers to allocate up front.
    pub initial_buffer_count: usize,
    /// Hard cap on the number of buffers the pool may grow to.
    pub max_buffer_count: usize,
    /// Size of each buffer in bytes.
    pub buffer_size: usize,
    /// Align buffers to 32 bytes for SIMD loads and stores.
    pub simd_aligned: bool,
    /// Track allocation statistics (adds some locking overhead).
    pub enable_statistics: bool,
}

impl Default for ZerocopyPoolConfig {
    fn default() -> Self {
        Self {
            initial_buffer_count: 64,
            max_buffer_count: 512,
            buffer_size: 64 * 1024,
            simd_aligned: true,
            enable_statistics: true,
        }
    }
}

/// Point-in-time statistics snapshot for a pool.
#[derive(Debug, Clone, Default)]
pub struct PoolStatistics {
    /// Total `allocate_*` calls observed.
    pub total_allocations: u64,
    /// Total buffers returned to the pool.
    pub total_deallocations: u64,
    /// Buffers currently handed out.
    pub current_allocated: u64,
    /// High-water mark of concurrently allocated buffers.
    pub peak_allocated: u64,
    /// `total_allocations * buffer_size`, in bytes.
    pub total_bytes_allocated: u64,
    /// Allocations served from an already-available buffer.
    pub pool_hits: u64,
    /// Allocations that required growing the pool.
    pub pool_misses: u64,
    /// Number of times the pool grew beyond its initial size.
    pub grow_operations: u64,
    /// Exponential moving average of buffer hold time, in nanoseconds.
    pub avg_allocation_time_nanos: u64,
}

/// RAII handle to a pooled byte buffer; returns the buffer on `Drop`.
pub struct BufferHandle {
    ptr: NonNull<u8>,
    size: usize,
    pool: Arc<ZerocopyMemoryPool>,
    buffer_id: usize,
    allocation_time: QuantaInstant,
}

impl BufferHandle {
    /// Returns the buffer as a mutable byte slice.
    pub const fn as_slice_mut(&mut self) -> &mut [u8] {
        // SAFETY: `ptr` points to a live allocation of exactly `size`
        // bytes, and the exclusive borrow on `self` prevents aliasing.
        unsafe { std::slice::from_raw_parts_mut(self.ptr.as_ptr(), self.size) }
    }

    /// Returns the buffer as a shared byte slice.
    pub const fn as_slice(&self) -> &[u8] {
        // SAFETY: `ptr` points to a live allocation of exactly `size` bytes.
        unsafe { std::slice::from_raw_parts(self.ptr.as_ptr(), self.size) }
    }

    /// Raw const pointer to the start of the buffer.
    pub const fn as_ptr(&self) -> *const u8 {
        self.ptr.as_ptr()
    }

    /// Raw mutable pointer to the start of the buffer.
    pub const fn as_mut_ptr(&mut self) -> *mut u8 {
        self.ptr.as_ptr()
    }

    /// Buffer size in bytes.
    pub const fn size(&self) -> usize {
        self.size
    }

    /// Zero-fills the buffer.
    pub const fn clear(&mut self) {
        // SAFETY: `ptr` is valid for writes of `size` bytes.
        unsafe {
            std::ptr::write_bytes(self.ptr.as_ptr(), 0, self.size);
        }
    }
}

impl Drop for BufferHandle {
    fn drop(&mut self) {
        // Report how long the buffer was held, then hand it back.
        let elapsed_nanos = self.allocation_time.elapsed().as_nanos() as u64;
        self.pool.return_buffer(self.buffer_id, elapsed_nanos);
    }
}

// SAFETY: a `BufferHandle` has exclusive ownership of its buffer (its id is
// popped from the free list), so moving or sharing it across threads is sound.
unsafe impl Send for BufferHandle {}
unsafe impl Sync for BufferHandle {}

/// RAII handle to a pooled SIMD vector; returns a cleared buffer on `Drop`.
pub struct SimdBufferHandle {
    simd_data: VecSimd<f64x4>,
    pool: Arc<SimdMemoryPool>,
    buffer_id: usize,
    allocation_time: QuantaInstant,
}

impl SimdBufferHandle {
    /// Mutable access to the underlying SIMD vector.
    pub const fn as_simd_mut(&mut self) -> &mut VecSimd<f64x4> {
        &mut self.simd_data
    }

    /// Shared access to the underlying SIMD vector.
    pub const fn as_simd(&self) -> &VecSimd<f64x4> {
        &self.simd_data
    }

    /// The buffer as a flat `&mut [f64]` view.
    pub fn as_flat_slice_mut(&mut self) -> &mut [f64] {
        self.simd_data.flat_mut()
    }

    /// The buffer as a flat `&[f64]` view.
    pub fn as_flat_slice(&self) -> &[f64] {
        self.simd_data.flat()
    }

    /// Zero-fills every SIMD lane.
    pub fn clear(&mut self) {
        for i in 0..self.simd_data.len() {
            self.simd_data[i] = f64x4::splat(0.0);
        }
    }
}

impl Drop for SimdBufferHandle {
    fn drop(&mut self) {
        let elapsed_nanos = self.allocation_time.elapsed().as_nanos() as u64;
        self.pool.return_simd_buffer(self.buffer_id, elapsed_nanos);
    }
}

/// RAII handle to a pooled boxed `T`; the slot is refilled with a fresh
/// default on `Drop`.
pub struct TypedBufferHandle<T: Default> {
    data: Box<T>,
    pool: Arc<TypedMemoryPool<T>>,
    buffer_id: usize,
    allocation_time: QuantaInstant,
}

impl<T: Default> TypedBufferHandle<T> {
    /// Mutable access to the pooled value.
    pub fn data_mut(&mut self) -> &mut T {
        &mut self.data
    }

    /// Shared access to the pooled value.
    pub fn data(&self) -> &T {
        &self.data
    }
}

impl<T: Default> AsRef<T> for TypedBufferHandle<T> {
    fn as_ref(&self) -> &T {
        &self.data
    }
}

impl<T: Default> AsMut<T> for TypedBufferHandle<T> {
    fn as_mut(&mut self) -> &mut T {
        &mut self.data
    }
}

impl<T: Default> Drop for TypedBufferHandle<T> {
    fn drop(&mut self) {
        let elapsed_nanos = self.allocation_time.elapsed().as_nanos() as u64;
        self.pool.return_typed_buffer(self.buffer_id, elapsed_nanos);
    }
}

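/// Pool of raw byte buffers reused across allocations to avoid per-call
/// heap traffic.
///
/// A minimal usage sketch (marked `ignore` since it is not compile-tested
/// against the enclosing crate path):
///
/// ```ignore
/// let pool = ZerocopyMemoryPool::new(ZerocopyPoolConfig::default());
/// let mut buf = pool.allocate_buffer().expect("pool exhausted");
/// buf.as_slice_mut()[0] = 1;
/// drop(buf); // the buffer returns to the pool here
/// ```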
pub struct ZerocopyMemoryPool {
    config: ZerocopyPoolConfig,
    /// Backing allocations, indexed by buffer id.
    buffers: RwLock<Vec<Option<NonNull<u8>>>>,
    /// Free list of buffer ids.
    available_buffers: Mutex<Vec<usize>>,
    statistics: RwLock<PoolStatistics>,
    total_allocations: AtomicU64,
    current_allocated: AtomicUsize,
    peak_allocated: AtomicUsize,
}

impl ZerocopyMemoryPool {
    /// Creates a pool and eagerly allocates `initial_buffer_count` buffers.
    pub fn new(config: ZerocopyPoolConfig) -> Arc<Self> {
        let pool = Arc::new(Self {
            config: config.clone(),
            buffers: RwLock::new(Vec::with_capacity(config.max_buffer_count)),
            available_buffers: Mutex::new(Vec::with_capacity(config.initial_buffer_count)),
            statistics: RwLock::new(PoolStatistics::default()),
            total_allocations: AtomicU64::new(0),
            current_allocated: AtomicUsize::new(0),
            peak_allocated: AtomicUsize::new(0),
        });

        pool.preallocate_buffers();
        pool
    }

    /// Layout shared by allocation, growth, and deallocation. The 32-byte
    /// alignment matches the width of an `f64x4` lane.
    fn buffer_layout(&self) -> Layout {
        if self.config.simd_aligned {
            Layout::from_size_align(self.config.buffer_size, 32).unwrap()
        } else {
            Layout::from_size_align(self.config.buffer_size, 8).unwrap()
        }
    }

    fn preallocate_buffers(&self) {
        let layout = self.buffer_layout();

        let mut buffers = self.buffers.write();
        let mut available = self.available_buffers.lock();

        for i in 0..self.config.initial_buffer_count {
            // SAFETY: `layout` has non-zero size; a null return means the
            // allocation failed, in which case we stop preallocating.
            unsafe {
                let ptr = alloc(layout);
                if ptr.is_null() {
                    break;
                }
                buffers.push(Some(NonNull::new_unchecked(ptr)));
                available.push(i);
            }
        }
    }

    /// Pops a free buffer, growing the pool on a miss. Returns `None` only
    /// when the pool is at `max_buffer_count` or the allocator fails.
    pub fn allocate_buffer(self: &Arc<Self>) -> Option<BufferHandle> {
        let start_time = QuantaInstant::now();
        self.total_allocations.fetch_add(1, Ordering::Relaxed);

        let buffer_id = {
            let mut available = self.available_buffers.lock();
            available.pop()
        };

        if let Some(id) = buffer_id {
            if self.config.enable_statistics {
                let mut stats = self.statistics.write();
                stats.pool_hits += 1;
            }

            let buffers = self.buffers.read();
            if let Some(Some(ptr)) = buffers.get(id) {
                let current = self.current_allocated.fetch_add(1, Ordering::Relaxed) + 1;
                // `fetch_max` keeps the high-water mark race-free.
                self.peak_allocated.fetch_max(current, Ordering::Relaxed);

                return Some(BufferHandle {
                    ptr: *ptr,
                    size: self.config.buffer_size,
                    pool: self.clone(),
                    buffer_id: id,
                    allocation_time: start_time,
                });
            }
        }

        if self.config.enable_statistics {
            let mut stats = self.statistics.write();
            stats.pool_misses += 1;
        }

        self.try_grow_pool().map(|id| {
            let buffers = self.buffers.read();
            let ptr = buffers[id].unwrap();

            let current = self.current_allocated.fetch_add(1, Ordering::Relaxed) + 1;
            self.peak_allocated.fetch_max(current, Ordering::Relaxed);

            BufferHandle {
                ptr,
                size: self.config.buffer_size,
                pool: self.clone(),
                buffer_id: id,
                allocation_time: start_time,
            }
        })
    }

    /// Allocates one more buffer if the pool is below `max_buffer_count`.
    fn try_grow_pool(&self) -> Option<usize> {
        let layout = self.buffer_layout();

        let mut buffers = self.buffers.write();
        if buffers.len() >= self.config.max_buffer_count {
            return None;
        }

        // SAFETY: `layout` has non-zero size; null means allocation failure.
        unsafe {
            let ptr = alloc(layout);
            if ptr.is_null() {
                return None;
            }
            let id = buffers.len();
            buffers.push(Some(NonNull::new_unchecked(ptr)));

            if self.config.enable_statistics {
                let mut stats = self.statistics.write();
                stats.grow_operations += 1;
            }

            Some(id)
        }
    }

    fn return_buffer(&self, buffer_id: usize, allocation_time_nanos: u64) {
        let mut available = self.available_buffers.lock();
        available.push(buffer_id);

        self.current_allocated.fetch_sub(1, Ordering::Relaxed);

        if self.config.enable_statistics {
            let mut stats = self.statistics.write();
            stats.total_deallocations += 1;
            // Exponential moving average (alpha = 0.1) of buffer hold time.
            stats.avg_allocation_time_nanos =
                (stats.avg_allocation_time_nanos * 9 + allocation_time_nanos) / 10;
        }
    }

    /// Snapshot of pool statistics, merging in the atomic counters.
    pub fn get_statistics(&self) -> PoolStatistics {
        if !self.config.enable_statistics {
            return PoolStatistics::default();
        }

        let mut stats = self.statistics.read().clone();
        stats.total_allocations = self.total_allocations.load(Ordering::Relaxed);
        stats.current_allocated = self.current_allocated.load(Ordering::Relaxed) as u64;
        stats.peak_allocated = self.peak_allocated.load(Ordering::Relaxed) as u64;
        stats.total_bytes_allocated = stats.total_allocations * self.config.buffer_size as u64;
        stats
    }

    /// Number of buffers currently free.
    pub fn available_count(&self) -> usize {
        self.available_buffers.lock().len()
    }

    /// Total number of buffers owned by the pool.
    pub fn buffer_count(&self) -> usize {
        self.buffers.read().len()
    }

    /// Size of each buffer in bytes.
    pub const fn buffer_size(&self) -> usize {
        self.config.buffer_size
    }

    /// Total bytes owned by the pool across all buffers.
    pub fn total_memory_bytes(&self) -> usize {
        self.buffer_count() * self.buffer_size()
    }
}

impl Drop for ZerocopyMemoryPool {
    fn drop(&mut self) {
        // Every `BufferHandle` holds an `Arc` to the pool, so no buffer can
        // still be in use when the pool itself drops.
        let layout = self.buffer_layout();

        let buffers = self.buffers.read();
        for ptr in buffers.iter().flatten() {
            // SAFETY: each pointer was allocated with this same `layout`.
            unsafe {
                dealloc(ptr.as_ptr(), layout);
            }
        }
    }
}

// SAFETY: the raw pointers are only dereferenced through handles that own
// their buffers exclusively; all shared state is behind locks or atomics.
unsafe impl Send for ZerocopyMemoryPool {}
unsafe impl Sync for ZerocopyMemoryPool {}

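/// Pool of pre-zeroed `VecSimd<f64x4>` buffers for SIMD-heavy numeric work.
///
/// A minimal usage sketch (marked `ignore`, illustrative only):
///
/// ```ignore
/// let pool = SimdMemoryPool::new(ZerocopyPoolConfig::default());
/// let mut buf = pool.allocate_simd_buffer().expect("pool exhausted");
/// buf.as_flat_slice_mut().fill(1.0);
/// drop(buf); // returned to the pool, zeroed for the next user
/// ```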
pub struct SimdMemoryPool {
    config: ZerocopyPoolConfig,
    /// `Some` while a buffer sits in the pool; `None` while checked out.
    simd_buffers: RwLock<Vec<Option<VecSimd<f64x4>>>>,
    available_buffers: Mutex<Vec<usize>>,
    statistics: RwLock<PoolStatistics>,
    total_allocations: AtomicU64,
    current_allocated: AtomicUsize,
    peak_allocated: AtomicUsize,
}

impl SimdMemoryPool {
    /// Creates a pool of zero-initialized SIMD buffers.
    pub fn new(config: ZerocopyPoolConfig) -> Arc<Self> {
        // Each f64x4 covers 4 f64s (32 bytes), so this converts the byte
        // budget into a SIMD element count.
        let simd_element_count = config.buffer_size / (4 * std::mem::size_of::<f64>());

        let pool = Arc::new(Self {
            config: config.clone(),
            simd_buffers: RwLock::new(Vec::with_capacity(config.max_buffer_count)),
            available_buffers: Mutex::new(Vec::with_capacity(config.initial_buffer_count)),
            statistics: RwLock::new(PoolStatistics::default()),
            total_allocations: AtomicU64::new(0),
            current_allocated: AtomicUsize::new(0),
            peak_allocated: AtomicUsize::new(0),
        });

        {
            let mut buffers = pool.simd_buffers.write();
            let mut available = pool.available_buffers.lock();

            for i in 0..config.initial_buffer_count {
                buffers.push(Some(VecSimd::<f64x4>::with(0.0, simd_element_count)));
                available.push(i);
            }
        }

        pool
    }

    /// Takes a zeroed SIMD buffer from the pool, if one is available.
    pub fn allocate_simd_buffer(self: &Arc<Self>) -> Option<SimdBufferHandle> {
        let start_time = QuantaInstant::now();
        self.total_allocations.fetch_add(1, Ordering::Relaxed);

        let buffer_id = {
            let mut available = self.available_buffers.lock();
            available.pop()
        };

        if let Some(id) = buffer_id {
            let mut buffers = self.simd_buffers.write();
            if let Some(simd_data) = buffers[id].take() {
                let current = self.current_allocated.fetch_add(1, Ordering::Relaxed) + 1;
                // `fetch_max` keeps the high-water mark race-free.
                self.peak_allocated.fetch_max(current, Ordering::Relaxed);

                return Some(SimdBufferHandle {
                    simd_data,
                    pool: self.clone(),
                    buffer_id: id,
                    allocation_time: start_time,
                });
            }
        }

        None
    }

    fn return_simd_buffer(&self, buffer_id: usize, allocation_time_nanos: u64) {
        // The handle's buffer is dropped by the caller, so refill the slot
        // with a freshly zeroed buffer. `VecSimd::with(0.0, ..)` already
        // zero-fills, so no extra clearing pass is needed.
        let simd_element_count = self.config.buffer_size / (4 * std::mem::size_of::<f64>());
        let cleared_buffer = VecSimd::<f64x4>::with(0.0, simd_element_count);

        {
            let mut buffers = self.simd_buffers.write();
            buffers[buffer_id] = Some(cleared_buffer);
        }

        {
            let mut available = self.available_buffers.lock();
            available.push(buffer_id);
        }

        self.current_allocated.fetch_sub(1, Ordering::Relaxed);

        if self.config.enable_statistics {
            let mut stats = self.statistics.write();
            stats.total_deallocations += 1;
            // Exponential moving average (alpha = 0.1) of buffer hold time.
            stats.avg_allocation_time_nanos =
                (stats.avg_allocation_time_nanos * 9 + allocation_time_nanos) / 10;
        }
    }

    /// Number of SIMD buffers currently free.
    pub fn available_simd_count(&self) -> usize {
        self.available_buffers.lock().len()
    }
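
    /// Statistics snapshot merging in the atomic counters. Added helper
    /// mirroring `ZerocopyMemoryPool::get_statistics` so callers need not
    /// reach into the private `statistics` field.
    pub fn get_statistics(&self) -> PoolStatistics {
        if !self.config.enable_statistics {
            return PoolStatistics::default();
        }

        let mut stats = self.statistics.read().clone();
        stats.total_allocations = self.total_allocations.load(Ordering::Relaxed);
        stats.current_allocated = self.current_allocated.load(Ordering::Relaxed) as u64;
        stats.peak_allocated = self.peak_allocated.load(Ordering::Relaxed) as u64;
        stats.total_bytes_allocated = stats.total_allocations * self.config.buffer_size as u64;
        stats
    }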
}

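/// Pool of boxed `T: Default` values, useful for types that are expensive to
/// construct on a hot path.
///
/// A minimal usage sketch (marked `ignore`, illustrative only; `Vec<f64>` is
/// an arbitrary payload type):
///
/// ```ignore
/// let pool = TypedMemoryPool::<Vec<f64>>::new(ZerocopyPoolConfig::default());
/// let mut handle = pool.allocate_typed().expect("pool exhausted");
/// handle.data_mut().push(42.0);
/// drop(handle); // the slot is refilled with a fresh default value
/// ```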
pub struct TypedMemoryPool<T> {
    config: ZerocopyPoolConfig,
    /// `Some` while a value sits in the pool; `None` while checked out.
    typed_buffers: RwLock<Vec<Option<Box<T>>>>,
    available_buffers: Mutex<Vec<usize>>,
    statistics: RwLock<PoolStatistics>,
    total_allocations: AtomicU64,
    current_allocated: AtomicUsize,
    peak_allocated: AtomicUsize,
}

impl<T: Default> TypedMemoryPool<T> {
    pub fn new(config: ZerocopyPoolConfig) -> Arc<Self> {
        let pool = Arc::new(Self {
            config: config.clone(),
            typed_buffers: RwLock::new(Vec::with_capacity(config.max_buffer_count)),
            available_buffers: Mutex::new(Vec::with_capacity(config.initial_buffer_count)),
            statistics: RwLock::new(PoolStatistics::default()),
            total_allocations: AtomicU64::new(0),
            current_allocated: AtomicUsize::new(0),
            peak_allocated: AtomicUsize::new(0),
        });

        {
            let mut buffers = pool.typed_buffers.write();
            let mut available = pool.available_buffers.lock();

            for i in 0..config.initial_buffer_count {
                buffers.push(Some(Box::new(T::default())));
                available.push(i);
            }
        }

        pool
    }

    /// Takes a pooled value, if one is available.
    pub fn allocate_typed(self: &Arc<Self>) -> Option<TypedBufferHandle<T>> {
        let start_time = QuantaInstant::now();
        self.total_allocations.fetch_add(1, Ordering::Relaxed);

        let buffer_id = {
            let mut available = self.available_buffers.lock();
            available.pop()
        };

        if let Some(id) = buffer_id {
            let mut buffers = self.typed_buffers.write();
            if let Some(data) = buffers[id].take() {
                let current = self.current_allocated.fetch_add(1, Ordering::Relaxed) + 1;
                // `fetch_max` keeps the high-water mark race-free.
                self.peak_allocated.fetch_max(current, Ordering::Relaxed);

                return Some(TypedBufferHandle {
                    data,
                    pool: self.clone(),
                    buffer_id: id,
                    allocation_time: start_time,
                });
            }
        }

        None
    }

    fn return_typed_buffer(&self, buffer_id: usize, allocation_time_nanos: u64) {
        // The returned box is dropped by the caller; its slot is refilled
        // with a fresh default so later allocations never see stale data.
        let default_data = Box::new(T::default());

        {
            let mut buffers = self.typed_buffers.write();
            buffers[buffer_id] = Some(default_data);
        }

        {
            let mut available = self.available_buffers.lock();
            available.push(buffer_id);
        }

        self.current_allocated.fetch_sub(1, Ordering::Relaxed);

        if self.config.enable_statistics {
            let mut stats = self.statistics.write();
            stats.total_deallocations += 1;
            // Exponential moving average (alpha = 0.1) of buffer hold time.
            stats.avg_allocation_time_nanos =
                (stats.avg_allocation_time_nanos * 9 + allocation_time_nanos) / 10;
        }
    }

    pub fn available_count(&self) -> usize {
        self.available_buffers.lock().len()
    }
}

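/// One manager owning a pool per size class, so callers can pick the
/// smallest pool that fits a request.
///
/// A minimal usage sketch (marked `ignore`, illustrative only):
///
/// ```ignore
/// let manager = GlobalPoolManager::new();
/// let buf = manager.small_buffer_pool().allocate_buffer().unwrap();
/// assert_eq!(buf.size(), 4 * 1024);
/// ```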
pub struct GlobalPoolManager {
    /// 64 KiB general-purpose buffers.
    general_pool: Arc<ZerocopyMemoryPool>,
    /// 32 KiB SIMD vector buffers.
    simd_pool: Arc<SimdMemoryPool>,
    /// 1 MiB buffers for bulk data.
    large_buffer_pool: Arc<ZerocopyMemoryPool>,
    /// 4 KiB buffers for small messages.
    small_buffer_pool: Arc<ZerocopyMemoryPool>,
}

impl GlobalPoolManager {
    /// Builds the four standard pools with fixed size classes.
    pub fn new() -> Self {
        let general_config = ZerocopyPoolConfig {
            initial_buffer_count: 128,
            max_buffer_count: 1024,
            buffer_size: 64 * 1024,
            simd_aligned: true,
            enable_statistics: true,
        };

        let simd_config = ZerocopyPoolConfig {
            initial_buffer_count: 64,
            max_buffer_count: 256,
            buffer_size: 32 * 1024,
            simd_aligned: true,
            enable_statistics: true,
        };

        let large_config = ZerocopyPoolConfig {
            initial_buffer_count: 16,
            max_buffer_count: 64,
            buffer_size: 1024 * 1024,
            simd_aligned: true,
            enable_statistics: true,
        };

        // Small messages gain nothing from SIMD alignment.
        let small_config = ZerocopyPoolConfig {
            initial_buffer_count: 256,
            max_buffer_count: 2048,
            buffer_size: 4 * 1024,
            simd_aligned: false,
            enable_statistics: true,
        };

        Self {
            general_pool: ZerocopyMemoryPool::new(general_config),
            simd_pool: SimdMemoryPool::new(simd_config),
            large_buffer_pool: ZerocopyMemoryPool::new(large_config),
            small_buffer_pool: ZerocopyMemoryPool::new(small_config),
        }
    }

    /// 64 KiB general-purpose pool.
    pub const fn general_pool(&self) -> &Arc<ZerocopyMemoryPool> {
        &self.general_pool
    }

    /// SIMD vector pool.
    pub const fn simd_pool(&self) -> &Arc<SimdMemoryPool> {
        &self.simd_pool
    }

    /// 1 MiB large-buffer pool.
    pub const fn large_buffer_pool(&self) -> &Arc<ZerocopyMemoryPool> {
        &self.large_buffer_pool
    }

    /// 4 KiB small-buffer pool.
    pub const fn small_buffer_pool(&self) -> &Arc<ZerocopyMemoryPool> {
        &self.small_buffer_pool
    }

    /// Gathers statistics from all four pools.
    pub fn get_comprehensive_statistics(&self) -> GlobalPoolStatistics {
        GlobalPoolStatistics {
            general_pool: self.general_pool.get_statistics(),
            simd_pool: self.simd_pool.get_statistics(),
            large_buffer_pool: self.large_buffer_pool.get_statistics(),
            small_buffer_pool: self.small_buffer_pool.get_statistics(),
        }
    }
}

impl Default for GlobalPoolManager {
    fn default() -> Self {
        Self::new()
    }
}

/// Per-pool statistics gathered by `GlobalPoolManager`.
#[derive(Debug, Clone)]
pub struct GlobalPoolStatistics {
    pub general_pool: PoolStatistics,
    pub simd_pool: PoolStatistics,
    pub large_buffer_pool: PoolStatistics,
    pub small_buffer_pool: PoolStatistics,
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_zerocopy_memory_pool_basic_operations() {
        let config = ZerocopyPoolConfig {
            initial_buffer_count: 4,
            max_buffer_count: 8,
            buffer_size: 1024,
            simd_aligned: false,
            enable_statistics: true,
        };

        let pool = ZerocopyMemoryPool::new(config);

        assert_eq!(pool.buffer_count(), 4);
        assert_eq!(pool.available_count(), 4);

        let mut buffer = pool.allocate_buffer().unwrap();
        assert_eq!(pool.available_count(), 3);

        let slice = buffer.as_slice_mut();
        slice[0] = 0xFF;
        slice[1023] = 0xAA;

        drop(buffer);
        assert_eq!(pool.available_count(), 4);
    }

    #[test]
    fn test_simd_memory_pool() {
        let config = ZerocopyPoolConfig {
            initial_buffer_count: 2,
            max_buffer_count: 4,
            buffer_size: 1024,
            simd_aligned: true,
            enable_statistics: true,
        };

        let pool = SimdMemoryPool::new(config);

        assert_eq!(pool.available_simd_count(), 2);

        let mut simd_buffer = pool.allocate_simd_buffer().unwrap();
        assert_eq!(pool.available_simd_count(), 1);

        let simd_data = simd_buffer.as_simd_mut();
        if !simd_data.is_empty() {
            simd_data[0] = f64x4::splat(42.0);
        }

        drop(simd_buffer);
        assert_eq!(pool.available_simd_count(), 2);
    }

    #[test]
    fn test_global_pool_manager() {
        let manager = GlobalPoolManager::new();

        let general_buffer = manager.general_pool().allocate_buffer().unwrap();
        assert_eq!(general_buffer.size(), 64 * 1024);

        let small_buffer = manager.small_buffer_pool().allocate_buffer().unwrap();
        assert_eq!(small_buffer.size(), 4 * 1024);

        let large_buffer = manager.large_buffer_pool().allocate_buffer().unwrap();
        assert_eq!(large_buffer.size(), 1024 * 1024);

        let simd_buffer = manager.simd_pool().allocate_simd_buffer().unwrap();
        assert!(!simd_buffer.as_simd().is_empty());

        drop(general_buffer);
        drop(small_buffer);
        drop(large_buffer);
        drop(simd_buffer);

        let stats = manager.get_comprehensive_statistics();
        assert!(stats.general_pool.total_allocations > 0);
        assert!(stats.small_buffer_pool.total_allocations > 0);
        assert!(stats.large_buffer_pool.total_allocations > 0);
    }
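
    // Added coverage (not in the original file): exercises `try_grow_pool`
    // by holding more handles than `initial_buffer_count`, then checks the
    // `max_buffer_count` cap.
    #[test]
    fn test_pool_growth_and_cap() {
        let config = ZerocopyPoolConfig {
            initial_buffer_count: 2,
            max_buffer_count: 4,
            buffer_size: 1024,
            simd_aligned: false,
            enable_statistics: true,
        };

        let pool = ZerocopyMemoryPool::new(config);
        assert_eq!(pool.buffer_count(), 2);

        // Holding all handles forces the pool to grow past its initial size.
        let handles: Vec<_> = (0..4).filter_map(|_| pool.allocate_buffer()).collect();
        assert_eq!(handles.len(), 4);
        assert_eq!(pool.buffer_count(), 4);

        // A fifth allocation fails because the pool is at its cap.
        assert!(pool.allocate_buffer().is_none());

        drop(handles);
        assert_eq!(pool.available_count(), 4);
    }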
}