| // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
| // This source code is licensed under both the GPLv2 (found in the |
| // COPYING file in the root directory) and Apache 2.0 License |
| // (found in the LICENSE.Apache file in the root directory). |
| |
| #include "db/write_controller.h" |
| |
| #include <atomic> |
| #include <cassert> |
| #include <ratio> |
| #include "rocksdb/env.h" |
| |
| namespace rocksdb { |
| |
| std::unique_ptr<WriteControllerToken> WriteController::GetStopToken() { |
| ++total_stopped_; |
| return std::unique_ptr<WriteControllerToken>(new StopWriteToken(this)); |
| } |
| |
| std::unique_ptr<WriteControllerToken> WriteController::GetDelayToken( |
| uint64_t write_rate) { |
| total_delayed_++; |
| // Reset counters. |
| last_refill_time_ = 0; |
| bytes_left_ = 0; |
| set_delayed_write_rate(write_rate); |
| return std::unique_ptr<WriteControllerToken>(new DelayWriteToken(this)); |
| } |
| |
| std::unique_ptr<WriteControllerToken> |
| WriteController::GetCompactionPressureToken() { |
| ++total_compaction_pressure_; |
| return std::unique_ptr<WriteControllerToken>( |
| new CompactionPressureToken(this)); |
| } |
| |
| bool WriteController::IsStopped() const { |
| return total_stopped_.load(std::memory_order_relaxed) > 0; |
| } |
| // This is inside DB mutex, so we can't sleep and need to minimize |
| // frequency to get time. |
| // If it turns out to be a performance issue, we can redesign the thread |
| // synchronization model here. |
| // The function trust caller will sleep micros returned. |
| uint64_t WriteController::GetDelay(Env* env, uint64_t num_bytes) { |
| if (total_stopped_.load(std::memory_order_relaxed) > 0) { |
| return 0; |
| } |
| if (total_delayed_.load(std::memory_order_relaxed) == 0) { |
| return 0; |
| } |
| |
| const uint64_t kMicrosPerSecond = 1000000; |
| const uint64_t kRefillInterval = 1024U; |
| |
| if (bytes_left_ >= num_bytes) { |
| bytes_left_ -= num_bytes; |
| return 0; |
| } |
| // The frequency to get time inside DB mutex is less than one per refill |
| // interval. |
| auto time_now = NowMicrosMonotonic(env); |
| |
| uint64_t sleep_debt = 0; |
| uint64_t time_since_last_refill = 0; |
| if (last_refill_time_ != 0) { |
| if (last_refill_time_ > time_now) { |
| sleep_debt = last_refill_time_ - time_now; |
| } else { |
| time_since_last_refill = time_now - last_refill_time_; |
| bytes_left_ += |
| static_cast<uint64_t>(static_cast<double>(time_since_last_refill) / |
| kMicrosPerSecond * delayed_write_rate_); |
| if (time_since_last_refill >= kRefillInterval && |
| bytes_left_ > num_bytes) { |
| // If refill interval already passed and we have enough bytes |
| // return without extra sleeping. |
| last_refill_time_ = time_now; |
| bytes_left_ -= num_bytes; |
| return 0; |
| } |
| } |
| } |
| |
| uint64_t single_refill_amount = |
| delayed_write_rate_ * kRefillInterval / kMicrosPerSecond; |
| if (bytes_left_ + single_refill_amount >= num_bytes) { |
| // Wait until a refill interval |
| // Never trigger expire for less than one refill interval to avoid to get |
| // time. |
| bytes_left_ = bytes_left_ + single_refill_amount - num_bytes; |
| last_refill_time_ = time_now + kRefillInterval; |
| return kRefillInterval + sleep_debt; |
| } |
| |
| // Need to refill more than one interval. Need to sleep longer. Check |
| // whether expiration will hit |
| |
| // Sleep just until `num_bytes` is allowed. |
| uint64_t sleep_amount = |
| static_cast<uint64_t>(num_bytes / |
| static_cast<long double>(delayed_write_rate_) * |
| kMicrosPerSecond) + |
| sleep_debt; |
| last_refill_time_ = time_now + sleep_amount; |
| return sleep_amount; |
| } |
| |
| uint64_t WriteController::NowMicrosMonotonic(Env* env) { |
| return env->NowNanos() / std::milli::den; |
| } |
| |
| StopWriteToken::~StopWriteToken() { |
| assert(controller_->total_stopped_ >= 1); |
| --controller_->total_stopped_; |
| } |
| |
| DelayWriteToken::~DelayWriteToken() { |
| controller_->total_delayed_--; |
| assert(controller_->total_delayed_.load() >= 0); |
| } |
| |
| CompactionPressureToken::~CompactionPressureToken() { |
| controller_->total_compaction_pressure_--; |
| assert(controller_->total_compaction_pressure_ >= 0); |
| } |
| |
| } // namespace rocksdb |