rusty_feeder/limit/
optimized_rate_limiter.rs

1use std::{
2    num::NonZeroUsize,
3    sync::Arc,
4    time::{Duration, Instant},
5};
6
7use parking_lot::Mutex;
8use smallvec::SmallVec;
9use tokio::sync::Semaphore;
10
11use super::helpers::*;
12
13/// Performance-optimized rate limiter for high-frequency trading applications
14/// Uses a token bucket algorithm with precise timing and low-latency synchronization
15#[repr(align(64))] // Cache-line alignment for better CPU cache efficiency
16#[derive(Clone)]
17pub struct OptimizedRateLimiter {
18    /// Maximum number of concurrent requests
19    concurrency_limit: Arc<Semaphore>,
20
21    /// Rate limit state (protected by mutex)
22    rate_state: Arc<Mutex<RateLimitState>>,
23
24    /// Pre-configured rate limit window
25    window: Duration,
26
27    /// Maximum allowed requests per window
28    max_tokens: usize,
29}
30
/// Mutable bookkeeping shared by every clone of the limiter.
///
/// Only ever accessed while holding the limiter's mutex.
#[derive(Debug)]
struct RateLimitState {
    /// Tokens that can still be handed out; decremented on every grant and
    /// replenished by the refill helper.
    available_tokens: usize,

    /// Reference instant passed to (and updated from) the refill helper.
    last_refresh: Instant,

    /// Instants of recent grants, oldest first. Entries older than the window
    /// are pruned, and the oldest remaining entry determines how long a caller
    /// has to wait when no tokens are left. Up to 32 entries are stored inline.
    recent_timestamps: SmallVec<[Instant; 32]>,
}
43
44impl OptimizedRateLimiter {
45    /// Create a new optimized rate limiter
46    #[must_use]
47    pub fn new(max_concurrent: NonZeroUsize, window: Duration) -> Self {
48        let max_tokens = max_concurrent.get();
49
50        Self {
51            concurrency_limit: Arc::new(Semaphore::new(max_tokens)),
52            rate_state: Arc::new(Mutex::new(RateLimitState {
53                available_tokens: max_tokens,
54                last_refresh: Instant::now(),
55                recent_timestamps: SmallVec::new(),
56            })),
57            window,
58            max_tokens,
59        }
60    }
61
62    /// Wait for a token to become available (async)
63    pub async fn wait(&self) -> RateGuard {
64        // First, acquire a permit from the semaphore
65        let permit = self
66            .concurrency_limit
67            .clone()
68            .acquire_owned()
69            .await
70            .expect("Semaphore closed");
71
72        // Then check if we're rate limited
73        let rate_state = self.rate_state.clone();
74        let now = Instant::now();
75
76        // Clone the necessary values from self to move into the closure
77        let window = self.window;
78        let max_tokens = self.max_tokens;
79
80        let token_wait = tokio::task::spawn_blocking(move || {
81            let mut state = rate_state.lock();
82
83            // Check if we need to refresh tokens based on elapsed time
84            let (new_tokens, new_last_refresh) = refill_tokens_with_time_adjustment(
85                state.available_tokens,
86                max_tokens,
87                state.last_refresh,
88                now,
89                window,
90            );
91
92            // Update state with new values
93            state.available_tokens = new_tokens;
94            state.last_refresh = new_last_refresh;
95
96            // Clean up old timestamps if a full reset occurred
97            if should_perform_full_reset(now.duration_since(state.last_refresh), window) {
98                state.recent_timestamps.clear();
99            } else {
100                // Clean up old timestamps
101                let cutoff = now - window;
102                state.recent_timestamps.retain(|ts| *ts >= cutoff);
103            }
104
105            // Wait if no tokens are available
106            if state.available_tokens == 0 {
107                // Calculate time until next token becomes available
108                if let Some(&oldest) = state.recent_timestamps.first() {
109                    let wait_duration = window
110                        .checked_sub(now.duration_since(oldest))
111                        .unwrap_or(Duration::from_millis(0));
112
113                    // Return the wait duration
114                    Some(wait_duration)
115                } else {
116                    None // This shouldn't happen if we track timestamps correctly
117                }
118            } else {
119                // Consume a token
120                state.available_tokens -= 1;
121                state.recent_timestamps.push(now);
122
123                // Sort recent timestamps to ensure oldest first
124                state.recent_timestamps.sort_unstable();
125
126                // No wait needed
127                None
128            }
129        })
130        .await
131        .expect("Token wait task panicked");
132
133        // If token_wait is Some, we need to wait before consuming a token
134        if let Some(wait_duration) = token_wait {
135            tokio::time::sleep(wait_duration).await;
136
137            // After waiting, record that we've consumed a token
138            let mut state = self.rate_state.lock();
139            state.available_tokens = state.available_tokens.saturating_sub(1);
140            state.recent_timestamps.push(Instant::now());
141
142            // Sort recent timestamps to ensure oldest first
143            state.recent_timestamps.sort_unstable();
144        }
145
146        // Return guard that will release the semaphore permit when dropped
147        RateGuard { _permit: permit }
148    }
149
150    /// Try to acquire a token without waiting
151    pub fn try_acquire(&self) -> Option<RateGuard> {
152        // Try to acquire a permit from the semaphore
153        let permit = self.concurrency_limit.clone().try_acquire_owned().ok()?;
154
155        // Check if we're rate limited
156        let mut state = self.rate_state.lock();
157        let now = Instant::now();
158
159        // Check if we need to refresh tokens based on elapsed time
160        let (new_tokens, new_last_refresh) = refill_tokens_with_time_adjustment(
161            state.available_tokens,
162            self.max_tokens,
163            state.last_refresh,
164            now,
165            self.window,
166        );
167
168        // Update state with new values
169        state.available_tokens = new_tokens;
170        state.last_refresh = new_last_refresh;
171
172        // Clean up old timestamps if a full reset occurred
173        if should_perform_full_reset(now.duration_since(state.last_refresh), self.window) {
174            state.recent_timestamps.clear();
175        } else {
176            // Clean up old timestamps
177            let cutoff = now - self.window;
178            state.recent_timestamps.retain(|ts| *ts >= cutoff);
179        }
180
181        // Check if a token is available
182        if state.available_tokens > 0 {
183            // Consume a token
184            state.available_tokens -= 1;
185            state.recent_timestamps.push(now);
186
187            // Sort recent timestamps to ensure oldest first
188            state.recent_timestamps.sort_unstable();
189
190            // Return the guard
191            Some(RateGuard { _permit: permit })
192        } else {
193            // No token available, release the permit
194            drop(permit);
195            None
196        }
197    }
198}
199
200/// RAII guard that releases a rate limit token when dropped
201pub struct RateGuard {
202    _permit: tokio::sync::OwnedSemaphorePermit,
203}