blob: d76a77bd49a8fd908a3e0630a58285ebb35fc931 [file] [log] [blame]
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::Mutex;
use tokio::time::sleep;
/// Thread-safe rate limiter using linger-based algorithm
pub struct RateLimiter {
bytes_per_second: u64,
last_operation: Arc<Mutex<Instant>>,
}
impl RateLimiter {
pub fn new(bytes_per_second: u64) -> Self {
Self {
bytes_per_second,
last_operation: Arc::new(Mutex::new(Instant::now())),
}
}
/// Wait for the required time based on the desired throughput rate
pub async fn wait_and_consume(&self, bytes: u64) {
let now = Instant::now();
let mut last_op = self.last_operation.lock().await;
// Calculate time per byte in seconds
let time_per_byte = 1.0 / self.bytes_per_second as f64;
// Calculate target duration for these bytes
let target_duration = Duration::from_secs_f64(bytes as f64 * time_per_byte);
// Calculate how long it's been since last operation
let elapsed = now.duration_since(*last_op);
// If we need to wait longer, sleep for the remaining time
if elapsed < target_duration {
let sleep_duration = target_duration - elapsed;
// Update last_op before sleeping to account for operation time
*last_op = now + sleep_duration;
drop(last_op); // Release the lock before sleeping
sleep(sleep_duration).await;
} else {
// If we don't need to wait, just update the timestamp
*last_op = now;
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_rate_limiter() {
let limiter = RateLimiter::new(1000); // 1000 bytes per second
let start = Instant::now();
// Try to send 100 bytes 5 times
for _ in 0..5 {
limiter.wait_and_consume(100).await;
}
// Should take approximately 0.5 seconds (500ms) to send 500 bytes at 1000 bytes/sec
let elapsed = start.elapsed();
assert!(elapsed >= Duration::from_millis(450)); // Allow some wiggle room
assert!(elapsed <= Duration::from_millis(550));
}
}