rusty_feeder/limit/optimized_rate_limiter.rs

use std::{
    num::NonZeroUsize,
    sync::Arc,
    time::{Duration, Instant},
};

use parking_lot::Mutex;
use smallvec::SmallVec;
use tokio::sync::Semaphore;

use super::helpers::*;
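// NOTE: the bodies of `refill_tokens_with_time_adjustment` and
// `should_perform_full_reset` live in `super::helpers` and are not shown in
// this file. From the call sites below, their signatures are assumed to be
// roughly:
//
//   fn refill_tokens_with_time_adjustment(
//       available: usize,
//       max_tokens: usize,
//       last_refresh: Instant,
//       now: Instant,
//       window: Duration,
//   ) -> (usize, Instant);
//
//   fn should_perform_full_reset(idle: Duration, window: Duration) -> bool;
//
// These signatures are inferred from usage, not taken from the helpers module.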
/// Combined concurrency and rate limiter. The struct is 64-byte aligned
/// (one cache line on most targets) and cheap to clone: all shared state
/// lives behind `Arc`s.
#[repr(align(64))]
#[derive(Clone)]
pub struct OptimizedRateLimiter {
    /// Bounds the number of guards that may be held at once.
    concurrency_limit: Arc<Semaphore>,

    /// Token-bucket bookkeeping shared by all clones of the limiter.
    rate_state: Arc<Mutex<RateLimitState>>,

    /// Length of the rate-limiting window.
    window: Duration,

    /// Maximum number of tokens available per window.
    max_tokens: usize,
}

/// Mutable state behind the `parking_lot::Mutex`.
#[derive(Debug)]
struct RateLimitState {
    /// Tokens still available in the current window.
    available_tokens: usize,

    /// When tokens were last refilled.
    last_refresh: Instant,

    /// Timestamps of recent acquisitions; stored inline for up to 32 entries.
    recent_timestamps: SmallVec<[Instant; 32]>,
}

impl OptimizedRateLimiter {
    /// Creates a limiter allowing at most `max_concurrent` guards at a time
    /// and `max_concurrent` tokens per `window`.
    #[must_use]
    pub fn new(max_concurrent: NonZeroUsize, window: Duration) -> Self {
        let max_tokens = max_concurrent.get();

        Self {
            concurrency_limit: Arc::new(Semaphore::new(max_tokens)),
            rate_state: Arc::new(Mutex::new(RateLimitState {
                available_tokens: max_tokens,
                last_refresh: Instant::now(),
                recent_timestamps: SmallVec::new(),
            })),
            window,
            max_tokens,
        }
    }

    /// Waits until both a concurrency permit and a rate-limit token are
    /// available, then returns a guard that releases the permit on drop.
    pub async fn wait(&self) -> RateGuard {
        // Acquire the concurrency permit first; this bounds in-flight work.
        let permit = self
            .concurrency_limit
            .clone()
            .acquire_owned()
            .await
            .expect("Semaphore closed");

        let rate_state = self.rate_state.clone();
        let now = Instant::now();

        let window = self.window;
        let max_tokens = self.max_tokens;

        // Do the bucket bookkeeping on a blocking thread so a contended lock
        // never stalls the async runtime. The closure returns how long the
        // caller still has to sleep, if anything.
        let token_wait = tokio::task::spawn_blocking(move || {
            let mut state = rate_state.lock();

            // Refill tokens based on the time elapsed since the last refresh.
            let (new_tokens, new_last_refresh) = refill_tokens_with_time_adjustment(
                state.available_tokens,
                max_tokens,
                state.last_refresh,
                now,
                window,
            );

            state.available_tokens = new_tokens;
            state.last_refresh = new_last_refresh;

            // Prune timestamps that have left the window, or clear them all
            // when a full reset is warranted.
            if should_perform_full_reset(now.duration_since(state.last_refresh), window) {
                state.recent_timestamps.clear();
            } else {
                let cutoff = now - window;
                state.recent_timestamps.retain(|ts| *ts >= cutoff);
            }

            if state.available_tokens == 0 {
                // No tokens left: report how long until the oldest recorded
                // acquisition falls out of the window.
                if let Some(&oldest) = state.recent_timestamps.first() {
                    let wait_duration = window
                        .checked_sub(now.duration_since(oldest))
                        .unwrap_or(Duration::ZERO);

                    Some(wait_duration)
                } else {
                    None
                }
            } else {
                // A token is available: consume it and record the timestamp.
                state.available_tokens -= 1;
                state.recent_timestamps.push(now);

                state.recent_timestamps.sort_unstable();

                None
            }
        })
        .await
        .expect("Token wait task panicked");

        // If the bucket was empty, sleep out the remainder of the window and
        // then record the delayed acquisition.
        if let Some(wait_duration) = token_wait {
            tokio::time::sleep(wait_duration).await;

            let mut state = self.rate_state.lock();
            state.available_tokens = state.available_tokens.saturating_sub(1);
            state.recent_timestamps.push(Instant::now());

            state.recent_timestamps.sort_unstable();
        }

        RateGuard { _permit: permit }
    }
    /// Non-blocking variant: returns a guard only if both a concurrency
    /// permit and a rate-limit token are available right now.
    pub fn try_acquire(&self) -> Option<RateGuard> {
        let permit = self.concurrency_limit.clone().try_acquire_owned().ok()?;

        let mut state = self.rate_state.lock();
        let now = Instant::now();

        // Same refill and window-pruning steps as `wait`, but without sleeping.
        let (new_tokens, new_last_refresh) = refill_tokens_with_time_adjustment(
            state.available_tokens,
            self.max_tokens,
            state.last_refresh,
            now,
            self.window,
        );

        state.available_tokens = new_tokens;
        state.last_refresh = new_last_refresh;

        if should_perform_full_reset(now.duration_since(state.last_refresh), self.window) {
            state.recent_timestamps.clear();
        } else {
            let cutoff = now - self.window;
            state.recent_timestamps.retain(|ts| *ts >= cutoff);
        }

        if state.available_tokens > 0 {
            state.available_tokens -= 1;
            state.recent_timestamps.push(now);

            state.recent_timestamps.sort_unstable();

            Some(RateGuard { _permit: permit })
        } else {
            // Out of tokens: give the concurrency permit back immediately.
            drop(permit);
            None
        }
    }
}
/// Guard returned by `wait` and `try_acquire`. Dropping it releases the
/// concurrency permit; rate-limit tokens replenish on their own schedule.
pub struct RateGuard {
    _permit: tokio::sync::OwnedSemaphorePermit,
}
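// Illustrative usage sketch -- this test module is not part of the original
// file. It assumes tokio's `macros` and `rt` features are enabled for tests,
// and it only asserts behaviour visible in the code above: the semaphore
// bounds the number of live guards, so `try_acquire` returns `None` once all
// permits are held.
#[cfg(test)]
mod usage_sketch {
    use super::OptimizedRateLimiter;
    use std::{num::NonZeroUsize, time::Duration};

    #[tokio::test]
    async fn wait_then_release() {
        let limiter = OptimizedRateLimiter::new(
            NonZeroUsize::new(2).expect("non-zero"),
            Duration::from_secs(1),
        );

        // Two acquisitions fit inside the concurrency budget.
        let first = limiter.wait().await;
        let second = limiter.wait().await;

        // With both permits held, an immediate third attempt must fail.
        assert!(limiter.try_acquire().is_none());

        // Dropping the guards releases their concurrency slots.
        drop(first);
        drop(second);
    }
}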