blob: 195264e6b1b71327a77d607d9182cb66f67707dd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.util;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.util.typedef.internal.A;
import static java.lang.Math.max;
import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
/**
* The simplified version of Google Guava smooth rate limiter.<br><br>
*
* The primary feature of a rate limiter is its "stable rate", the maximum rate that it should
* allow at normal conditions. This is enforced by "throttling" incoming requests as needed, i.e.
* compute, for an incoming request, the appropriate throttle time, and make the calling thread
* wait as much.<br><br>
*
* The simplest way to maintain a rate of QPS is to keep the timestamp of the last granted
* request, and ensure that (1/QPS) seconds have elapsed since then. For example, for a rate of
* QPS=5 (5 tokens per second), if we ensure that a request isn't granted earlier than 200ms after
* the last one, then we achieve the intended rate. If a request comes and the last request was
* granted only 100ms ago, then we wait for another 100ms. At this rate, serving 15 fresh permits
* (i.e. for an acquire(15) request) naturally takes 3 seconds.<br><br>
*
* It is important to realize that such a limiter has a very superficial memory of the past:
* it only remembers the last request. What if the limiter was unused for a long period of
* time, then a request arrived and was immediately granted? This limiter would immediately
* forget about that past underutilization.
*/
public class BasicRateLimiter {
    /** Start timestamp (nanoseconds); all internal times are measured relative to it. */
    private final long startTime = System.nanoTime();

    /** Synchronization mutex guarding {@code stableIntervalMicros} and {@code nextFreeTicketMicros}. */
    private final Object mux = new Object();

    /**
     * The interval between two unit requests, at our stable rate. E.g., a stable rate of 5 permits
     * per second has a stable interval of 200ms. Read and written only while holding {@code mux}.
     */
    private double stableIntervalMicros;

    /**
     * The time when the next request (no matter its size) will be granted. After granting a request,
     * this is pushed further in the future. Large requests push this further than small requests.
     * Expressed in microseconds since {@code startTime}; read and written only while holding {@code mux}.
     */
    private long nextFreeTicketMicros;

    /**
     * The flag indicates that the rate is not limited. It is switched to {@code false} only after
     * {@code stableIntervalMicros} has been updated, so a reader that observes {@code false}
     * never sees an interval left over from the unlimited state.
     */
    private volatile boolean unlimited;

    /**
     * @param permitsPerSecond Estimated number of permits per second, {@code 0} for unlimited rate.
     * @throws IllegalArgumentException If {@code permitsPerSecond} is negative or NaN.
     */
    public BasicRateLimiter(double permitsPerSecond) {
        setRate(permitsPerSecond);
    }

    /**
     * Updates the stable rate.
     *
     * @param permitsPerSecond The new stable rate of this {@code RateLimiter}, set {@code 0} for unlimited rate.
     * @throws IllegalArgumentException If {@code permitsPerSecond} is negative or NaN.
     */
    public void setRate(double permitsPerSecond) {
        A.ensure(permitsPerSecond >= 0, "Requested permits (" + permitsPerSecond + ") must be non-negative.");

        // The whole state transition happens under the mutex so that concurrent setRate() calls are
        // serialized and the flag is never published ahead of the interval it relies on.
        synchronized (mux) {
            if (permitsPerSecond == 0) {
                unlimited = true;

                return;
            }

            resync();

            stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;

            // Publish "limited" mode only once the new interval is in place.
            unlimited = false;
        }
    }

    /**
     * @return The stable rate as {@code permits per seconds} ({@code 0} means that the rate is unlimited).
     */
    public double getRate() {
        if (unlimited)
            return 0;

        synchronized (mux) {
            return SECONDS.toMicros(1L) / stableIntervalMicros;
        }
    }

    /**
     * Acquires the given number of permits from this {@code RateLimiter}, blocking until the request
     * can be granted. Returns immediately, without validating {@code permits}, if the rate is unlimited.
     *
     * @param permits The number of permits to acquire.
     * @throws IllegalArgumentException If the rate is limited and the requested number of permits
     *      is negative or zero.
     * @throws IgniteInterruptedCheckedException If the calling thread was interrupted while waiting;
     *      the thread's interrupt flag is restored before throwing.
     */
    public void acquire(int permits) throws IgniteInterruptedCheckedException {
        if (unlimited)
            return;

        long microsToWait = reserve(permits);

        try {
            MICROSECONDS.sleep(microsToWait);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();

            throw new IgniteInterruptedCheckedException(e);
        }
    }

    /**
     * Reserves the given number of permits for future use.
     *
     * @param permits The number of permits.
     * @return Time in microseconds to wait until the resource can be acquired, never negative.
     * @throws IllegalArgumentException If {@code permits} is negative or zero.
     */
    private long reserve(int permits) {
        A.ensure(permits > 0, "Requested permits (" + permits + ") must be positive");

        synchronized (mux) {
            long nowMicros = resync();

            long momentAvailable = nextFreeTicketMicros;

            // The double-to-long cast saturates at Long.MAX_VALUE for very low rates, so the plain sum
            // could wrap around to a negative value and the reservation would be lost on the next resync.
            long reservedMicros = (long)(permits * stableIntervalMicros);
            long nextFree = momentAvailable + reservedMicros;

            // Both operands are non-negative, hence a smaller result can only mean overflow.
            nextFreeTicketMicros = nextFree < momentAvailable ? Long.MAX_VALUE : nextFree;

            return max(momentAvailable - nowMicros, 0);
        }
    }

    /**
     * Updates {@code nextFreeTicketMicros} based on the current time. Every caller in this class
     * invokes it while holding {@code mux}.
     *
     * @return Time passed (since start) in microseconds.
     */
    private long resync() {
        long passed = MICROSECONDS.convert(System.nanoTime() - startTime, NANOSECONDS);

        // If nextFreeTicket is in the past, resync to now.
        if (passed > nextFreeTicketMicros)
            nextFreeTicketMicros = passed;

        return passed;
    }
}