| // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
| // This source code is licensed under both the GPLv2 (found in the |
| // COPYING file in the root directory) and Apache 2.0 License |
| // (found in the LICENSE.Apache file in the root directory). |
| // |
| // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. See the AUTHORS file for names of contributors. |
| |
| #include "util/rate_limiter.h" |
| #include "monitoring/statistics.h" |
| #include "port/port.h" |
| #include "rocksdb/env.h" |
| #include "util/aligned_buffer.h" |
| #include "util/sync_point.h" |
| |
| namespace rocksdb { |
| |
| size_t RateLimiter::RequestToken(size_t bytes, size_t alignment, |
| Env::IOPriority io_priority, Statistics* stats, |
| RateLimiter::OpType op_type) { |
| if (io_priority < Env::IO_TOTAL && IsRateLimited(op_type)) { |
| bytes = std::min(bytes, static_cast<size_t>(GetSingleBurstBytes())); |
| |
| if (alignment > 0) { |
| // Here we may actually require more than burst and block |
| // but we can not write less than one page at a time on direct I/O |
| // thus we may want not to use ratelimiter |
| bytes = std::max(alignment, TruncateToPageBoundary(alignment, bytes)); |
| } |
| Request(bytes, io_priority, stats, op_type); |
| } |
| return bytes; |
| } |
| |
| // Pending request |
| struct GenericRateLimiter::Req { |
| explicit Req(int64_t _bytes, port::Mutex* _mu) |
| : request_bytes(_bytes), bytes(_bytes), cv(_mu), granted(false) {} |
| int64_t request_bytes; |
| int64_t bytes; |
| port::CondVar cv; |
| bool granted; |
| }; |
| |
| GenericRateLimiter::GenericRateLimiter(int64_t rate_bytes_per_sec, |
| int64_t refill_period_us, |
| int32_t fairness, RateLimiter::Mode mode) |
| : RateLimiter(mode), |
| refill_period_us_(refill_period_us), |
| rate_bytes_per_sec_(rate_bytes_per_sec), |
| refill_bytes_per_period_( |
| CalculateRefillBytesPerPeriod(rate_bytes_per_sec)), |
| env_(Env::Default()), |
| stop_(false), |
| exit_cv_(&request_mutex_), |
| requests_to_wait_(0), |
| available_bytes_(0), |
| next_refill_us_(NowMicrosMonotonic(env_)), |
| fairness_(fairness > 100 ? 100 : fairness), |
| rnd_((uint32_t)time(nullptr)), |
| leader_(nullptr) { |
| total_requests_[0] = 0; |
| total_requests_[1] = 0; |
| total_bytes_through_[0] = 0; |
| total_bytes_through_[1] = 0; |
| } |
| |
| GenericRateLimiter::~GenericRateLimiter() { |
| MutexLock g(&request_mutex_); |
| stop_ = true; |
| requests_to_wait_ = static_cast<int32_t>(queue_[Env::IO_LOW].size() + |
| queue_[Env::IO_HIGH].size()); |
| for (auto& r : queue_[Env::IO_HIGH]) { |
| r->cv.Signal(); |
| } |
| for (auto& r : queue_[Env::IO_LOW]) { |
| r->cv.Signal(); |
| } |
| while (requests_to_wait_ > 0) { |
| exit_cv_.Wait(); |
| } |
| } |
| |
| // This API allows user to dynamically change rate limiter's bytes per second. |
| void GenericRateLimiter::SetBytesPerSecond(int64_t bytes_per_second) { |
| assert(bytes_per_second > 0); |
| rate_bytes_per_sec_ = bytes_per_second; |
| refill_bytes_per_period_.store( |
| CalculateRefillBytesPerPeriod(bytes_per_second), |
| std::memory_order_relaxed); |
| } |
| |
| void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri, |
| Statistics* stats) { |
| assert(bytes <= refill_bytes_per_period_.load(std::memory_order_relaxed)); |
| TEST_SYNC_POINT("GenericRateLimiter::Request"); |
| TEST_SYNC_POINT_CALLBACK("GenericRateLimiter::Request:1", |
| &rate_bytes_per_sec_); |
| MutexLock g(&request_mutex_); |
| if (stop_) { |
| return; |
| } |
| |
| ++total_requests_[pri]; |
| |
| if (available_bytes_ >= bytes) { |
| // Refill thread assigns quota and notifies requests waiting on |
| // the queue under mutex. So if we get here, that means nobody |
| // is waiting? |
| available_bytes_ -= bytes; |
| total_bytes_through_[pri] += bytes; |
| return; |
| } |
| |
| // Request cannot be satisfied at this moment, enqueue |
| Req r(bytes, &request_mutex_); |
| queue_[pri].push_back(&r); |
| |
| do { |
| bool timedout = false; |
| // Leader election, candidates can be: |
| // (1) a new incoming request, |
| // (2) a previous leader, whose quota has not been not assigned yet due |
| // to lower priority |
| // (3) a previous waiter at the front of queue, who got notified by |
| // previous leader |
| if (leader_ == nullptr && |
| ((!queue_[Env::IO_HIGH].empty() && |
| &r == queue_[Env::IO_HIGH].front()) || |
| (!queue_[Env::IO_LOW].empty() && |
| &r == queue_[Env::IO_LOW].front()))) { |
| leader_ = &r; |
| int64_t delta = next_refill_us_ - NowMicrosMonotonic(env_); |
| delta = delta > 0 ? delta : 0; |
| if (delta == 0) { |
| timedout = true; |
| } else { |
| int64_t wait_until = env_->NowMicros() + delta; |
| RecordTick(stats, NUMBER_RATE_LIMITER_DRAINS); |
| timedout = r.cv.TimedWait(wait_until); |
| } |
| } else { |
| // Not at the front of queue or an leader has already been elected |
| r.cv.Wait(); |
| } |
| |
| // request_mutex_ is held from now on |
| if (stop_) { |
| --requests_to_wait_; |
| exit_cv_.Signal(); |
| return; |
| } |
| |
| // Make sure the waken up request is always the header of its queue |
| assert(r.granted || |
| (!queue_[Env::IO_HIGH].empty() && |
| &r == queue_[Env::IO_HIGH].front()) || |
| (!queue_[Env::IO_LOW].empty() && |
| &r == queue_[Env::IO_LOW].front())); |
| assert(leader_ == nullptr || |
| (!queue_[Env::IO_HIGH].empty() && |
| leader_ == queue_[Env::IO_HIGH].front()) || |
| (!queue_[Env::IO_LOW].empty() && |
| leader_ == queue_[Env::IO_LOW].front())); |
| |
| if (leader_ == &r) { |
| // Waken up from TimedWait() |
| if (timedout) { |
| // Time to do refill! |
| Refill(); |
| |
| // Re-elect a new leader regardless. This is to simplify the |
| // election handling. |
| leader_ = nullptr; |
| |
| // Notify the header of queue if current leader is going away |
| if (r.granted) { |
| // Current leader already got granted with quota. Notify header |
| // of waiting queue to participate next round of election. |
| assert((queue_[Env::IO_HIGH].empty() || |
| &r != queue_[Env::IO_HIGH].front()) && |
| (queue_[Env::IO_LOW].empty() || |
| &r != queue_[Env::IO_LOW].front())); |
| if (!queue_[Env::IO_HIGH].empty()) { |
| queue_[Env::IO_HIGH].front()->cv.Signal(); |
| } else if (!queue_[Env::IO_LOW].empty()) { |
| queue_[Env::IO_LOW].front()->cv.Signal(); |
| } |
| // Done |
| break; |
| } |
| } else { |
| // Spontaneous wake up, need to continue to wait |
| assert(!r.granted); |
| leader_ = nullptr; |
| } |
| } else { |
| // Waken up by previous leader: |
| // (1) if requested quota is granted, it is done. |
| // (2) if requested quota is not granted, this means current thread |
| // was picked as a new leader candidate (previous leader got quota). |
| // It needs to participate leader election because a new request may |
| // come in before this thread gets waken up. So it may actually need |
| // to do Wait() again. |
| assert(!timedout); |
| } |
| } while (!r.granted); |
| } |
| |
| void GenericRateLimiter::Refill() { |
| TEST_SYNC_POINT("GenericRateLimiter::Refill"); |
| next_refill_us_ = NowMicrosMonotonic(env_) + refill_period_us_; |
| // Carry over the left over quota from the last period |
| auto refill_bytes_per_period = |
| refill_bytes_per_period_.load(std::memory_order_relaxed); |
| if (available_bytes_ < refill_bytes_per_period) { |
| available_bytes_ += refill_bytes_per_period; |
| } |
| |
| int use_low_pri_first = rnd_.OneIn(fairness_) ? 0 : 1; |
| for (int q = 0; q < 2; ++q) { |
| auto use_pri = (use_low_pri_first == q) ? Env::IO_LOW : Env::IO_HIGH; |
| auto* queue = &queue_[use_pri]; |
| while (!queue->empty()) { |
| auto* next_req = queue->front(); |
| if (available_bytes_ < next_req->request_bytes) { |
| // avoid starvation |
| next_req->request_bytes -= available_bytes_; |
| available_bytes_ = 0; |
| break; |
| } |
| available_bytes_ -= next_req->request_bytes; |
| next_req->request_bytes = 0; |
| total_bytes_through_[use_pri] += next_req->bytes; |
| queue->pop_front(); |
| |
| next_req->granted = true; |
| if (next_req != leader_) { |
| // Quota granted, signal the thread |
| next_req->cv.Signal(); |
| } |
| } |
| } |
| } |
| |
| int64_t GenericRateLimiter::CalculateRefillBytesPerPeriod( |
| int64_t rate_bytes_per_sec) { |
| if (port::kMaxInt64 / rate_bytes_per_sec < refill_period_us_) { |
| // Avoid unexpected result in the overflow case. The result now is still |
| // inaccurate but is a number that is large enough. |
| return port::kMaxInt64 / 1000000; |
| } else { |
| return std::max(kMinRefillBytesPerPeriod, |
| rate_bytes_per_sec * refill_period_us_ / 1000000); |
| } |
| } |
| |
| RateLimiter* NewGenericRateLimiter( |
| int64_t rate_bytes_per_sec, int64_t refill_period_us /* = 100 * 1000 */, |
| int32_t fairness /* = 10 */, |
| RateLimiter::Mode mode /* = RateLimiter::Mode::kWritesOnly */) { |
| assert(rate_bytes_per_sec > 0); |
| assert(refill_period_us > 0); |
| assert(fairness > 0); |
| return new GenericRateLimiter(rate_bytes_per_sec, refill_period_us, fairness, |
| mode); |
| } |
| |
| } // namespace rocksdb |