rusty_ems/
throttle.rs

1use std::sync::Arc;
2use std::sync::atomic::{AtomicUsize, Ordering};
3use std::time::Duration;
4
5use anyhow::Result;
6use crossbeam::queue::SegQueue;
7use quanta::Clock;
8use tokio::time;
9
10/// A rate limiter that enforces rate limits for exchange API calls
11#[derive(Debug)]
12pub struct RateLimiter {
13    /// The maximum number of requests allowed in the window
14    max_requests_per_window: usize,
15
16    /// The time window in milliseconds
17    window_milliseconds: u64,
18
19    /// Request timestamps for sliding window enforcement
20    /// Using a lock-free queue for better performance under contention
21    request_times: Arc<SegQueue<u64>>,
22
23    /// Current number of active requests in the window
24    /// This is an approximation and may be higher than actual due to expired requests
25    active_count: AtomicUsize,
26
27    /// High-precision clock
28    clock: Clock,
29}
30
31impl RateLimiter {
32    /// Creates a new rate limiter
33    ///
34    /// # Parameters
35    /// * `max_requests_per_window` - Maximum number of requests allowed per time window
36    /// * `window_milliseconds` - Time window in milliseconds
37    #[must_use]
38    pub fn new(max_requests_per_window: usize, window_milliseconds: u64) -> Self {
39        Self {
40            max_requests_per_window,
41            window_milliseconds,
42            request_times: Arc::new(SegQueue::new()),
43            active_count: AtomicUsize::new(0),
44            clock: Clock::new(),
45        }
46    }
47
48    /// Acquires permission to make a request, waiting if necessary
49    ///
50    /// # Returns
51    /// `Ok(())` when permission is granted, or an error if the wait was interrupted
52    pub async fn acquire(&self) -> Result<()> {
53        loop {
54            let now = self.clock.raw() / 1_000_000; // Convert ns to ms
55
56            // Count active requests and find the oldest timestamp
57            let mut active_count = 0;
58            let mut oldest_active_timestamp = now;
59            let mut timestamps = Vec::new();
60
61            // Drain the queue temporarily to scan all timestamps
62            while let Some(timestamp) = self.request_times.pop() {
63                timestamps.push(timestamp);
64
65                // Check if this timestamp is still active
66                if now - timestamp <= self.window_milliseconds {
67                    active_count += 1;
68                    if timestamp < oldest_active_timestamp {
69                        oldest_active_timestamp = timestamp;
70                    }
71                }
72            }
73
74            // Put all timestamps back in the queue
75            for timestamp in timestamps {
76                self.request_times.push(timestamp);
77            }
78
79            // Update the atomic counter with the accurate count
80            self.active_count.store(active_count, Ordering::SeqCst);
81
82            if active_count < self.max_requests_per_window {
83                // We have capacity, add a new timestamp
84                self.request_times.push(now);
85                self.active_count.fetch_add(1, Ordering::SeqCst);
86                return Ok(());
87            }
88
89            // We need to wait until the oldest active request expires
90            let elapsed = now - oldest_active_timestamp;
91            if elapsed >= self.window_milliseconds {
92                // The oldest request has already expired (this shouldn't happen with our counting logic)
93                self.request_times.push(now);
94                self.active_count.fetch_add(1, Ordering::SeqCst);
95                return Ok(());
96            }
97
98            // Wait until the oldest request expires
99            let wait_time_ms = self.window_milliseconds - elapsed + 1; // Add 1ms buffer
100
101            // Wait for the calculated time
102            time::sleep(Duration::from_millis(wait_time_ms)).await;
103        }
104    }
105
106    /// Tries to acquire permission to make a request without waiting
107    ///
108    /// # Returns
109    /// `true` if permission was granted, `false` if the rate limit would be exceeded
110    pub fn try_acquire(&self) -> bool {
111        let now = self.clock.raw() / 1_000_000; // Convert ns to ms
112
113        // First, try a fast path using the atomic counter
114        // This is an approximation and may reject some requests unnecessarily
115        let current_count = self.active_count.load(Ordering::Relaxed);
116        if current_count < self.max_requests_per_window {
117            // Optimistically assume we have capacity and try to increment
118            if self
119                .active_count
120                .compare_exchange(
121                    current_count,
122                    current_count + 1,
123                    Ordering::SeqCst,
124                    Ordering::Relaxed,
125                )
126                .is_ok()
127            {
128                // We successfully claimed a slot
129                self.request_times.push(now);
130                return true;
131            }
132        }
133
134        // Slow path: count active requests accurately
135        let mut active_count = 0;
136        let mut timestamps = Vec::new();
137
138        // Drain the queue temporarily to scan all timestamps
139        while let Some(timestamp) = self.request_times.pop() {
140            timestamps.push(timestamp);
141
142            // Check if this timestamp is still active
143            if now - timestamp <= self.window_milliseconds {
144                active_count += 1;
145            }
146        }
147
148        // Put all timestamps back in the queue
149        for timestamp in timestamps {
150            self.request_times.push(timestamp);
151        }
152
153        // Update the atomic counter with the accurate count
154        self.active_count.store(active_count, Ordering::SeqCst);
155
156        if active_count < self.max_requests_per_window {
157            // We have capacity
158            self.request_times.push(now);
159            self.active_count.fetch_add(1, Ordering::SeqCst);
160            true
161        } else {
162            // Rate limit would be exceeded
163            false
164        }
165    }
166
167    /// Marks a request as completed, useful for tracking successful API calls
168    pub fn mark_request(&self) {
169        let now = self.clock.raw() / 1_000_000; // Convert ns to ms
170
171        // Add the timestamp to the queue
172        self.request_times.push(now);
173
174        // Increment the active count
175        self.active_count.fetch_add(1, Ordering::SeqCst);
176
177        // Note: We don't need to remove old timestamps here since they'll be
178        // filtered out when counting active requests in acquire/try_acquire
179    }
180
181    /// Returns the number of remaining requests that can be made immediately
182    pub fn remaining_requests(&self) -> usize {
183        // Fast path: use the atomic counter for an approximate count
184        let current_count = self.active_count.load(Ordering::Relaxed);
185
186        // If we're well below the limit, we can return immediately
187        if current_count < self.max_requests_per_window / 2 {
188            return self.max_requests_per_window - current_count;
189        }
190
191        // Slow path: count active requests accurately
192        let now = self.clock.raw() / 1_000_000; // Convert ns to ms
193        let mut active_count = 0;
194        let mut timestamps = Vec::new();
195
196        // Drain the queue temporarily to scan all timestamps
197        while let Some(timestamp) = self.request_times.pop() {
198            timestamps.push(timestamp);
199
200            // Check if this timestamp is still active
201            if now - timestamp <= self.window_milliseconds {
202                active_count += 1;
203            }
204        }
205
206        // Put all timestamps back in the queue
207        for timestamp in timestamps {
208            self.request_times.push(timestamp);
209        }
210
211        // Update the atomic counter with the accurate count
212        self.active_count.store(active_count, Ordering::SeqCst);
213
214        self.max_requests_per_window.saturating_sub(active_count)
215    }
216
217    /// Returns the time in milliseconds until the next slot becomes available
218    pub fn time_until_next_available_ms(&self) -> u64 {
219        let now = self.clock.raw() / 1_000_000; // Convert ns to ms
220
221        // Fast path: check if we're below capacity using the atomic counter
222        let current_count = self.active_count.load(Ordering::Relaxed);
223        if current_count < self.max_requests_per_window {
224            // We have capacity now
225            return 0;
226        }
227
228        // Slow path: scan all timestamps to find the oldest active one
229        let mut active_count = 0;
230        let mut oldest_active_timestamp = now;
231        let mut timestamps = Vec::new();
232
233        // Drain the queue temporarily to scan all timestamps
234        while let Some(timestamp) = self.request_times.pop() {
235            timestamps.push(timestamp);
236
237            // Check if this timestamp is still active
238            if now - timestamp <= self.window_milliseconds {
239                active_count += 1;
240                if timestamp < oldest_active_timestamp {
241                    oldest_active_timestamp = timestamp;
242                }
243            }
244        }
245
246        // Put all timestamps back in the queue
247        for timestamp in timestamps {
248            self.request_times.push(timestamp);
249        }
250
251        // Update the atomic counter with the accurate count
252        self.active_count.store(active_count, Ordering::SeqCst);
253
254        if active_count < self.max_requests_per_window {
255            // We have capacity now
256            return 0;
257        }
258
259        // Calculate time until the oldest active request expires
260        let elapsed = now - oldest_active_timestamp;
261        if elapsed >= self.window_milliseconds {
262            // The oldest request has already expired
263            return 0;
264        }
265
266        // Return time until the oldest request expires
267        self.window_milliseconds - elapsed
268    }
269}
270
271/// A weight-based rate limiter for more complex exchange rate limiting rules
272#[derive(Debug)]
273pub struct WeightBasedRateLimiter {
274    /// The maximum weight allowed in the window
275    max_weight: usize,
276
277    /// The time window in milliseconds
278    window_milliseconds: u64,
279
280    /// Request weights and timestamps for sliding window enforcement
281    /// Using a lock-free queue for better performance under contention
282    requests: Arc<SegQueue<(u64, usize)>>, // (timestamp_ms, weight)
283
284    /// Current total weight of active requests in the window
285    /// This is an approximation and may be higher than actual due to expired requests
286    active_weight: AtomicUsize,
287
288    /// High-precision clock
289    clock: Clock,
290}
291
292impl WeightBasedRateLimiter {
293    /// Creates a new weight-based rate limiter
294    ///
295    /// # Parameters
296    /// * `max_weight` - Maximum weight allowed per time window
297    /// * `window_milliseconds` - Time window in milliseconds
298    #[must_use]
299    pub fn new(max_weight: usize, window_milliseconds: u64) -> Self {
300        Self {
301            max_weight,
302            window_milliseconds,
303            requests: Arc::new(SegQueue::new()),
304            active_weight: AtomicUsize::new(0),
305            clock: Clock::new(),
306        }
307    }
308
309    /// Acquires permission to make a request with a specific weight, waiting if necessary
310    ///
311    /// # Parameters
312    /// * `weight` - The weight of the request
313    ///
314    /// # Returns
315    /// `Ok(())` when permission is granted, or an error if the wait was interrupted
316    pub async fn acquire(&self, weight: usize) -> Result<()> {
317        loop {
318            let now = self.clock.raw() / 1_000_000; // Convert ns to ms
319
320            // Count active weight and find the oldest timestamp
321            let mut active_weight = 0;
322            let mut oldest_active_timestamp = now;
323            let mut entries = Vec::new();
324
325            // Drain the queue temporarily to scan all entries
326            while let Some(entry) = self.requests.pop() {
327                entries.push(entry);
328
329                // Check if this entry is still active
330                let (timestamp, entry_weight) = entry;
331                if now - timestamp <= self.window_milliseconds {
332                    active_weight += entry_weight;
333                    if timestamp < oldest_active_timestamp {
334                        oldest_active_timestamp = timestamp;
335                    }
336                }
337            }
338
339            // Put all entries back in the queue
340            for entry in entries {
341                self.requests.push(entry);
342            }
343
344            // Update the atomic counter with the accurate weight
345            self.active_weight.store(active_weight, Ordering::SeqCst);
346
347            if active_weight + weight <= self.max_weight {
348                // We have capacity, add a new entry
349                self.requests.push((now, weight));
350                self.active_weight.fetch_add(weight, Ordering::SeqCst);
351                return Ok(());
352            }
353
354            // We need to wait until the oldest active request expires
355            let elapsed = now - oldest_active_timestamp;
356            if elapsed >= self.window_milliseconds {
357                // The oldest request has already expired (this shouldn't happen with our counting logic)
358                self.requests.push((now, weight));
359                self.active_weight.fetch_add(weight, Ordering::SeqCst);
360                return Ok(());
361            }
362
363            // Wait until the oldest request expires
364            let wait_time_ms = self.window_milliseconds - elapsed + 1; // Add 1ms buffer
365
366            // Wait for the calculated time
367            time::sleep(Duration::from_millis(wait_time_ms)).await;
368        }
369    }
370
371    /// Tries to acquire permission to make a request without waiting
372    ///
373    /// # Parameters
374    /// * `weight` - The weight of the request
375    ///
376    /// # Returns
377    /// `true` if permission was granted, `false` if the rate limit would be exceeded
378    pub fn try_acquire(&self, weight: usize) -> bool {
379        let now = self.clock.raw() / 1_000_000; // Convert ns to ms
380
381        // First, try a fast path using the atomic counter
382        // This is an approximation and may reject some requests unnecessarily
383        let current_weight = self.active_weight.load(Ordering::Relaxed);
384        if current_weight + weight <= self.max_weight {
385            // Optimistically assume we have capacity
386            // We can't use compare_exchange here because we're adding a variable amount (weight)
387            // Instead, we'll add the weight and then verify we didn't exceed the limit
388            let new_weight = self.active_weight.fetch_add(weight, Ordering::SeqCst) + weight;
389            if new_weight <= self.max_weight {
390                // We successfully claimed capacity
391                self.requests.push((now, weight));
392                return true;
393            }
394            // We exceeded the limit, rollback the weight addition
395            self.active_weight.fetch_sub(weight, Ordering::SeqCst);
396        }
397
398        // Slow path: count active weight accurately
399        let mut active_weight = 0;
400        let mut entries = Vec::new();
401
402        // Drain the queue temporarily to scan all entries
403        while let Some(entry) = self.requests.pop() {
404            entries.push(entry);
405
406            // Check if this entry is still active
407            let (timestamp, entry_weight) = entry;
408            if now - timestamp <= self.window_milliseconds {
409                active_weight += entry_weight;
410            }
411        }
412
413        // Put all entries back in the queue
414        for entry in entries {
415            self.requests.push(entry);
416        }
417
418        // Update the atomic counter with the accurate weight
419        self.active_weight.store(active_weight, Ordering::SeqCst);
420
421        if active_weight + weight <= self.max_weight {
422            // We have capacity
423            self.requests.push((now, weight));
424            self.active_weight.fetch_add(weight, Ordering::SeqCst);
425            true
426        } else {
427            // Rate limit would be exceeded
428            false
429        }
430    }
431
432    /// Marks a request with a specific weight as completed
433    pub fn mark_request(&self, weight: usize) {
434        let now = self.clock.raw() / 1_000_000; // Convert ns to ms
435
436        // Add the entry to the queue
437        self.requests.push((now, weight));
438
439        // Increment the active weight
440        self.active_weight.fetch_add(weight, Ordering::SeqCst);
441
442        // Note: We don't need to remove old entries here since they'll be
443        // filtered out when counting active weight in acquire/try_acquire
444    }
445
446    /// Returns the remaining weight available for immediate use
447    pub fn remaining_weight(&self) -> usize {
448        // Fast path: use the atomic counter for an approximate count
449        let current_weight = self.active_weight.load(Ordering::Relaxed);
450
451        // If we're well below the limit, we can return immediately
452        if current_weight < self.max_weight / 2 {
453            return self.max_weight - current_weight;
454        }
455
456        // Slow path: count active weight accurately
457        let now = self.clock.raw() / 1_000_000; // Convert ns to ms
458        let mut active_weight = 0;
459        let mut entries = Vec::new();
460
461        // Drain the queue temporarily to scan all entries
462        while let Some(entry) = self.requests.pop() {
463            entries.push(entry);
464
465            // Check if this entry is still active
466            let (timestamp, entry_weight) = entry;
467            if now - timestamp <= self.window_milliseconds {
468                active_weight += entry_weight;
469            }
470        }
471
472        // Put all entries back in the queue
473        for entry in entries {
474            self.requests.push(entry);
475        }
476
477        // Update the atomic counter with the accurate weight
478        self.active_weight.store(active_weight, Ordering::SeqCst);
479
480        self.max_weight.saturating_sub(active_weight)
481    }
482}