| // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
| // This source code is licensed under both the GPLv2 (found in the |
| // COPYING file in the root directory) and Apache 2.0 License |
| // (found in the LICENSE.Apache file in the root directory). |
| // |
| // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. See the AUTHORS file for names of contributors. |
| |
| #include "db/column_family.h" |
| |
| #ifndef __STDC_FORMAT_MACROS |
| #define __STDC_FORMAT_MACROS |
| #endif |
| |
| #include <inttypes.h> |
| #include <vector> |
| #include <string> |
| #include <algorithm> |
| #include <limits> |
| |
| #include "db/compaction_picker.h" |
| #include "db/compaction_picker_universal.h" |
| #include "db/db_impl.h" |
| #include "db/internal_stats.h" |
| #include "db/job_context.h" |
| #include "db/table_properties_collector.h" |
| #include "db/version_set.h" |
| #include "db/write_controller.h" |
| #include "memtable/hash_skiplist_rep.h" |
| #include "monitoring/thread_status_util.h" |
| #include "options/options_helper.h" |
| #include "table/block_based_table_factory.h" |
| #include "util/autovector.h" |
| #include "util/compression.h" |
| |
| namespace rocksdb { |
| |
| ColumnFamilyHandleImpl::ColumnFamilyHandleImpl( |
| ColumnFamilyData* column_family_data, DBImpl* db, InstrumentedMutex* mutex) |
| : cfd_(column_family_data), db_(db), mutex_(mutex) { |
| if (cfd_ != nullptr) { |
| cfd_->Ref(); |
| } |
| } |
| |
| ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() { |
| if (cfd_ != nullptr) { |
| #ifndef ROCKSDB_LITE |
| for (auto& listener : cfd_->ioptions()->listeners) { |
| listener->OnColumnFamilyHandleDeletionStarted(this); |
| } |
| #endif // ROCKSDB_LITE |
| // Job id == 0 means that this is not our background process, but rather |
| // user thread |
| JobContext job_context(0); |
| mutex_->Lock(); |
| if (cfd_->Unref()) { |
| delete cfd_; |
| } |
| db_->FindObsoleteFiles(&job_context, false, true); |
| mutex_->Unlock(); |
| if (job_context.HaveSomethingToDelete()) { |
| db_->PurgeObsoleteFiles(job_context); |
| } |
| job_context.Clean(); |
| } |
| } |
| |
| uint32_t ColumnFamilyHandleImpl::GetID() const { return cfd()->GetID(); } |
| |
| const std::string& ColumnFamilyHandleImpl::GetName() const { |
| return cfd()->GetName(); |
| } |
| |
| Status ColumnFamilyHandleImpl::GetDescriptor(ColumnFamilyDescriptor* desc) { |
| #ifndef ROCKSDB_LITE |
| // accessing mutable cf-options requires db mutex. |
| InstrumentedMutexLock l(mutex_); |
| *desc = ColumnFamilyDescriptor(cfd()->GetName(), cfd()->GetLatestCFOptions()); |
| return Status::OK(); |
| #else |
| return Status::NotSupported(); |
| #endif // !ROCKSDB_LITE |
| } |
| |
| const Comparator* ColumnFamilyHandleImpl::GetComparator() const { |
| return cfd()->user_comparator(); |
| } |
| |
| void GetIntTblPropCollectorFactory( |
| const ImmutableCFOptions& ioptions, |
| std::vector<std::unique_ptr<IntTblPropCollectorFactory>>* |
| int_tbl_prop_collector_factories) { |
| auto& collector_factories = ioptions.table_properties_collector_factories; |
| for (size_t i = 0; i < ioptions.table_properties_collector_factories.size(); |
| ++i) { |
| assert(collector_factories[i]); |
| int_tbl_prop_collector_factories->emplace_back( |
| new UserKeyTablePropertiesCollectorFactory(collector_factories[i])); |
| } |
| // Add collector to collect internal key statistics |
| int_tbl_prop_collector_factories->emplace_back( |
| new InternalKeyPropertiesCollectorFactory); |
| } |
| |
| Status CheckCompressionSupported(const ColumnFamilyOptions& cf_options) { |
| if (!cf_options.compression_per_level.empty()) { |
| for (size_t level = 0; level < cf_options.compression_per_level.size(); |
| ++level) { |
| if (!CompressionTypeSupported(cf_options.compression_per_level[level])) { |
| return Status::InvalidArgument( |
| "Compression type " + |
| CompressionTypeToString(cf_options.compression_per_level[level]) + |
| " is not linked with the binary."); |
| } |
| } |
| } else { |
| if (!CompressionTypeSupported(cf_options.compression)) { |
| return Status::InvalidArgument( |
| "Compression type " + |
| CompressionTypeToString(cf_options.compression) + |
| " is not linked with the binary."); |
| } |
| } |
| return Status::OK(); |
| } |
| |
| Status CheckConcurrentWritesSupported(const ColumnFamilyOptions& cf_options) { |
| if (cf_options.inplace_update_support) { |
| return Status::InvalidArgument( |
| "In-place memtable updates (inplace_update_support) is not compatible " |
| "with concurrent writes (allow_concurrent_memtable_write)"); |
| } |
| if (!cf_options.memtable_factory->IsInsertConcurrentlySupported()) { |
| return Status::InvalidArgument( |
| "Memtable doesn't concurrent writes (allow_concurrent_memtable_write)"); |
| } |
| return Status::OK(); |
| } |
| |
| ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options, |
| const ColumnFamilyOptions& src) { |
| ColumnFamilyOptions result = src; |
| size_t clamp_max = std::conditional< |
| sizeof(size_t) == 4, std::integral_constant<size_t, 0xffffffff>, |
| std::integral_constant<uint64_t, 64ull << 30>>::type::value; |
| ClipToRange(&result.write_buffer_size, ((size_t)64) << 10, clamp_max); |
| // if user sets arena_block_size, we trust user to use this value. Otherwise, |
| // calculate a proper value from writer_buffer_size; |
| if (result.arena_block_size <= 0) { |
| result.arena_block_size = result.write_buffer_size / 8; |
| |
| // Align up to 4k |
| const size_t align = 4 * 1024; |
| result.arena_block_size = |
| ((result.arena_block_size + align - 1) / align) * align; |
| } |
| result.min_write_buffer_number_to_merge = |
| std::min(result.min_write_buffer_number_to_merge, |
| result.max_write_buffer_number - 1); |
| if (result.min_write_buffer_number_to_merge < 1) { |
| result.min_write_buffer_number_to_merge = 1; |
| } |
| |
| if (result.num_levels < 1) { |
| result.num_levels = 1; |
| } |
| if (result.compaction_style == kCompactionStyleLevel && |
| result.num_levels < 2) { |
| result.num_levels = 2; |
| } |
| |
| if (result.compaction_style == kCompactionStyleUniversal && |
| db_options.allow_ingest_behind && result.num_levels < 3) { |
| result.num_levels = 3; |
| } |
| |
| if (result.max_write_buffer_number < 2) { |
| result.max_write_buffer_number = 2; |
| } |
| if (result.max_write_buffer_number_to_maintain < 0) { |
| result.max_write_buffer_number_to_maintain = result.max_write_buffer_number; |
| } |
| // bloom filter size shouldn't exceed 1/4 of memtable size. |
| if (result.memtable_prefix_bloom_size_ratio > 0.25) { |
| result.memtable_prefix_bloom_size_ratio = 0.25; |
| } else if (result.memtable_prefix_bloom_size_ratio < 0) { |
| result.memtable_prefix_bloom_size_ratio = 0; |
| } |
| |
| if (!result.prefix_extractor) { |
| assert(result.memtable_factory); |
| Slice name = result.memtable_factory->Name(); |
| if (name.compare("HashSkipListRepFactory") == 0 || |
| name.compare("HashLinkListRepFactory") == 0) { |
| result.memtable_factory = std::make_shared<SkipListFactory>(); |
| } |
| } |
| |
| if (result.compaction_style == kCompactionStyleFIFO) { |
| result.num_levels = 1; |
| // since we delete level0 files in FIFO compaction when there are too many |
| // of them, these options don't really mean anything |
| result.level0_slowdown_writes_trigger = std::numeric_limits<int>::max(); |
| result.level0_stop_writes_trigger = std::numeric_limits<int>::max(); |
| } |
| |
| if (result.max_bytes_for_level_multiplier <= 0) { |
| result.max_bytes_for_level_multiplier = 1; |
| } |
| |
| if (result.level0_file_num_compaction_trigger == 0) { |
| ROCKS_LOG_WARN(db_options.info_log.get(), |
| "level0_file_num_compaction_trigger cannot be 0"); |
| result.level0_file_num_compaction_trigger = 1; |
| } |
| |
| if (result.level0_stop_writes_trigger < |
| result.level0_slowdown_writes_trigger || |
| result.level0_slowdown_writes_trigger < |
| result.level0_file_num_compaction_trigger) { |
| ROCKS_LOG_WARN(db_options.info_log.get(), |
| "This condition must be satisfied: " |
| "level0_stop_writes_trigger(%d) >= " |
| "level0_slowdown_writes_trigger(%d) >= " |
| "level0_file_num_compaction_trigger(%d)", |
| result.level0_stop_writes_trigger, |
| result.level0_slowdown_writes_trigger, |
| result.level0_file_num_compaction_trigger); |
| if (result.level0_slowdown_writes_trigger < |
| result.level0_file_num_compaction_trigger) { |
| result.level0_slowdown_writes_trigger = |
| result.level0_file_num_compaction_trigger; |
| } |
| if (result.level0_stop_writes_trigger < |
| result.level0_slowdown_writes_trigger) { |
| result.level0_stop_writes_trigger = result.level0_slowdown_writes_trigger; |
| } |
| ROCKS_LOG_WARN(db_options.info_log.get(), |
| "Adjust the value to " |
| "level0_stop_writes_trigger(%d)" |
| "level0_slowdown_writes_trigger(%d)" |
| "level0_file_num_compaction_trigger(%d)", |
| result.level0_stop_writes_trigger, |
| result.level0_slowdown_writes_trigger, |
| result.level0_file_num_compaction_trigger); |
| } |
| |
| if (result.soft_pending_compaction_bytes_limit == 0) { |
| result.soft_pending_compaction_bytes_limit = |
| result.hard_pending_compaction_bytes_limit; |
| } else if (result.hard_pending_compaction_bytes_limit > 0 && |
| result.soft_pending_compaction_bytes_limit > |
| result.hard_pending_compaction_bytes_limit) { |
| result.soft_pending_compaction_bytes_limit = |
| result.hard_pending_compaction_bytes_limit; |
| } |
| |
| if (result.level_compaction_dynamic_level_bytes) { |
| if (result.compaction_style != kCompactionStyleLevel || |
| db_options.db_paths.size() > 1U) { |
| // 1. level_compaction_dynamic_level_bytes only makes sense for |
| // level-based compaction. |
| // 2. we don't yet know how to make both of this feature and multiple |
| // DB path work. |
| result.level_compaction_dynamic_level_bytes = false; |
| } |
| } |
| |
| if (result.max_compaction_bytes == 0) { |
| result.max_compaction_bytes = result.target_file_size_base * 25; |
| } |
| |
| return result; |
| } |
| |
| int SuperVersion::dummy = 0; |
| void* const SuperVersion::kSVInUse = &SuperVersion::dummy; |
| void* const SuperVersion::kSVObsolete = nullptr; |
| |
| SuperVersion::~SuperVersion() { |
| for (auto td : to_delete) { |
| delete td; |
| } |
| } |
| |
| SuperVersion* SuperVersion::Ref() { |
| refs.fetch_add(1, std::memory_order_relaxed); |
| return this; |
| } |
| |
| bool SuperVersion::Unref() { |
| // fetch_sub returns the previous value of ref |
| uint32_t previous_refs = refs.fetch_sub(1); |
| assert(previous_refs > 0); |
| return previous_refs == 1; |
| } |
| |
| void SuperVersion::Cleanup() { |
| assert(refs.load(std::memory_order_relaxed) == 0); |
| imm->Unref(&to_delete); |
| MemTable* m = mem->Unref(); |
| if (m != nullptr) { |
| auto* memory_usage = current->cfd()->imm()->current_memory_usage(); |
| assert(*memory_usage >= m->ApproximateMemoryUsage()); |
| *memory_usage -= m->ApproximateMemoryUsage(); |
| to_delete.push_back(m); |
| } |
| current->Unref(); |
| } |
| |
| void SuperVersion::Init(MemTable* new_mem, MemTableListVersion* new_imm, |
| Version* new_current) { |
| mem = new_mem; |
| imm = new_imm; |
| current = new_current; |
| mem->Ref(); |
| imm->Ref(); |
| current->Ref(); |
| refs.store(1, std::memory_order_relaxed); |
| } |
| |
| namespace { |
| void SuperVersionUnrefHandle(void* ptr) { |
| // UnrefHandle is called when a thread exists or a ThreadLocalPtr gets |
| // destroyed. When former happens, the thread shouldn't see kSVInUse. |
| // When latter happens, we are in ~ColumnFamilyData(), no get should happen as |
| // well. |
| SuperVersion* sv = static_cast<SuperVersion*>(ptr); |
| if (sv->Unref()) { |
| sv->db_mutex->Lock(); |
| sv->Cleanup(); |
| sv->db_mutex->Unlock(); |
| delete sv; |
| } |
| } |
| } // anonymous namespace |
| |
| ColumnFamilyData::ColumnFamilyData( |
| uint32_t id, const std::string& name, Version* _dummy_versions, |
| Cache* _table_cache, WriteBufferManager* write_buffer_manager, |
| const ColumnFamilyOptions& cf_options, const ImmutableDBOptions& db_options, |
| const EnvOptions& env_options, ColumnFamilySet* column_family_set) |
| : id_(id), |
| name_(name), |
| dummy_versions_(_dummy_versions), |
| current_(nullptr), |
| refs_(0), |
| initialized_(false), |
| dropped_(false), |
| internal_comparator_(cf_options.comparator), |
| initial_cf_options_(SanitizeOptions(db_options, cf_options)), |
| ioptions_(db_options, initial_cf_options_), |
| mutable_cf_options_(initial_cf_options_), |
| is_delete_range_supported_( |
| cf_options.table_factory->IsDeleteRangeSupported()), |
| write_buffer_manager_(write_buffer_manager), |
| mem_(nullptr), |
| imm_(ioptions_.min_write_buffer_number_to_merge, |
| ioptions_.max_write_buffer_number_to_maintain), |
| super_version_(nullptr), |
| super_version_number_(0), |
| local_sv_(new ThreadLocalPtr(&SuperVersionUnrefHandle)), |
| next_(nullptr), |
| prev_(nullptr), |
| log_number_(0), |
| column_family_set_(column_family_set), |
| pending_flush_(false), |
| pending_compaction_(false), |
| prev_compaction_needed_bytes_(0), |
| allow_2pc_(db_options.allow_2pc) { |
| Ref(); |
| |
| // Convert user defined table properties collector factories to internal ones. |
| GetIntTblPropCollectorFactory(ioptions_, &int_tbl_prop_collector_factories_); |
| |
| // if _dummy_versions is nullptr, then this is a dummy column family. |
| if (_dummy_versions != nullptr) { |
| internal_stats_.reset( |
| new InternalStats(ioptions_.num_levels, db_options.env, this)); |
| table_cache_.reset(new TableCache(ioptions_, env_options, _table_cache)); |
| if (ioptions_.compaction_style == kCompactionStyleLevel) { |
| compaction_picker_.reset( |
| new LevelCompactionPicker(ioptions_, &internal_comparator_)); |
| #ifndef ROCKSDB_LITE |
| } else if (ioptions_.compaction_style == kCompactionStyleUniversal) { |
| compaction_picker_.reset( |
| new UniversalCompactionPicker(ioptions_, &internal_comparator_)); |
| } else if (ioptions_.compaction_style == kCompactionStyleFIFO) { |
| compaction_picker_.reset( |
| new FIFOCompactionPicker(ioptions_, &internal_comparator_)); |
| } else if (ioptions_.compaction_style == kCompactionStyleNone) { |
| compaction_picker_.reset(new NullCompactionPicker( |
| ioptions_, &internal_comparator_)); |
| ROCKS_LOG_WARN(ioptions_.info_log, |
| "Column family %s does not use any background compaction. " |
| "Compactions can only be done via CompactFiles\n", |
| GetName().c_str()); |
| #endif // !ROCKSDB_LITE |
| } else { |
| ROCKS_LOG_ERROR(ioptions_.info_log, |
| "Unable to recognize the specified compaction style %d. " |
| "Column family %s will use kCompactionStyleLevel.\n", |
| ioptions_.compaction_style, GetName().c_str()); |
| compaction_picker_.reset( |
| new LevelCompactionPicker(ioptions_, &internal_comparator_)); |
| } |
| |
| if (column_family_set_->NumberOfColumnFamilies() < 10) { |
| ROCKS_LOG_INFO(ioptions_.info_log, |
| "--------------- Options for column family [%s]:\n", |
| name.c_str()); |
| initial_cf_options_.Dump(ioptions_.info_log); |
| } else { |
| ROCKS_LOG_INFO(ioptions_.info_log, "\t(skipping printing options)\n"); |
| } |
| } |
| |
| RecalculateWriteStallConditions(mutable_cf_options_); |
| } |
| |
| // DB mutex held |
| ColumnFamilyData::~ColumnFamilyData() { |
| assert(refs_.load(std::memory_order_relaxed) == 0); |
| // remove from linked list |
| auto prev = prev_; |
| auto next = next_; |
| prev->next_ = next; |
| next->prev_ = prev; |
| |
| if (!dropped_ && column_family_set_ != nullptr) { |
| // If it's dropped, it's already removed from column family set |
| // If column_family_set_ == nullptr, this is dummy CFD and not in |
| // ColumnFamilySet |
| column_family_set_->RemoveColumnFamily(this); |
| } |
| |
| if (current_ != nullptr) { |
| current_->Unref(); |
| } |
| |
| // It would be wrong if this ColumnFamilyData is in flush_queue_ or |
| // compaction_queue_ and we destroyed it |
| assert(!pending_flush_); |
| assert(!pending_compaction_); |
| |
| if (super_version_ != nullptr) { |
| // Release SuperVersion reference kept in ThreadLocalPtr. |
| // This must be done outside of mutex_ since unref handler can lock mutex. |
| super_version_->db_mutex->Unlock(); |
| local_sv_.reset(); |
| super_version_->db_mutex->Lock(); |
| |
| bool is_last_reference __attribute__((unused)); |
| is_last_reference = super_version_->Unref(); |
| assert(is_last_reference); |
| super_version_->Cleanup(); |
| delete super_version_; |
| super_version_ = nullptr; |
| } |
| |
| if (dummy_versions_ != nullptr) { |
| // List must be empty |
| assert(dummy_versions_->TEST_Next() == dummy_versions_); |
| bool deleted __attribute__((unused)) = dummy_versions_->Unref(); |
| assert(deleted); |
| } |
| |
| if (mem_ != nullptr) { |
| delete mem_->Unref(); |
| } |
| autovector<MemTable*> to_delete; |
| imm_.current()->Unref(&to_delete); |
| for (MemTable* m : to_delete) { |
| delete m; |
| } |
| } |
| |
| void ColumnFamilyData::SetDropped() { |
| // can't drop default CF |
| assert(id_ != 0); |
| dropped_ = true; |
| write_controller_token_.reset(); |
| |
| // remove from column_family_set |
| column_family_set_->RemoveColumnFamily(this); |
| } |
| |
| ColumnFamilyOptions ColumnFamilyData::GetLatestCFOptions() const { |
| return BuildColumnFamilyOptions(initial_cf_options_, mutable_cf_options_); |
| } |
| |
| uint64_t ColumnFamilyData::OldestLogToKeep() { |
| auto current_log = GetLogNumber(); |
| |
| if (allow_2pc_) { |
| auto imm_prep_log = imm()->GetMinLogContainingPrepSection(); |
| auto mem_prep_log = mem()->GetMinLogContainingPrepSection(); |
| |
| if (imm_prep_log > 0 && imm_prep_log < current_log) { |
| current_log = imm_prep_log; |
| } |
| |
| if (mem_prep_log > 0 && mem_prep_log < current_log) { |
| current_log = mem_prep_log; |
| } |
| } |
| |
| return current_log; |
| } |
| |
// Factors applied to the delayed write rate when write stall conditions are
// recalculated.
// Multiplier used to reduce the write rate.
const double kIncSlowdownRatio = 0.8;
// Inverse of kIncSlowdownRatio, used to raise the write rate again.
const double kDecSlowdownRatio = 1 / kIncSlowdownRatio;
// Stronger reduction used when the stop condition is near or was just hit.
const double kNearStopSlowdownRatio = 0.6;
// Increase granted once the DB recovers from the delay condition.
const double kDelayRecoverSlowdownRatio = 1.4;
| |
| namespace { |
| // If penalize_stop is true, we further reduce slowdown rate. |
| std::unique_ptr<WriteControllerToken> SetupDelay( |
| WriteController* write_controller, uint64_t compaction_needed_bytes, |
| uint64_t prev_compaction_need_bytes, bool penalize_stop, |
| bool auto_comapctions_disabled) { |
| const uint64_t kMinWriteRate = 16 * 1024u; // Minimum write rate 16KB/s. |
| |
| uint64_t max_write_rate = write_controller->max_delayed_write_rate(); |
| uint64_t write_rate = write_controller->delayed_write_rate(); |
| |
| if (auto_comapctions_disabled) { |
| // When auto compaction is disabled, always use the value user gave. |
| write_rate = max_write_rate; |
| } else if (write_controller->NeedsDelay() && max_write_rate > kMinWriteRate) { |
| // If user gives rate less than kMinWriteRate, don't adjust it. |
| // |
| // If already delayed, need to adjust based on previous compaction debt. |
| // When there are two or more column families require delay, we always |
| // increase or reduce write rate based on information for one single |
| // column family. It is likely to be OK but we can improve if there is a |
| // problem. |
| // Ignore compaction_needed_bytes = 0 case because compaction_needed_bytes |
| // is only available in level-based compaction |
| // |
| // If the compaction debt stays the same as previously, we also further slow |
| // down. It usually means a mem table is full. It's mainly for the case |
| // where both of flush and compaction are much slower than the speed we |
| // insert to mem tables, so we need to actively slow down before we get |
| // feedback signal from compaction and flushes to avoid the full stop |
| // because of hitting the max write buffer number. |
| // |
| // If DB just falled into the stop condition, we need to further reduce |
| // the write rate to avoid the stop condition. |
| if (penalize_stop) { |
| // Penalize the near stop or stop condition by more aggressive slowdown. |
| // This is to provide the long term slowdown increase signal. |
| // The penalty is more than the reward of recovering to the normal |
| // condition. |
| write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) * |
| kNearStopSlowdownRatio); |
| if (write_rate < kMinWriteRate) { |
| write_rate = kMinWriteRate; |
| } |
| } else if (prev_compaction_need_bytes > 0 && |
| prev_compaction_need_bytes <= compaction_needed_bytes) { |
| write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) * |
| kIncSlowdownRatio); |
| if (write_rate < kMinWriteRate) { |
| write_rate = kMinWriteRate; |
| } |
| } else if (prev_compaction_need_bytes > compaction_needed_bytes) { |
| // We are speeding up by ratio of kSlowdownRatio when we have paid |
| // compaction debt. But we'll never speed up to faster than the write rate |
| // given by users. |
| write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) * |
| kDecSlowdownRatio); |
| if (write_rate > max_write_rate) { |
| write_rate = max_write_rate; |
| } |
| } |
| } |
| return write_controller->GetDelayToken(write_rate); |
| } |
| |
// Returns the number of level-0 files at which compaction should be sped up:
// the smaller of
//   - twice the L0 compaction trigger, and
//   - the point 1/4 of the way from the compaction trigger to the slowdown
//     trigger,
// clamped to the largest value an int can hold. A negative compaction
// trigger yields that maximum as well.
//
// Precondition (ensured by SanitizeOptions()):
//   level0_file_num_compaction_trigger <= level0_slowdown_writes_trigger.
int GetL0ThresholdSpeedupCompaction(int level0_file_num_compaction_trigger,
                                    int level0_slowdown_writes_trigger) {
  // SanitizeOptions() ensures it.
  assert(level0_file_num_compaction_trigger <= level0_slowdown_writes_trigger);

  // The same limit is used for the early return and for the final clamp,
  // and it is the limit of the type this function actually returns.
  const int kMaxThreshold = std::numeric_limits<int>::max();

  if (level0_file_num_compaction_trigger < 0) {
    return kMaxThreshold;
  }

  // Both candidates are computed in 64 bits so that neither the doubling nor
  // the addition can overflow.
  const int64_t twice_level0_trigger =
      static_cast<int64_t>(level0_file_num_compaction_trigger) * 2;

  // The difference cannot overflow: the trigger is non-negative here and not
  // larger than the slowdown trigger.
  const int64_t one_fourth_trigger_slowdown =
      static_cast<int64_t>(level0_file_num_compaction_trigger) +
      ((level0_slowdown_writes_trigger - level0_file_num_compaction_trigger) /
       4);

  assert(twice_level0_trigger >= 0);
  assert(one_fourth_trigger_slowdown >= 0);

  // 1/4 of the way between L0 compaction trigger threshold and slowdown
  // condition.
  // Or twice as compaction trigger, if it is smaller.
  const int64_t res =
      std::min(twice_level0_trigger, one_fourth_trigger_slowdown);
  if (res >= kMaxThreshold) {
    return kMaxThreshold;
  }
  // res fits in int
  return static_cast<int>(res);
}
| } // namespace |
| |
| void ColumnFamilyData::RecalculateWriteStallConditions( |
| const MutableCFOptions& mutable_cf_options) { |
| if (current_ != nullptr) { |
| auto* vstorage = current_->storage_info(); |
| auto write_controller = column_family_set_->write_controller_; |
| uint64_t compaction_needed_bytes = |
| vstorage->estimated_compaction_needed_bytes(); |
| |
| bool was_stopped = write_controller->IsStopped(); |
| bool needed_delay = write_controller->NeedsDelay(); |
| |
| if (imm()->NumNotFlushed() >= mutable_cf_options.max_write_buffer_number) { |
| write_controller_token_ = write_controller->GetStopToken(); |
| internal_stats_->AddCFStats(InternalStats::MEMTABLE_COMPACTION, 1); |
| ROCKS_LOG_WARN( |
| ioptions_.info_log, |
| "[%s] Stopping writes because we have %d immutable memtables " |
| "(waiting for flush), max_write_buffer_number is set to %d", |
| name_.c_str(), imm()->NumNotFlushed(), |
| mutable_cf_options.max_write_buffer_number); |
| } else if (!mutable_cf_options.disable_auto_compactions && |
| vstorage->l0_delay_trigger_count() >= |
| mutable_cf_options.level0_stop_writes_trigger) { |
| write_controller_token_ = write_controller->GetStopToken(); |
| internal_stats_->AddCFStats(InternalStats::LEVEL0_NUM_FILES_TOTAL, 1); |
| if (compaction_picker_->IsLevel0CompactionInProgress()) { |
| internal_stats_->AddCFStats( |
| InternalStats::LEVEL0_NUM_FILES_WITH_COMPACTION, 1); |
| } |
| ROCKS_LOG_WARN(ioptions_.info_log, |
| "[%s] Stopping writes because we have %d level-0 files", |
| name_.c_str(), vstorage->l0_delay_trigger_count()); |
| } else if (!mutable_cf_options.disable_auto_compactions && |
| mutable_cf_options.hard_pending_compaction_bytes_limit > 0 && |
| compaction_needed_bytes >= |
| mutable_cf_options.hard_pending_compaction_bytes_limit) { |
| write_controller_token_ = write_controller->GetStopToken(); |
| internal_stats_->AddCFStats( |
| InternalStats::HARD_PENDING_COMPACTION_BYTES_LIMIT, 1); |
| ROCKS_LOG_WARN( |
| ioptions_.info_log, |
| "[%s] Stopping writes because of estimated pending compaction " |
| "bytes %" PRIu64, |
| name_.c_str(), compaction_needed_bytes); |
| } else if (mutable_cf_options.max_write_buffer_number > 3 && |
| imm()->NumNotFlushed() >= |
| mutable_cf_options.max_write_buffer_number - 1) { |
| write_controller_token_ = |
| SetupDelay(write_controller, compaction_needed_bytes, |
| prev_compaction_needed_bytes_, was_stopped, |
| mutable_cf_options.disable_auto_compactions); |
| internal_stats_->AddCFStats(InternalStats::MEMTABLE_SLOWDOWN, 1); |
| ROCKS_LOG_WARN( |
| ioptions_.info_log, |
| "[%s] Stalling writes because we have %d immutable memtables " |
| "(waiting for flush), max_write_buffer_number is set to %d " |
| "rate %" PRIu64, |
| name_.c_str(), imm()->NumNotFlushed(), |
| mutable_cf_options.max_write_buffer_number, |
| write_controller->delayed_write_rate()); |
| } else if (!mutable_cf_options.disable_auto_compactions && |
| mutable_cf_options.level0_slowdown_writes_trigger >= 0 && |
| vstorage->l0_delay_trigger_count() >= |
| mutable_cf_options.level0_slowdown_writes_trigger) { |
| // L0 is the last two files from stopping. |
| bool near_stop = vstorage->l0_delay_trigger_count() >= |
| mutable_cf_options.level0_stop_writes_trigger - 2; |
| write_controller_token_ = |
| SetupDelay(write_controller, compaction_needed_bytes, |
| prev_compaction_needed_bytes_, was_stopped || near_stop, |
| mutable_cf_options.disable_auto_compactions); |
| internal_stats_->AddCFStats(InternalStats::LEVEL0_SLOWDOWN_TOTAL, 1); |
| if (compaction_picker_->IsLevel0CompactionInProgress()) { |
| internal_stats_->AddCFStats( |
| InternalStats::LEVEL0_SLOWDOWN_WITH_COMPACTION, 1); |
| } |
| ROCKS_LOG_WARN(ioptions_.info_log, |
| "[%s] Stalling writes because we have %d level-0 files " |
| "rate %" PRIu64, |
| name_.c_str(), vstorage->l0_delay_trigger_count(), |
| write_controller->delayed_write_rate()); |
| } else if (!mutable_cf_options.disable_auto_compactions && |
| mutable_cf_options.soft_pending_compaction_bytes_limit > 0 && |
| vstorage->estimated_compaction_needed_bytes() >= |
| mutable_cf_options.soft_pending_compaction_bytes_limit) { |
| // If the distance to hard limit is less than 1/4 of the gap between soft |
| // and |
| // hard bytes limit, we think it is near stop and speed up the slowdown. |
| bool near_stop = |
| mutable_cf_options.hard_pending_compaction_bytes_limit > 0 && |
| (compaction_needed_bytes - |
| mutable_cf_options.soft_pending_compaction_bytes_limit) > |
| 3 * (mutable_cf_options.hard_pending_compaction_bytes_limit - |
| mutable_cf_options.soft_pending_compaction_bytes_limit) / |
| 4; |
| |
| write_controller_token_ = |
| SetupDelay(write_controller, compaction_needed_bytes, |
| prev_compaction_needed_bytes_, was_stopped || near_stop, |
| mutable_cf_options.disable_auto_compactions); |
| internal_stats_->AddCFStats( |
| InternalStats::SOFT_PENDING_COMPACTION_BYTES_LIMIT, 1); |
| ROCKS_LOG_WARN( |
| ioptions_.info_log, |
| "[%s] Stalling writes because of estimated pending compaction " |
| "bytes %" PRIu64 " rate %" PRIu64, |
| name_.c_str(), vstorage->estimated_compaction_needed_bytes(), |
| write_controller->delayed_write_rate()); |
| } else { |
| if (vstorage->l0_delay_trigger_count() >= |
| GetL0ThresholdSpeedupCompaction( |
| mutable_cf_options.level0_file_num_compaction_trigger, |
| mutable_cf_options.level0_slowdown_writes_trigger)) { |
| write_controller_token_ = |
| write_controller->GetCompactionPressureToken(); |
| ROCKS_LOG_INFO( |
| ioptions_.info_log, |
| "[%s] Increasing compaction threads because we have %d level-0 " |
| "files ", |
| name_.c_str(), vstorage->l0_delay_trigger_count()); |
| } else if (vstorage->estimated_compaction_needed_bytes() >= |
| mutable_cf_options.soft_pending_compaction_bytes_limit / 4) { |
| // Increase compaction threads if bytes needed for compaction exceeds |
| // 1/4 of threshold for slowing down. |
| // If soft pending compaction byte limit is not set, always speed up |
| // compaction. |
| write_controller_token_ = |
| write_controller->GetCompactionPressureToken(); |
| if (mutable_cf_options.soft_pending_compaction_bytes_limit > 0) { |
| ROCKS_LOG_INFO( |
| ioptions_.info_log, |
| "[%s] Increasing compaction threads because of estimated pending " |
| "compaction " |
| "bytes %" PRIu64, |
| name_.c_str(), vstorage->estimated_compaction_needed_bytes()); |
| } |
| } else { |
| write_controller_token_.reset(); |
| } |
| // If the DB recovers from delay conditions, we reward with reducing |
| // double the slowdown ratio. This is to balance the long term slowdown |
| // increase signal. |
| if (needed_delay) { |
| uint64_t write_rate = write_controller->delayed_write_rate(); |
| write_controller->set_delayed_write_rate(static_cast<uint64_t>( |
| static_cast<double>(write_rate) * kDelayRecoverSlowdownRatio)); |
| // Set the low pri limit to be 1/4 the delayed write rate. |
| // Note we don't reset this value even after delay condition is relased. |
| // Low-pri rate will continue to apply if there is a compaction |
| // pressure. |
| write_controller->low_pri_rate_limiter()->SetBytesPerSecond(write_rate / |
| 4); |
| } |
| } |
| prev_compaction_needed_bytes_ = compaction_needed_bytes; |
| } |
| } |
| |
| const EnvOptions* ColumnFamilyData::soptions() const { |
| return &(column_family_set_->env_options_); |
| } |
| |
| void ColumnFamilyData::SetCurrent(Version* current_version) { |
| current_ = current_version; |
| } |
| |
| uint64_t ColumnFamilyData::GetNumLiveVersions() const { |
| return VersionSet::GetNumLiveVersions(dummy_versions_); |
| } |
| |
| uint64_t ColumnFamilyData::GetTotalSstFilesSize() const { |
| return VersionSet::GetTotalSstFilesSize(dummy_versions_); |
| } |
| |
| MemTable* ColumnFamilyData::ConstructNewMemtable( |
| const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) { |
| return new MemTable(internal_comparator_, ioptions_, mutable_cf_options, |
| write_buffer_manager_, earliest_seq, id_); |
| } |
| |
| void ColumnFamilyData::CreateNewMemtable( |
| const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) { |
| if (mem_ != nullptr) { |
| delete mem_->Unref(); |
| } |
| SetMemtable(ConstructNewMemtable(mutable_cf_options, earliest_seq)); |
| mem_->Ref(); |
| } |
| |
| bool ColumnFamilyData::NeedsCompaction() const { |
| return compaction_picker_->NeedsCompaction(current_->storage_info()); |
| } |
| |
| Compaction* ColumnFamilyData::PickCompaction( |
| const MutableCFOptions& mutable_options, LogBuffer* log_buffer) { |
| auto* result = compaction_picker_->PickCompaction( |
| GetName(), mutable_options, current_->storage_info(), log_buffer); |
| if (result != nullptr) { |
| result->SetInputVersion(current_); |
| } |
| return result; |
| } |
| |
| bool ColumnFamilyData::RangeOverlapWithCompaction( |
| const Slice& smallest_user_key, const Slice& largest_user_key, |
| int level) const { |
| return compaction_picker_->RangeOverlapWithCompaction( |
| smallest_user_key, largest_user_key, level); |
| } |
| |
| const int ColumnFamilyData::kCompactAllLevels = -1; |
| const int ColumnFamilyData::kCompactToBaseLevel = -2; |
| |
| Compaction* ColumnFamilyData::CompactRange( |
| const MutableCFOptions& mutable_cf_options, int input_level, |
| int output_level, uint32_t output_path_id, const InternalKey* begin, |
| const InternalKey* end, InternalKey** compaction_end, bool* conflict) { |
| auto* result = compaction_picker_->CompactRange( |
| GetName(), mutable_cf_options, current_->storage_info(), input_level, |
| output_level, output_path_id, begin, end, compaction_end, conflict); |
| if (result != nullptr) { |
| result->SetInputVersion(current_); |
| } |
| return result; |
| } |
| |
| SuperVersion* ColumnFamilyData::GetReferencedSuperVersion( |
| InstrumentedMutex* db_mutex) { |
| SuperVersion* sv = nullptr; |
| sv = GetThreadLocalSuperVersion(db_mutex); |
| sv->Ref(); |
| if (!ReturnThreadLocalSuperVersion(sv)) { |
| sv->Unref(); |
| } |
| return sv; |
| } |
| |
| SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion( |
| InstrumentedMutex* db_mutex) { |
| SuperVersion* sv = nullptr; |
| // The SuperVersion is cached in thread local storage to avoid acquiring |
| // mutex when SuperVersion does not change since the last use. When a new |
| // SuperVersion is installed, the compaction or flush thread cleans up |
| // cached SuperVersion in all existing thread local storage. To avoid |
| // acquiring mutex for this operation, we use atomic Swap() on the thread |
| // local pointer to guarantee exclusive access. If the thread local pointer |
| // is being used while a new SuperVersion is installed, the cached |
| // SuperVersion can become stale. In that case, the background thread would |
| // have swapped in kSVObsolete. We re-check the value at when returning |
| // SuperVersion back to thread local, with an atomic compare and swap. |
| // The superversion will need to be released if detected to be stale. |
| void* ptr = local_sv_->Swap(SuperVersion::kSVInUse); |
| // Invariant: |
| // (1) Scrape (always) installs kSVObsolete in ThreadLocal storage |
| // (2) the Swap above (always) installs kSVInUse, ThreadLocal storage |
| // should only keep kSVInUse before ReturnThreadLocalSuperVersion call |
| // (if no Scrape happens). |
| assert(ptr != SuperVersion::kSVInUse); |
| sv = static_cast<SuperVersion*>(ptr); |
| if (sv == SuperVersion::kSVObsolete || |
| sv->version_number != super_version_number_.load()) { |
| RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_ACQUIRES); |
| SuperVersion* sv_to_delete = nullptr; |
| |
| if (sv && sv->Unref()) { |
| RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_CLEANUPS); |
| db_mutex->Lock(); |
| // NOTE: underlying resources held by superversion (sst files) might |
| // not be released until the next background job. |
| sv->Cleanup(); |
| sv_to_delete = sv; |
| } else { |
| db_mutex->Lock(); |
| } |
| sv = super_version_->Ref(); |
| db_mutex->Unlock(); |
| |
| delete sv_to_delete; |
| } |
| assert(sv != nullptr); |
| return sv; |
| } |
| |
| bool ColumnFamilyData::ReturnThreadLocalSuperVersion(SuperVersion* sv) { |
| assert(sv != nullptr); |
| // Put the SuperVersion back |
| void* expected = SuperVersion::kSVInUse; |
| if (local_sv_->CompareAndSwap(static_cast<void*>(sv), expected)) { |
| // When we see kSVInUse in the ThreadLocal, we are sure ThreadLocal |
| // storage has not been altered and no Scrape has happened. The |
| // SuperVersion is still current. |
| return true; |
| } else { |
| // ThreadLocal scrape happened in the process of this GetImpl call (after |
| // thread local Swap() at the beginning and before CompareAndSwap()). |
| // This means the SuperVersion it holds is obsolete. |
| assert(expected == SuperVersion::kSVObsolete); |
| } |
| return false; |
| } |
| |
| SuperVersion* ColumnFamilyData::InstallSuperVersion( |
| SuperVersion* new_superversion, InstrumentedMutex* db_mutex) { |
| db_mutex->AssertHeld(); |
| return InstallSuperVersion(new_superversion, db_mutex, mutable_cf_options_); |
| } |
| |
| SuperVersion* ColumnFamilyData::InstallSuperVersion( |
| SuperVersion* new_superversion, InstrumentedMutex* db_mutex, |
| const MutableCFOptions& mutable_cf_options) { |
| new_superversion->db_mutex = db_mutex; |
| new_superversion->mutable_cf_options = mutable_cf_options; |
| new_superversion->Init(mem_, imm_.current(), current_); |
| SuperVersion* old_superversion = super_version_; |
| super_version_ = new_superversion; |
| ++super_version_number_; |
| super_version_->version_number = super_version_number_; |
| if (old_superversion != nullptr) { |
| if (old_superversion->mutable_cf_options.write_buffer_size != |
| mutable_cf_options.write_buffer_size) { |
| mem_->UpdateWriteBufferSize(mutable_cf_options.write_buffer_size); |
| } |
| } |
| |
| // Reset SuperVersions cached in thread local storage |
| ResetThreadLocalSuperVersions(); |
| |
| RecalculateWriteStallConditions(mutable_cf_options); |
| |
| if (old_superversion != nullptr && old_superversion->Unref()) { |
| old_superversion->Cleanup(); |
| return old_superversion; // will let caller delete outside of mutex |
| } |
| return nullptr; |
| } |
| |
| void ColumnFamilyData::ResetThreadLocalSuperVersions() { |
| autovector<void*> sv_ptrs; |
| local_sv_->Scrape(&sv_ptrs, SuperVersion::kSVObsolete); |
| for (auto ptr : sv_ptrs) { |
| assert(ptr); |
| if (ptr == SuperVersion::kSVInUse) { |
| continue; |
| } |
| auto sv = static_cast<SuperVersion*>(ptr); |
| if (sv->Unref()) { |
| sv->Cleanup(); |
| delete sv; |
| } |
| } |
| } |
| |
| #ifndef ROCKSDB_LITE |
| Status ColumnFamilyData::SetOptions( |
| const std::unordered_map<std::string, std::string>& options_map) { |
| MutableCFOptions new_mutable_cf_options; |
| Status s = GetMutableOptionsFromStrings(mutable_cf_options_, options_map, |
| &new_mutable_cf_options); |
| if (s.ok()) { |
| mutable_cf_options_ = new_mutable_cf_options; |
| mutable_cf_options_.RefreshDerivedOptions(ioptions_); |
| } |
| return s; |
| } |
| #endif // ROCKSDB_LITE |
| |
| ColumnFamilySet::ColumnFamilySet(const std::string& dbname, |
| const ImmutableDBOptions* db_options, |
| const EnvOptions& env_options, |
| Cache* table_cache, |
| WriteBufferManager* write_buffer_manager, |
| WriteController* write_controller) |
| : max_column_family_(0), |
| dummy_cfd_(new ColumnFamilyData(0, "", nullptr, nullptr, nullptr, |
| ColumnFamilyOptions(), *db_options, |
| env_options, nullptr)), |
| default_cfd_cache_(nullptr), |
| db_name_(dbname), |
| db_options_(db_options), |
| env_options_(env_options), |
| table_cache_(table_cache), |
| write_buffer_manager_(write_buffer_manager), |
| write_controller_(write_controller) { |
| // initialize linked list |
| dummy_cfd_->prev_ = dummy_cfd_; |
| dummy_cfd_->next_ = dummy_cfd_; |
| } |
| |
| ColumnFamilySet::~ColumnFamilySet() { |
| while (column_family_data_.size() > 0) { |
| // cfd destructor will delete itself from column_family_data_ |
| auto cfd = column_family_data_.begin()->second; |
| cfd->Unref(); |
| delete cfd; |
| } |
| dummy_cfd_->Unref(); |
| delete dummy_cfd_; |
| } |
| |
| ColumnFamilyData* ColumnFamilySet::GetDefault() const { |
| assert(default_cfd_cache_ != nullptr); |
| return default_cfd_cache_; |
| } |
| |
| ColumnFamilyData* ColumnFamilySet::GetColumnFamily(uint32_t id) const { |
| auto cfd_iter = column_family_data_.find(id); |
| if (cfd_iter != column_family_data_.end()) { |
| return cfd_iter->second; |
| } else { |
| return nullptr; |
| } |
| } |
| |
| ColumnFamilyData* ColumnFamilySet::GetColumnFamily(const std::string& name) |
| const { |
| auto cfd_iter = column_families_.find(name); |
| if (cfd_iter != column_families_.end()) { |
| auto cfd = GetColumnFamily(cfd_iter->second); |
| assert(cfd != nullptr); |
| return cfd; |
| } else { |
| return nullptr; |
| } |
| } |
| |
| uint32_t ColumnFamilySet::GetNextColumnFamilyID() { |
| return ++max_column_family_; |
| } |
| |
| uint32_t ColumnFamilySet::GetMaxColumnFamily() { return max_column_family_; } |
| |
| void ColumnFamilySet::UpdateMaxColumnFamily(uint32_t new_max_column_family) { |
| max_column_family_ = std::max(new_max_column_family, max_column_family_); |
| } |
| |
| size_t ColumnFamilySet::NumberOfColumnFamilies() const { |
| return column_families_.size(); |
| } |
| |
| // under a DB mutex AND write thread |
| ColumnFamilyData* ColumnFamilySet::CreateColumnFamily( |
| const std::string& name, uint32_t id, Version* dummy_versions, |
| const ColumnFamilyOptions& options) { |
| assert(column_families_.find(name) == column_families_.end()); |
| ColumnFamilyData* new_cfd = new ColumnFamilyData( |
| id, name, dummy_versions, table_cache_, write_buffer_manager_, options, |
| *db_options_, env_options_, this); |
| column_families_.insert({name, id}); |
| column_family_data_.insert({id, new_cfd}); |
| max_column_family_ = std::max(max_column_family_, id); |
| // add to linked list |
| new_cfd->next_ = dummy_cfd_; |
| auto prev = dummy_cfd_->prev_; |
| new_cfd->prev_ = prev; |
| prev->next_ = new_cfd; |
| dummy_cfd_->prev_ = new_cfd; |
| if (id == 0) { |
| default_cfd_cache_ = new_cfd; |
| } |
| return new_cfd; |
| } |
| |
| // REQUIRES: DB mutex held |
| void ColumnFamilySet::FreeDeadColumnFamilies() { |
| autovector<ColumnFamilyData*> to_delete; |
| for (auto cfd = dummy_cfd_->next_; cfd != dummy_cfd_; cfd = cfd->next_) { |
| if (cfd->refs_.load(std::memory_order_relaxed) == 0) { |
| to_delete.push_back(cfd); |
| } |
| } |
| for (auto cfd : to_delete) { |
| // this is very rare, so it's not a problem that we do it under a mutex |
| delete cfd; |
| } |
| } |
| |
| // under a DB mutex AND from a write thread |
| void ColumnFamilySet::RemoveColumnFamily(ColumnFamilyData* cfd) { |
| auto cfd_iter = column_family_data_.find(cfd->GetID()); |
| assert(cfd_iter != column_family_data_.end()); |
| column_family_data_.erase(cfd_iter); |
| column_families_.erase(cfd->GetName()); |
| } |
| |
| // under a DB mutex OR from a write thread |
| bool ColumnFamilyMemTablesImpl::Seek(uint32_t column_family_id) { |
| if (column_family_id == 0) { |
| // optimization for common case |
| current_ = column_family_set_->GetDefault(); |
| } else { |
| current_ = column_family_set_->GetColumnFamily(column_family_id); |
| } |
| handle_.SetCFD(current_); |
| return current_ != nullptr; |
| } |
| |
| uint64_t ColumnFamilyMemTablesImpl::GetLogNumber() const { |
| assert(current_ != nullptr); |
| return current_->GetLogNumber(); |
| } |
| |
| MemTable* ColumnFamilyMemTablesImpl::GetMemTable() const { |
| assert(current_ != nullptr); |
| return current_->mem(); |
| } |
| |
| ColumnFamilyHandle* ColumnFamilyMemTablesImpl::GetColumnFamilyHandle() { |
| assert(current_ != nullptr); |
| return &handle_; |
| } |
| |
| uint32_t GetColumnFamilyID(ColumnFamilyHandle* column_family) { |
| uint32_t column_family_id = 0; |
| if (column_family != nullptr) { |
| auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family); |
| column_family_id = cfh->GetID(); |
| } |
| return column_family_id; |
| } |
| |
| const Comparator* GetColumnFamilyUserComparator( |
| ColumnFamilyHandle* column_family) { |
| if (column_family != nullptr) { |
| return column_family->GetComparator(); |
| } |
| return nullptr; |
| } |
| |
| } // namespace rocksdb |