// rusty_ems/throttle.rs
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;

use anyhow::Result;
use crossbeam::queue::SegQueue;
use quanta::Clock;
use tokio::time;
9
10/// A rate limiter that enforces rate limits for exchange API calls
11#[derive(Debug)]
12pub struct RateLimiter {
13 /// The maximum number of requests allowed in the window
14 max_requests_per_window: usize,
15
16 /// The time window in milliseconds
17 window_milliseconds: u64,
18
19 /// Request timestamps for sliding window enforcement
20 /// Using a lock-free queue for better performance under contention
21 request_times: Arc<SegQueue<u64>>,
22
23 /// Current number of active requests in the window
24 /// This is an approximation and may be higher than actual due to expired requests
25 active_count: AtomicUsize,
26
27 /// High-precision clock
28 clock: Clock,
29}
30
31impl RateLimiter {
32 /// Creates a new rate limiter
33 ///
34 /// # Parameters
35 /// * `max_requests_per_window` - Maximum number of requests allowed per time window
36 /// * `window_milliseconds` - Time window in milliseconds
37 #[must_use]
38 pub fn new(max_requests_per_window: usize, window_milliseconds: u64) -> Self {
39 Self {
40 max_requests_per_window,
41 window_milliseconds,
42 request_times: Arc::new(SegQueue::new()),
43 active_count: AtomicUsize::new(0),
44 clock: Clock::new(),
45 }
46 }
47
48 /// Acquires permission to make a request, waiting if necessary
49 ///
50 /// # Returns
51 /// `Ok(())` when permission is granted, or an error if the wait was interrupted
52 pub async fn acquire(&self) -> Result<()> {
53 loop {
54 let now = self.clock.raw() / 1_000_000; // Convert ns to ms
55
56 // Count active requests and find the oldest timestamp
57 let mut active_count = 0;
58 let mut oldest_active_timestamp = now;
59 let mut timestamps = Vec::new();
60
61 // Drain the queue temporarily to scan all timestamps
62 while let Some(timestamp) = self.request_times.pop() {
63 timestamps.push(timestamp);
64
65 // Check if this timestamp is still active
66 if now - timestamp <= self.window_milliseconds {
67 active_count += 1;
68 if timestamp < oldest_active_timestamp {
69 oldest_active_timestamp = timestamp;
70 }
71 }
72 }
73
74 // Put all timestamps back in the queue
75 for timestamp in timestamps {
76 self.request_times.push(timestamp);
77 }
78
79 // Update the atomic counter with the accurate count
80 self.active_count.store(active_count, Ordering::SeqCst);
81
82 if active_count < self.max_requests_per_window {
83 // We have capacity, add a new timestamp
84 self.request_times.push(now);
85 self.active_count.fetch_add(1, Ordering::SeqCst);
86 return Ok(());
87 }
88
89 // We need to wait until the oldest active request expires
90 let elapsed = now - oldest_active_timestamp;
91 if elapsed >= self.window_milliseconds {
92 // The oldest request has already expired (this shouldn't happen with our counting logic)
93 self.request_times.push(now);
94 self.active_count.fetch_add(1, Ordering::SeqCst);
95 return Ok(());
96 }
97
98 // Wait until the oldest request expires
99 let wait_time_ms = self.window_milliseconds - elapsed + 1; // Add 1ms buffer
100
101 // Wait for the calculated time
102 time::sleep(Duration::from_millis(wait_time_ms)).await;
103 }
104 }
105
106 /// Tries to acquire permission to make a request without waiting
107 ///
108 /// # Returns
109 /// `true` if permission was granted, `false` if the rate limit would be exceeded
110 pub fn try_acquire(&self) -> bool {
111 let now = self.clock.raw() / 1_000_000; // Convert ns to ms
112
113 // First, try a fast path using the atomic counter
114 // This is an approximation and may reject some requests unnecessarily
115 let current_count = self.active_count.load(Ordering::Relaxed);
116 if current_count < self.max_requests_per_window {
117 // Optimistically assume we have capacity and try to increment
118 if self
119 .active_count
120 .compare_exchange(
121 current_count,
122 current_count + 1,
123 Ordering::SeqCst,
124 Ordering::Relaxed,
125 )
126 .is_ok()
127 {
128 // We successfully claimed a slot
129 self.request_times.push(now);
130 return true;
131 }
132 }
133
134 // Slow path: count active requests accurately
135 let mut active_count = 0;
136 let mut timestamps = Vec::new();
137
138 // Drain the queue temporarily to scan all timestamps
139 while let Some(timestamp) = self.request_times.pop() {
140 timestamps.push(timestamp);
141
142 // Check if this timestamp is still active
143 if now - timestamp <= self.window_milliseconds {
144 active_count += 1;
145 }
146 }
147
148 // Put all timestamps back in the queue
149 for timestamp in timestamps {
150 self.request_times.push(timestamp);
151 }
152
153 // Update the atomic counter with the accurate count
154 self.active_count.store(active_count, Ordering::SeqCst);
155
156 if active_count < self.max_requests_per_window {
157 // We have capacity
158 self.request_times.push(now);
159 self.active_count.fetch_add(1, Ordering::SeqCst);
160 true
161 } else {
162 // Rate limit would be exceeded
163 false
164 }
165 }
166
167 /// Marks a request as completed, useful for tracking successful API calls
168 pub fn mark_request(&self) {
169 let now = self.clock.raw() / 1_000_000; // Convert ns to ms
170
171 // Add the timestamp to the queue
172 self.request_times.push(now);
173
174 // Increment the active count
175 self.active_count.fetch_add(1, Ordering::SeqCst);
176
177 // Note: We don't need to remove old timestamps here since they'll be
178 // filtered out when counting active requests in acquire/try_acquire
179 }
180
181 /// Returns the number of remaining requests that can be made immediately
182 pub fn remaining_requests(&self) -> usize {
183 // Fast path: use the atomic counter for an approximate count
184 let current_count = self.active_count.load(Ordering::Relaxed);
185
186 // If we're well below the limit, we can return immediately
187 if current_count < self.max_requests_per_window / 2 {
188 return self.max_requests_per_window - current_count;
189 }
190
191 // Slow path: count active requests accurately
192 let now = self.clock.raw() / 1_000_000; // Convert ns to ms
193 let mut active_count = 0;
194 let mut timestamps = Vec::new();
195
196 // Drain the queue temporarily to scan all timestamps
197 while let Some(timestamp) = self.request_times.pop() {
198 timestamps.push(timestamp);
199
200 // Check if this timestamp is still active
201 if now - timestamp <= self.window_milliseconds {
202 active_count += 1;
203 }
204 }
205
206 // Put all timestamps back in the queue
207 for timestamp in timestamps {
208 self.request_times.push(timestamp);
209 }
210
211 // Update the atomic counter with the accurate count
212 self.active_count.store(active_count, Ordering::SeqCst);
213
214 self.max_requests_per_window.saturating_sub(active_count)
215 }
216
217 /// Returns the time in milliseconds until the next slot becomes available
218 pub fn time_until_next_available_ms(&self) -> u64 {
219 let now = self.clock.raw() / 1_000_000; // Convert ns to ms
220
221 // Fast path: check if we're below capacity using the atomic counter
222 let current_count = self.active_count.load(Ordering::Relaxed);
223 if current_count < self.max_requests_per_window {
224 // We have capacity now
225 return 0;
226 }
227
228 // Slow path: scan all timestamps to find the oldest active one
229 let mut active_count = 0;
230 let mut oldest_active_timestamp = now;
231 let mut timestamps = Vec::new();
232
233 // Drain the queue temporarily to scan all timestamps
234 while let Some(timestamp) = self.request_times.pop() {
235 timestamps.push(timestamp);
236
237 // Check if this timestamp is still active
238 if now - timestamp <= self.window_milliseconds {
239 active_count += 1;
240 if timestamp < oldest_active_timestamp {
241 oldest_active_timestamp = timestamp;
242 }
243 }
244 }
245
246 // Put all timestamps back in the queue
247 for timestamp in timestamps {
248 self.request_times.push(timestamp);
249 }
250
251 // Update the atomic counter with the accurate count
252 self.active_count.store(active_count, Ordering::SeqCst);
253
254 if active_count < self.max_requests_per_window {
255 // We have capacity now
256 return 0;
257 }
258
259 // Calculate time until the oldest active request expires
260 let elapsed = now - oldest_active_timestamp;
261 if elapsed >= self.window_milliseconds {
262 // The oldest request has already expired
263 return 0;
264 }
265
266 // Return time until the oldest request expires
267 self.window_milliseconds - elapsed
268 }
269}
270
271/// A weight-based rate limiter for more complex exchange rate limiting rules
272#[derive(Debug)]
273pub struct WeightBasedRateLimiter {
274 /// The maximum weight allowed in the window
275 max_weight: usize,
276
277 /// The time window in milliseconds
278 window_milliseconds: u64,
279
280 /// Request weights and timestamps for sliding window enforcement
281 /// Using a lock-free queue for better performance under contention
282 requests: Arc<SegQueue<(u64, usize)>>, // (timestamp_ms, weight)
283
284 /// Current total weight of active requests in the window
285 /// This is an approximation and may be higher than actual due to expired requests
286 active_weight: AtomicUsize,
287
288 /// High-precision clock
289 clock: Clock,
290}
291
292impl WeightBasedRateLimiter {
293 /// Creates a new weight-based rate limiter
294 ///
295 /// # Parameters
296 /// * `max_weight` - Maximum weight allowed per time window
297 /// * `window_milliseconds` - Time window in milliseconds
298 #[must_use]
299 pub fn new(max_weight: usize, window_milliseconds: u64) -> Self {
300 Self {
301 max_weight,
302 window_milliseconds,
303 requests: Arc::new(SegQueue::new()),
304 active_weight: AtomicUsize::new(0),
305 clock: Clock::new(),
306 }
307 }
308
309 /// Acquires permission to make a request with a specific weight, waiting if necessary
310 ///
311 /// # Parameters
312 /// * `weight` - The weight of the request
313 ///
314 /// # Returns
315 /// `Ok(())` when permission is granted, or an error if the wait was interrupted
316 pub async fn acquire(&self, weight: usize) -> Result<()> {
317 loop {
318 let now = self.clock.raw() / 1_000_000; // Convert ns to ms
319
320 // Count active weight and find the oldest timestamp
321 let mut active_weight = 0;
322 let mut oldest_active_timestamp = now;
323 let mut entries = Vec::new();
324
325 // Drain the queue temporarily to scan all entries
326 while let Some(entry) = self.requests.pop() {
327 entries.push(entry);
328
329 // Check if this entry is still active
330 let (timestamp, entry_weight) = entry;
331 if now - timestamp <= self.window_milliseconds {
332 active_weight += entry_weight;
333 if timestamp < oldest_active_timestamp {
334 oldest_active_timestamp = timestamp;
335 }
336 }
337 }
338
339 // Put all entries back in the queue
340 for entry in entries {
341 self.requests.push(entry);
342 }
343
344 // Update the atomic counter with the accurate weight
345 self.active_weight.store(active_weight, Ordering::SeqCst);
346
347 if active_weight + weight <= self.max_weight {
348 // We have capacity, add a new entry
349 self.requests.push((now, weight));
350 self.active_weight.fetch_add(weight, Ordering::SeqCst);
351 return Ok(());
352 }
353
354 // We need to wait until the oldest active request expires
355 let elapsed = now - oldest_active_timestamp;
356 if elapsed >= self.window_milliseconds {
357 // The oldest request has already expired (this shouldn't happen with our counting logic)
358 self.requests.push((now, weight));
359 self.active_weight.fetch_add(weight, Ordering::SeqCst);
360 return Ok(());
361 }
362
363 // Wait until the oldest request expires
364 let wait_time_ms = self.window_milliseconds - elapsed + 1; // Add 1ms buffer
365
366 // Wait for the calculated time
367 time::sleep(Duration::from_millis(wait_time_ms)).await;
368 }
369 }
370
371 /// Tries to acquire permission to make a request without waiting
372 ///
373 /// # Parameters
374 /// * `weight` - The weight of the request
375 ///
376 /// # Returns
377 /// `true` if permission was granted, `false` if the rate limit would be exceeded
378 pub fn try_acquire(&self, weight: usize) -> bool {
379 let now = self.clock.raw() / 1_000_000; // Convert ns to ms
380
381 // First, try a fast path using the atomic counter
382 // This is an approximation and may reject some requests unnecessarily
383 let current_weight = self.active_weight.load(Ordering::Relaxed);
384 if current_weight + weight <= self.max_weight {
385 // Optimistically assume we have capacity
386 // We can't use compare_exchange here because we're adding a variable amount (weight)
387 // Instead, we'll add the weight and then verify we didn't exceed the limit
388 let new_weight = self.active_weight.fetch_add(weight, Ordering::SeqCst) + weight;
389 if new_weight <= self.max_weight {
390 // We successfully claimed capacity
391 self.requests.push((now, weight));
392 return true;
393 }
394 // We exceeded the limit, rollback the weight addition
395 self.active_weight.fetch_sub(weight, Ordering::SeqCst);
396 }
397
398 // Slow path: count active weight accurately
399 let mut active_weight = 0;
400 let mut entries = Vec::new();
401
402 // Drain the queue temporarily to scan all entries
403 while let Some(entry) = self.requests.pop() {
404 entries.push(entry);
405
406 // Check if this entry is still active
407 let (timestamp, entry_weight) = entry;
408 if now - timestamp <= self.window_milliseconds {
409 active_weight += entry_weight;
410 }
411 }
412
413 // Put all entries back in the queue
414 for entry in entries {
415 self.requests.push(entry);
416 }
417
418 // Update the atomic counter with the accurate weight
419 self.active_weight.store(active_weight, Ordering::SeqCst);
420
421 if active_weight + weight <= self.max_weight {
422 // We have capacity
423 self.requests.push((now, weight));
424 self.active_weight.fetch_add(weight, Ordering::SeqCst);
425 true
426 } else {
427 // Rate limit would be exceeded
428 false
429 }
430 }
431
432 /// Marks a request with a specific weight as completed
433 pub fn mark_request(&self, weight: usize) {
434 let now = self.clock.raw() / 1_000_000; // Convert ns to ms
435
436 // Add the entry to the queue
437 self.requests.push((now, weight));
438
439 // Increment the active weight
440 self.active_weight.fetch_add(weight, Ordering::SeqCst);
441
442 // Note: We don't need to remove old entries here since they'll be
443 // filtered out when counting active weight in acquire/try_acquire
444 }
445
446 /// Returns the remaining weight available for immediate use
447 pub fn remaining_weight(&self) -> usize {
448 // Fast path: use the atomic counter for an approximate count
449 let current_weight = self.active_weight.load(Ordering::Relaxed);
450
451 // If we're well below the limit, we can return immediately
452 if current_weight < self.max_weight / 2 {
453 return self.max_weight - current_weight;
454 }
455
456 // Slow path: count active weight accurately
457 let now = self.clock.raw() / 1_000_000; // Convert ns to ms
458 let mut active_weight = 0;
459 let mut entries = Vec::new();
460
461 // Drain the queue temporarily to scan all entries
462 while let Some(entry) = self.requests.pop() {
463 entries.push(entry);
464
465 // Check if this entry is still active
466 let (timestamp, entry_weight) = entry;
467 if now - timestamp <= self.window_milliseconds {
468 active_weight += entry_weight;
469 }
470 }
471
472 // Put all entries back in the queue
473 for entry in entries {
474 self.requests.push(entry);
475 }
476
477 // Update the atomic counter with the accurate weight
478 self.active_weight.store(active_weight, Ordering::SeqCst);
479
480 self.max_weight.saturating_sub(active_weight)
481 }
482}