| /* |
| * Copyright (c) Facebook, Inc. and its affiliates. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #pragma once |
| |
| #include <algorithm> |
| #include <atomic> |
| #include <chrono> |
| #include <thread> |
| #include <boost/optional.hpp> |
| #include "utils/ports.h" |
| |
| namespace folly { |
| |
| /** |
| * Thread-safe (atomic) token bucket implementation. |
| * |
| * A token bucket (http://en.wikipedia.org/wiki/Token_bucket) models a stream |
| * of events with an average rate and some amount of burstiness. The canonical |
| * example is a packet switched network: the network can accept some number of |
| * bytes per second and the bytes come in finite packets (bursts). A token |
| * bucket stores up to a fixed number of tokens (the burst size). Some number |
| * of tokens are removed when an event occurs. The tokens are replenished at a |
| * fixed rate. Failure to allocate tokens implies resource is unavailable and |
| * caller needs to implement its own retry mechanism. For simple cases where |
| * caller is okay with a FIFO starvation-free scheduling behavior, there are |
| * also APIs to 'borrow' from the future effectively assigning a start time to |
| * the caller when it should proceed with using the resource. It is also |
| * possible to 'return' previously allocated tokens to make them available to |
| * other users. Returns in excess of burstSize are considered expired and |
| * will not be available to later callers. |
| * |
| * This implementation records the last time it was updated. This allows the |
| * token bucket to add tokens "just in time" when tokens are requested. |
| * |
| * The "dynamic" base variant allows the token generation rate and maximum |
| * burst size to change with every token consumption. |
| * |
| * @tparam Clock Clock type, must be steady i.e. monotonic. |
| */ |
| template <typename Clock = std::chrono::steady_clock> |
| class BasicDynamicTokenBucket |
| { |
| static_assert(Clock::is_steady, "clock must be steady"); |
| |
| public: |
| /** |
| * Constructor. |
| * |
| * @param zeroTime Initial time at which to consider the token bucket |
| * starting to fill. Defaults to 0, so by default token |
| * buckets are "full" after construction. |
| */ |
| explicit BasicDynamicTokenBucket(double zeroTime = 0) noexcept : zeroTime_(zeroTime) {} |
| |
| /** |
| * Copy constructor. |
| * |
| * Thread-safe. (Copy constructors of derived classes may not be thread-safe |
| * however.) |
| */ |
| BasicDynamicTokenBucket(const BasicDynamicTokenBucket &other) noexcept |
| : zeroTime_(other.zeroTime_.load()) |
| { |
| } |
| |
| /** |
| * Copy-assignment operator. |
| * |
| * Warning: not thread safe for the object being assigned to (including |
| * self-assignment). Thread-safe for the other object. |
| */ |
| BasicDynamicTokenBucket &operator=(const BasicDynamicTokenBucket &other) noexcept |
| { |
| zeroTime_ = other.zeroTime_.load(); |
| return *this; |
| } |
| |
| /** |
| * Re-initialize token bucket. |
| * |
| * Thread-safe. |
| * |
| * @param zeroTime Initial time at which to consider the token bucket |
| * starting to fill. Defaults to 0, so by default token |
| * bucket is reset to "full". |
| */ |
| void reset(double zeroTime = 0) noexcept { zeroTime_ = zeroTime; } |
| |
| /** |
| * Returns the current time in seconds since Epoch. |
| */ |
| static double defaultClockNow() noexcept |
| { |
| auto const now = Clock::now().time_since_epoch(); |
| return std::chrono::duration<double>(now).count(); |
| } |
| |
| /** |
| * Attempts to consume some number of tokens. Tokens are first added to the |
| * bucket based on the time elapsed since the last attempt to consume tokens. |
| * Note: Attempts to consume more tokens than the burst size will always |
| * fail. |
| * |
| * Thread-safe. |
| * |
| * @param toConsume The number of tokens to consume. |
| * @param rate Number of tokens to generate per second. |
| * @param burstSize Maximum burst size. Must be greater than 0. |
| * @param nowInSeconds Current time in seconds. Should be monotonically |
| * increasing from the nowInSeconds specified in |
| * this token bucket's constructor. |
| * @return True if the rate limit check passed, false otherwise. |
| */ |
| bool consume(double toConsume, |
| double rate, |
| double burstSize, |
| double nowInSeconds = defaultClockNow()) |
| { |
| assert(rate > 0); |
| assert(burstSize > 0); |
| |
| if (nowInSeconds <= zeroTime_.load()) { |
| return 0; |
| } |
| |
| return consumeImpl(rate, burstSize, nowInSeconds, [toConsume](double &tokens) { |
| if (tokens < toConsume) { |
| return false; |
| } |
| tokens -= toConsume; |
| return true; |
| }); |
| } |
| |
| /** |
| * Similar to consume, but always consumes some number of tokens. If the |
| * bucket contains enough tokens - consumes toConsume tokens. Otherwise the |
| * bucket is drained. |
| * |
| * Thread-safe. |
| * |
| * @param toConsume The number of tokens to consume. |
| * @param rate Number of tokens to generate per second. |
| * @param burstSize Maximum burst size. Must be greater than 0. |
| * @param nowInSeconds Current time in seconds. Should be monotonically |
| * increasing from the nowInSeconds specified in |
| * this token bucket's constructor. |
| * @return number of tokens that were consumed. |
| */ |
| double consumeOrDrain(double toConsume, |
| double rate, |
| double burstSize, |
| double nowInSeconds = defaultClockNow()) |
| { |
| assert(rate > 0); |
| assert(burstSize > 0); |
| |
| if (nowInSeconds <= zeroTime_.load()) { |
| return 0; |
| } |
| |
| double consumed; |
| consumeImpl(rate, burstSize, nowInSeconds, [&consumed, toConsume](double &tokens) { |
| if (tokens < toConsume) { |
| consumed = tokens; |
| tokens = 0.0; |
| } else { |
| consumed = toConsume; |
| tokens -= toConsume; |
| } |
| return true; |
| }); |
| return consumed; |
| } |
| |
| /** |
| * Return extra tokens back to the bucket. This will move the zeroTime_ |
| * value back based on the rate. |
| * |
| * Thread-safe. |
| */ |
| void returnTokens(double tokensToReturn, double rate) |
| { |
| assert(rate > 0); |
| assert(tokensToReturn > 0); |
| |
| returnTokensImpl(tokensToReturn, rate); |
| } |
| |
| /** |
| * Like consumeOrDrain but the call will always satisfy the asked for count. |
| * It does so by borrowing tokens from the future (zeroTime_ will move |
| * forward) if the currently available count isn't sufficient. |
| * |
| * Returns a folly::Optional<double>. The optional wont be set if the request |
| * cannot be satisfied: only case is when it is larger than burstSize. The |
| * value of the optional is a double indicating the time in seconds that the |
| * caller needs to wait at which the reservation becomes valid. The caller |
| * could simply sleep for the returned duration to smooth out the allocation |
| * to match the rate limiter or do some other computation in the meantime. In |
| * any case, any regular consume or consumeOrDrain calls will fail to allocate |
| * any tokens until the future time is reached. |
| * |
| * Note: It is assumed the caller will not ask for a very large count nor use |
| * it immediately (if not waiting inline) as that would break the burst |
| * prevention the limiter is meant to be used for. |
| * |
| * Thread-safe. |
| */ |
| boost::optional<double> consumeWithBorrowNonBlocking(double toConsume, |
| double rate, |
| double burstSize, |
| double nowInSeconds = defaultClockNow()) |
| { |
| assert(rate > 0); |
| assert(burstSize > 0); |
| |
| if (burstSize < toConsume) { |
| // boost::none |
| // if we use boost::none here, some compilers will generate warning |
| // that's actually a false positive of "-Wmaybe-uninitialized". |
| // https://www.boost.org/doc/libs/1_65_1/libs/optional/doc/html/boost_optional/tutorial/gotchas/false_positive_with__wmaybe_uninitialized.html |
| return boost::make_optional(false, double()); |
| } |
| |
| while (toConsume > 0) { |
| double consumed = consumeOrDrain(toConsume, rate, burstSize, nowInSeconds); |
| if (consumed > 0) { |
| toConsume -= consumed; |
| } else { |
| double zeroTimeNew = returnTokensImpl(-toConsume, rate); |
| double napTime = std::max(0.0, zeroTimeNew - nowInSeconds); |
| return boost::optional<double>(napTime); |
| } |
| } |
| return boost::optional<double>(0); |
| } |
| |
| /** |
| * Convenience wrapper around non-blocking borrow to sleep inline until |
| * reservation is valid. |
| */ |
| bool consumeWithBorrowAndWait(double toConsume, |
| double rate, |
| double burstSize, |
| double nowInSeconds = defaultClockNow()) |
| { |
| auto res = consumeWithBorrowNonBlocking(toConsume, rate, burstSize, nowInSeconds); |
| if (res.get_value_or(0) > 0) { |
| int64_t napUSec = res.get() * 1000000; |
| std::this_thread::sleep_for(std::chrono::microseconds(napUSec)); |
| } |
| return res.is_initialized(); |
| } |
| |
| /** |
| * Returns the number of tokens currently available. |
| * |
| * Thread-safe (but returned value may immediately be outdated). |
| */ |
| double available(double rate, double burstSize, double nowInSeconds = defaultClockNow()) const |
| noexcept |
| { |
| assert(rate > 0); |
| assert(burstSize > 0); |
| |
| double zt = this->zeroTime_.load(); |
| if (nowInSeconds <= zt) { |
| return 0; |
| } |
| return std::min((nowInSeconds - zt) * rate, burstSize); |
| } |
| |
| private: |
| template <typename TCallback> |
| bool consumeImpl(double rate, double burstSize, double nowInSeconds, const TCallback &callback) |
| { |
| auto zeroTimeOld = zeroTime_.load(); |
| double zeroTimeNew; |
| do { |
| auto tokens = std::min((nowInSeconds - zeroTimeOld) * rate, burstSize); |
| if (!callback(tokens)) { |
| return false; |
| } |
| zeroTimeNew = nowInSeconds - tokens / rate; |
| } while (dsn_unlikely(!zeroTime_.compare_exchange_weak(zeroTimeOld, zeroTimeNew))); |
| |
| return true; |
| } |
| |
| /** |
| * Adjust zeroTime based on rate and tokenCount and return the new value of |
| * zeroTime_. Note: Token count can be negative to move the zeroTime_ value |
| * into the future. |
| */ |
| double returnTokensImpl(double tokenCount, double rate) |
| { |
| auto zeroTimeOld = zeroTime_.load(); |
| double zeroTimeNew; |
| do { |
| zeroTimeNew = zeroTimeOld - tokenCount / rate; |
| } while (dsn_unlikely(!zeroTime_.compare_exchange_weak(zeroTimeOld, zeroTimeNew))); |
| return zeroTimeNew; |
| } |
| |
| std::atomic<double> zeroTime_; |
| }; |
| |
| /** |
| * Specialization of BasicDynamicTokenBucket with a fixed token |
| * generation rate and a fixed maximum burst size. |
| */ |
| template <typename Clock = std::chrono::steady_clock> |
| class BasicTokenBucket |
| { |
| static_assert(Clock::is_steady, "clock must be steady"); |
| |
| private: |
| using Impl = BasicDynamicTokenBucket<Clock>; |
| |
| public: |
| /** |
| * Construct a token bucket with a specific maximum rate and burst size. |
| * |
| * @param genRate Number of tokens to generate per second. |
| * @param burstSize Maximum burst size. Must be greater than 0. |
| * @param zeroTime Initial time at which to consider the token bucket |
| * starting to fill. Defaults to 0, so by default token |
| * bucket is "full" after construction. |
| */ |
| BasicTokenBucket(double genRate, double burstSize, double zeroTime = 0) noexcept |
| : tokenBucket_(zeroTime), rate_(genRate), burstSize_(burstSize) |
| { |
| assert(rate_ > 0); |
| assert(burstSize_ > 0); |
| } |
| |
| /** |
| * Copy constructor. |
| * |
| * Warning: not thread safe! |
| */ |
| BasicTokenBucket(const BasicTokenBucket &other) noexcept = default; |
| |
| /** |
| * Copy-assignment operator. |
| * |
| * Warning: not thread safe! |
| */ |
| BasicTokenBucket &operator=(const BasicTokenBucket &other) noexcept = default; |
| |
| /** |
| * Returns the current time in seconds since Epoch. |
| */ |
| static double defaultClockNow() noexcept(noexcept(Impl::defaultClockNow())) |
| { |
| return Impl::defaultClockNow(); |
| } |
| |
| /** |
| * Change rate and burst size. |
| * |
| * Warning: not thread safe! |
| * |
| * @param genRate Number of tokens to generate per second. |
| * @param burstSize Maximum burst size. Must be greater than 0. |
| * @param nowInSeconds Current time in seconds. Should be monotonically |
| * increasing from the nowInSeconds specified in |
| * this token bucket's constructor. |
| */ |
| void reset(double genRate, double burstSize, double nowInSeconds = defaultClockNow()) noexcept |
| { |
| assert(genRate > 0); |
| assert(burstSize > 0); |
| const double availTokens = available(nowInSeconds); |
| rate_ = genRate; |
| burstSize_ = burstSize; |
| setCapacity(availTokens, nowInSeconds); |
| } |
| |
| /** |
| * Change number of tokens in bucket. |
| * |
| * Warning: not thread safe! |
| * |
| * @param tokens Desired number of tokens in bucket after the call. |
| * @param nowInSeconds Current time in seconds. Should be monotonically |
| * increasing from the nowInSeconds specified in |
| * this token bucket's constructor. |
| */ |
| void setCapacity(double tokens, double nowInSeconds) noexcept |
| { |
| tokenBucket_.reset(nowInSeconds - tokens / rate_); |
| } |
| |
| /** |
| * Attempts to consume some number of tokens. Tokens are first added to the |
| * bucket based on the time elapsed since the last attempt to consume tokens. |
| * Note: Attempts to consume more tokens than the burst size will always |
| * fail. |
| * |
| * Thread-safe. |
| * |
| * @param toConsume The number of tokens to consume. |
| * @param nowInSeconds Current time in seconds. Should be monotonically |
| * increasing from the nowInSeconds specified in |
| * this token bucket's constructor. |
| * @return True if the rate limit check passed, false otherwise. |
| */ |
| bool consume(double toConsume, double nowInSeconds = defaultClockNow()) |
| { |
| return tokenBucket_.consume(toConsume, rate_, burstSize_, nowInSeconds); |
| } |
| |
| /** |
| * Similar to consume, but always consumes some number of tokens. If the |
| * bucket contains enough tokens - consumes toConsume tokens. Otherwise the |
| * bucket is drained. |
| * |
| * Thread-safe. |
| * |
| * @param toConsume The number of tokens to consume. |
| * @param nowInSeconds Current time in seconds. Should be monotonically |
| * increasing from the nowInSeconds specified in |
| * this token bucket's constructor. |
| * @return number of tokens that were consumed. |
| */ |
| double consumeOrDrain(double toConsume, double nowInSeconds = defaultClockNow()) |
| { |
| return tokenBucket_.consumeOrDrain(toConsume, rate_, burstSize_, nowInSeconds); |
| } |
| |
| /** |
| * Returns extra token back to the bucket. |
| */ |
| void returnTokens(double tokensToReturn) |
| { |
| return tokenBucket_.returnTokens(tokensToReturn, rate_); |
| } |
| |
| /** |
| * Reserve tokens and return time to wait for in order for the reservation to |
| * be compatible with the bucket configuration. |
| */ |
| boost::optional<double> consumeWithBorrowNonBlocking(double toConsume, |
| double nowInSeconds = defaultClockNow()) |
| { |
| return tokenBucket_.consumeWithBorrowNonBlocking( |
| toConsume, rate_, burstSize_, nowInSeconds); |
| } |
| |
| /** |
| * Reserve tokens. Blocks if need be until reservation is satisfied. |
| */ |
| bool consumeWithBorrowAndWait(double toConsume, double nowInSeconds = defaultClockNow()) |
| { |
| return tokenBucket_.consumeWithBorrowAndWait(toConsume, rate_, burstSize_, nowInSeconds); |
| } |
| |
| /** |
| * Returns the number of tokens currently available. |
| * |
| * Thread-safe (but returned value may immediately be outdated). |
| */ |
| double available(double nowInSeconds = defaultClockNow()) const |
| { |
| return tokenBucket_.available(rate_, burstSize_, nowInSeconds); |
| } |
| |
| /** |
| * Returns the number of tokens generated per second. |
| * |
| * Thread-safe (but returned value may immediately be outdated). |
| */ |
| double rate() const noexcept { return rate_; } |
| |
| /** |
| * Returns the maximum burst size. |
| * |
| * Thread-safe (but returned value may immediately be outdated). |
| */ |
| double burst() const noexcept { return burstSize_; } |
| |
| private: |
| Impl tokenBucket_; |
| double rate_; |
| double burstSize_; |
| }; |
| |
| using TokenBucket = BasicTokenBucket<>; |
| using DynamicTokenBucket = BasicDynamicTokenBucket<>; |
| |
| } // namespace folly |