| // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
| // This source code is licensed under both the GPLv2 (found in the |
| // COPYING file in the root directory) and Apache 2.0 License |
| // (found in the LICENSE.Apache file in the root directory). |
| // |
| // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. See the AUTHORS file for names of contributors. |
| #include "db/db_impl.h" |
| |
| #ifndef __STDC_FORMAT_MACROS |
| #define __STDC_FORMAT_MACROS |
| #endif |
| #include <inttypes.h> |
| |
| #include "db/builder.h" |
| #include "db/event_helpers.h" |
| #include "monitoring/iostats_context_imp.h" |
| #include "monitoring/perf_context_imp.h" |
| #include "monitoring/thread_status_updater.h" |
| #include "monitoring/thread_status_util.h" |
| #include "util/sst_file_manager_impl.h" |
| #include "util/sync_point.h" |
| |
| namespace rocksdb { |
| Status DBImpl::SyncClosedLogs(JobContext* job_context) { |
| TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Start"); |
| mutex_.AssertHeld(); |
| autovector<log::Writer*, 1> logs_to_sync; |
| uint64_t current_log_number = logfile_number_; |
| while (logs_.front().number < current_log_number && |
| logs_.front().getting_synced) { |
| log_sync_cv_.Wait(); |
| } |
| for (auto it = logs_.begin(); |
| it != logs_.end() && it->number < current_log_number; ++it) { |
| auto& log = *it; |
| assert(!log.getting_synced); |
| log.getting_synced = true; |
| logs_to_sync.push_back(log.writer); |
| } |
| |
| Status s; |
| if (!logs_to_sync.empty()) { |
| mutex_.Unlock(); |
| |
| for (log::Writer* log : logs_to_sync) { |
| ROCKS_LOG_INFO(immutable_db_options_.info_log, |
| "[JOB %d] Syncing log #%" PRIu64, job_context->job_id, |
| log->get_log_number()); |
| s = log->file()->Sync(immutable_db_options_.use_fsync); |
| } |
| if (s.ok()) { |
| s = directories_.GetWalDir()->Fsync(); |
| } |
| |
| mutex_.Lock(); |
| |
| // "number <= current_log_number - 1" is equivalent to |
| // "number < current_log_number". |
| MarkLogsSynced(current_log_number - 1, true, s); |
| if (!s.ok()) { |
| Status new_bg_error = s; |
| // may temporarily unlock and lock the mutex. |
| EventHelpers::NotifyOnBackgroundError(immutable_db_options_.listeners, |
| BackgroundErrorReason::kFlush, |
| &new_bg_error, &mutex_); |
| if (!new_bg_error.ok()) { |
| bg_error_ = new_bg_error; |
| } |
| TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Failed"); |
| return s; |
| } |
| } |
| return s; |
| } |
| |
| Status DBImpl::FlushMemTableToOutputFile( |
| ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options, |
| bool* made_progress, JobContext* job_context, LogBuffer* log_buffer) { |
| mutex_.AssertHeld(); |
| assert(cfd->imm()->NumNotFlushed() != 0); |
| assert(cfd->imm()->IsFlushPending()); |
| |
| SequenceNumber earliest_write_conflict_snapshot; |
| std::vector<SequenceNumber> snapshot_seqs = |
| snapshots_.GetAll(&earliest_write_conflict_snapshot); |
| |
| FlushJob flush_job( |
| dbname_, cfd, immutable_db_options_, mutable_cf_options, env_options_, |
| versions_.get(), &mutex_, &shutting_down_, snapshot_seqs, |
| earliest_write_conflict_snapshot, job_context, log_buffer, |
| directories_.GetDbDir(), directories_.GetDataDir(0U), |
| GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_, |
| &event_logger_, mutable_cf_options.report_bg_io_stats); |
| |
| FileMetaData file_meta; |
| |
| flush_job.PickMemTable(); |
| |
| #ifndef ROCKSDB_LITE |
| // may temporarily unlock and lock the mutex. |
| NotifyOnFlushBegin(cfd, &file_meta, mutable_cf_options, job_context->job_id, |
| flush_job.GetTableProperties()); |
| #endif // ROCKSDB_LITE |
| |
| Status s; |
| if (logfile_number_ > 0 && |
| versions_->GetColumnFamilySet()->NumberOfColumnFamilies() > 0) { |
| // If there are more than one column families, we need to make sure that |
| // all the log files except the most recent one are synced. Otherwise if |
| // the host crashes after flushing and before WAL is persistent, the |
| // flushed SST may contain data from write batches whose updates to |
| // other column families are missing. |
| // SyncClosedLogs() may unlock and re-lock the db_mutex. |
| s = SyncClosedLogs(job_context); |
| } |
| |
| // Within flush_job.Run, rocksdb may call event listener to notify |
| // file creation and deletion. |
| // |
| // Note that flush_job.Run will unlock and lock the db_mutex, |
| // and EventListener callback will be called when the db_mutex |
| // is unlocked by the current thread. |
| if (s.ok()) { |
| s = flush_job.Run(&file_meta); |
| } else { |
| flush_job.Cancel(); |
| } |
| |
| if (s.ok()) { |
| InstallSuperVersionAndScheduleWorkWrapper(cfd, job_context, |
| mutable_cf_options); |
| if (made_progress) { |
| *made_progress = 1; |
| } |
| VersionStorageInfo::LevelSummaryStorage tmp; |
| ROCKS_LOG_BUFFER(log_buffer, "[%s] Level summary: %s\n", |
| cfd->GetName().c_str(), |
| cfd->current()->storage_info()->LevelSummary(&tmp)); |
| } |
| |
| if (!s.ok() && !s.IsShutdownInProgress() && |
| immutable_db_options_.paranoid_checks && bg_error_.ok()) { |
| Status new_bg_error = s; |
| // may temporarily unlock and lock the mutex. |
| EventHelpers::NotifyOnBackgroundError(immutable_db_options_.listeners, |
| BackgroundErrorReason::kFlush, |
| &new_bg_error, &mutex_); |
| if (!new_bg_error.ok()) { |
| // if a bad error happened (not ShutdownInProgress), paranoid_checks is |
| // true, and the error isn't handled by callback, mark DB read-only |
| bg_error_ = new_bg_error; |
| } |
| } |
| if (s.ok()) { |
| #ifndef ROCKSDB_LITE |
| // may temporarily unlock and lock the mutex. |
| NotifyOnFlushCompleted(cfd, &file_meta, mutable_cf_options, |
| job_context->job_id, flush_job.GetTableProperties()); |
| auto sfm = static_cast<SstFileManagerImpl*>( |
| immutable_db_options_.sst_file_manager.get()); |
| if (sfm) { |
| // Notify sst_file_manager that a new file was added |
| std::string file_path = MakeTableFileName( |
| immutable_db_options_.db_paths[0].path, file_meta.fd.GetNumber()); |
| sfm->OnAddFile(file_path); |
| if (sfm->IsMaxAllowedSpaceReached() && bg_error_.ok()) { |
| Status new_bg_error = Status::IOError("Max allowed space was reached"); |
| TEST_SYNC_POINT_CALLBACK( |
| "DBImpl::FlushMemTableToOutputFile:MaxAllowedSpaceReached", |
| &new_bg_error); |
| // may temporarily unlock and lock the mutex. |
| EventHelpers::NotifyOnBackgroundError(immutable_db_options_.listeners, |
| BackgroundErrorReason::kFlush, |
| &new_bg_error, &mutex_); |
| if (!new_bg_error.ok()) { |
| bg_error_ = new_bg_error; |
| } |
| } |
| } |
| #endif // ROCKSDB_LITE |
| } |
| return s; |
| } |
| |
| void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta, |
| const MutableCFOptions& mutable_cf_options, |
| int job_id, TableProperties prop) { |
| #ifndef ROCKSDB_LITE |
| if (immutable_db_options_.listeners.size() == 0U) { |
| return; |
| } |
| mutex_.AssertHeld(); |
| if (shutting_down_.load(std::memory_order_acquire)) { |
| return; |
| } |
| bool triggered_writes_slowdown = |
| (cfd->current()->storage_info()->NumLevelFiles(0) >= |
| mutable_cf_options.level0_slowdown_writes_trigger); |
| bool triggered_writes_stop = |
| (cfd->current()->storage_info()->NumLevelFiles(0) >= |
| mutable_cf_options.level0_stop_writes_trigger); |
| // release lock while notifying events |
| mutex_.Unlock(); |
| { |
| FlushJobInfo info; |
| info.cf_name = cfd->GetName(); |
| // TODO(yhchiang): make db_paths dynamic in case flush does not |
| // go to L0 in the future. |
| info.file_path = MakeTableFileName(immutable_db_options_.db_paths[0].path, |
| file_meta->fd.GetNumber()); |
| info.thread_id = env_->GetThreadID(); |
| info.job_id = job_id; |
| info.triggered_writes_slowdown = triggered_writes_slowdown; |
| info.triggered_writes_stop = triggered_writes_stop; |
| info.smallest_seqno = file_meta->smallest_seqno; |
| info.largest_seqno = file_meta->largest_seqno; |
| info.table_properties = prop; |
| for (auto listener : immutable_db_options_.listeners) { |
| listener->OnFlushBegin(this, info); |
| } |
| } |
| mutex_.Lock(); |
| // no need to signal bg_cv_ as it will be signaled at the end of the |
| // flush process. |
| #endif // ROCKSDB_LITE |
| } |
| |
| void DBImpl::NotifyOnFlushCompleted(ColumnFamilyData* cfd, |
| FileMetaData* file_meta, |
| const MutableCFOptions& mutable_cf_options, |
| int job_id, TableProperties prop) { |
| #ifndef ROCKSDB_LITE |
| if (immutable_db_options_.listeners.size() == 0U) { |
| return; |
| } |
| mutex_.AssertHeld(); |
| if (shutting_down_.load(std::memory_order_acquire)) { |
| return; |
| } |
| bool triggered_writes_slowdown = |
| (cfd->current()->storage_info()->NumLevelFiles(0) >= |
| mutable_cf_options.level0_slowdown_writes_trigger); |
| bool triggered_writes_stop = |
| (cfd->current()->storage_info()->NumLevelFiles(0) >= |
| mutable_cf_options.level0_stop_writes_trigger); |
| // release lock while notifying events |
| mutex_.Unlock(); |
| { |
| FlushJobInfo info; |
| info.cf_name = cfd->GetName(); |
| // TODO(yhchiang): make db_paths dynamic in case flush does not |
| // go to L0 in the future. |
| info.file_path = MakeTableFileName(immutable_db_options_.db_paths[0].path, |
| file_meta->fd.GetNumber()); |
| info.thread_id = env_->GetThreadID(); |
| info.job_id = job_id; |
| info.triggered_writes_slowdown = triggered_writes_slowdown; |
| info.triggered_writes_stop = triggered_writes_stop; |
| info.smallest_seqno = file_meta->smallest_seqno; |
| info.largest_seqno = file_meta->largest_seqno; |
| info.table_properties = prop; |
| for (auto listener : immutable_db_options_.listeners) { |
| listener->OnFlushCompleted(this, info); |
| } |
| } |
| mutex_.Lock(); |
| // no need to signal bg_cv_ as it will be signaled at the end of the |
| // flush process. |
| #endif // ROCKSDB_LITE |
| } |
| |
| Status DBImpl::CompactRange(const CompactRangeOptions& options, |
| ColumnFamilyHandle* column_family, |
| const Slice* begin, const Slice* end) { |
| if (options.target_path_id >= immutable_db_options_.db_paths.size()) { |
| return Status::InvalidArgument("Invalid target path ID"); |
| } |
| |
| auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family); |
| auto cfd = cfh->cfd(); |
| bool exclusive = options.exclusive_manual_compaction; |
| |
| Status s = FlushMemTable(cfd, FlushOptions()); |
| if (!s.ok()) { |
| LogFlush(immutable_db_options_.info_log); |
| return s; |
| } |
| |
| int max_level_with_files = 0; |
| { |
| InstrumentedMutexLock l(&mutex_); |
| Version* base = cfd->current(); |
| for (int level = 1; level < base->storage_info()->num_non_empty_levels(); |
| level++) { |
| if (base->storage_info()->OverlapInLevel(level, begin, end)) { |
| max_level_with_files = level; |
| } |
| } |
| } |
| |
| int final_output_level = 0; |
| if (cfd->ioptions()->compaction_style == kCompactionStyleUniversal && |
| cfd->NumberLevels() > 1) { |
| // Always compact all files together. |
| final_output_level = cfd->NumberLevels() - 1; |
| // if bottom most level is reserved |
| if (immutable_db_options_.allow_ingest_behind) { |
| final_output_level--; |
| } |
| s = RunManualCompaction(cfd, ColumnFamilyData::kCompactAllLevels, |
| final_output_level, options.target_path_id, |
| begin, end, exclusive); |
| } else { |
| for (int level = 0; level <= max_level_with_files; level++) { |
| int output_level; |
| // in case the compaction is universal or if we're compacting the |
| // bottom-most level, the output level will be the same as input one. |
| // level 0 can never be the bottommost level (i.e. if all files are in |
| // level 0, we will compact to level 1) |
| if (cfd->ioptions()->compaction_style == kCompactionStyleUniversal || |
| cfd->ioptions()->compaction_style == kCompactionStyleFIFO) { |
| output_level = level; |
| } else if (level == max_level_with_files && level > 0) { |
| if (options.bottommost_level_compaction == |
| BottommostLevelCompaction::kSkip) { |
| // Skip bottommost level compaction |
| continue; |
| } else if (options.bottommost_level_compaction == |
| BottommostLevelCompaction::kIfHaveCompactionFilter && |
| cfd->ioptions()->compaction_filter == nullptr && |
| cfd->ioptions()->compaction_filter_factory == nullptr) { |
| // Skip bottommost level compaction since we don't have a compaction |
| // filter |
| continue; |
| } |
| output_level = level; |
| } else { |
| output_level = level + 1; |
| if (cfd->ioptions()->compaction_style == kCompactionStyleLevel && |
| cfd->ioptions()->level_compaction_dynamic_level_bytes && |
| level == 0) { |
| output_level = ColumnFamilyData::kCompactToBaseLevel; |
| } |
| } |
| s = RunManualCompaction(cfd, level, output_level, options.target_path_id, |
| begin, end, exclusive); |
| if (!s.ok()) { |
| break; |
| } |
| if (output_level == ColumnFamilyData::kCompactToBaseLevel) { |
| final_output_level = cfd->NumberLevels() - 1; |
| } else if (output_level > final_output_level) { |
| final_output_level = output_level; |
| } |
| TEST_SYNC_POINT("DBImpl::RunManualCompaction()::1"); |
| TEST_SYNC_POINT("DBImpl::RunManualCompaction()::2"); |
| } |
| } |
| if (!s.ok()) { |
| LogFlush(immutable_db_options_.info_log); |
| return s; |
| } |
| |
| if (options.change_level) { |
| ROCKS_LOG_INFO(immutable_db_options_.info_log, |
| "[RefitLevel] waiting for background threads to stop"); |
| s = PauseBackgroundWork(); |
| if (s.ok()) { |
| s = ReFitLevel(cfd, final_output_level, options.target_level); |
| } |
| ContinueBackgroundWork(); |
| } |
| LogFlush(immutable_db_options_.info_log); |
| |
| { |
| InstrumentedMutexLock l(&mutex_); |
| // an automatic compaction that has been scheduled might have been |
| // preempted by the manual compactions. Need to schedule it back. |
| MaybeScheduleFlushOrCompaction(); |
| } |
| |
| return s; |
| } |
| |
| Status DBImpl::CompactFiles( |
| const CompactionOptions& compact_options, |
| ColumnFamilyHandle* column_family, |
| const std::vector<std::string>& input_file_names, |
| const int output_level, const int output_path_id) { |
| #ifdef ROCKSDB_LITE |
| // not supported in lite version |
| return Status::NotSupported("Not supported in ROCKSDB LITE"); |
| #else |
| if (column_family == nullptr) { |
| return Status::InvalidArgument("ColumnFamilyHandle must be non-null."); |
| } |
| |
| auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd(); |
| assert(cfd); |
| |
| Status s; |
| JobContext job_context(0, true); |
| LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, |
| immutable_db_options_.info_log.get()); |
| |
| // Perform CompactFiles |
| SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_); |
| { |
| InstrumentedMutexLock l(&mutex_); |
| |
| // This call will unlock/lock the mutex to wait for current running |
| // IngestExternalFile() calls to finish. |
| WaitForIngestFile(); |
| |
| s = CompactFilesImpl(compact_options, cfd, sv->current, |
| input_file_names, output_level, |
| output_path_id, &job_context, &log_buffer); |
| } |
| if (sv->Unref()) { |
| mutex_.Lock(); |
| sv->Cleanup(); |
| mutex_.Unlock(); |
| delete sv; |
| } |
| |
| // Find and delete obsolete files |
| { |
| InstrumentedMutexLock l(&mutex_); |
| // If !s.ok(), this means that Compaction failed. In that case, we want |
| // to delete all obsolete files we might have created and we force |
| // FindObsoleteFiles(). This is because job_context does not |
| // catch all created files if compaction failed. |
| FindObsoleteFiles(&job_context, !s.ok()); |
| } // release the mutex |
| |
| // delete unnecessary files if any, this is done outside the mutex |
| if (job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) { |
| // Have to flush the info logs before bg_compaction_scheduled_-- |
| // because if bg_flush_scheduled_ becomes 0 and the lock is |
| // released, the deconstructor of DB can kick in and destroy all the |
| // states of DB so info_log might not be available after that point. |
| // It also applies to access other states that DB owns. |
| log_buffer.FlushBufferToLog(); |
| if (job_context.HaveSomethingToDelete()) { |
| // no mutex is locked here. No need to Unlock() and Lock() here. |
| PurgeObsoleteFiles(job_context); |
| } |
| job_context.Clean(); |
| } |
| |
| return s; |
| #endif // ROCKSDB_LITE |
| } |
| |
| #ifndef ROCKSDB_LITE |
| Status DBImpl::CompactFilesImpl( |
| const CompactionOptions& compact_options, ColumnFamilyData* cfd, |
| Version* version, const std::vector<std::string>& input_file_names, |
| const int output_level, int output_path_id, JobContext* job_context, |
| LogBuffer* log_buffer) { |
| mutex_.AssertHeld(); |
| |
| if (shutting_down_.load(std::memory_order_acquire)) { |
| return Status::ShutdownInProgress(); |
| } |
| |
| std::unordered_set<uint64_t> input_set; |
| for (auto file_name : input_file_names) { |
| input_set.insert(TableFileNameToNumber(file_name)); |
| } |
| |
| ColumnFamilyMetaData cf_meta; |
| // TODO(yhchiang): can directly use version here if none of the |
| // following functions call is pluggable to external developers. |
| version->GetColumnFamilyMetaData(&cf_meta); |
| |
| if (output_path_id < 0) { |
| if (immutable_db_options_.db_paths.size() == 1U) { |
| output_path_id = 0; |
| } else { |
| return Status::NotSupported( |
| "Automatic output path selection is not " |
| "yet supported in CompactFiles()"); |
| } |
| } |
| |
| Status s = cfd->compaction_picker()->SanitizeCompactionInputFiles( |
| &input_set, cf_meta, output_level); |
| if (!s.ok()) { |
| return s; |
| } |
| |
| std::vector<CompactionInputFiles> input_files; |
| s = cfd->compaction_picker()->GetCompactionInputsFromFileNumbers( |
| &input_files, &input_set, version->storage_info(), compact_options); |
| if (!s.ok()) { |
| return s; |
| } |
| |
| for (auto inputs : input_files) { |
| if (cfd->compaction_picker()->AreFilesInCompaction(inputs.files)) { |
| return Status::Aborted( |
| "Some of the necessary compaction input " |
| "files are already being compacted"); |
| } |
| } |
| |
| // At this point, CompactFiles will be run. |
| bg_compaction_scheduled_++; |
| |
| unique_ptr<Compaction> c; |
| assert(cfd->compaction_picker()); |
| c.reset(cfd->compaction_picker()->CompactFiles( |
| compact_options, input_files, output_level, version->storage_info(), |
| *cfd->GetLatestMutableCFOptions(), output_path_id)); |
| if (!c) { |
| return Status::Aborted("Another Level 0 compaction is running"); |
| } |
| c->SetInputVersion(version); |
| // deletion compaction currently not allowed in CompactFiles. |
| assert(!c->deletion_compaction()); |
| |
| SequenceNumber earliest_write_conflict_snapshot; |
| std::vector<SequenceNumber> snapshot_seqs = |
| snapshots_.GetAll(&earliest_write_conflict_snapshot); |
| |
| auto pending_outputs_inserted_elem = |
| CaptureCurrentFileNumberInPendingOutputs(); |
| |
| assert(is_snapshot_supported_ || snapshots_.empty()); |
| CompactionJob compaction_job( |
| job_context->job_id, c.get(), immutable_db_options_, env_options_, |
| versions_.get(), &shutting_down_, log_buffer, directories_.GetDbDir(), |
| directories_.GetDataDir(c->output_path_id()), stats_, &mutex_, &bg_error_, |
| snapshot_seqs, earliest_write_conflict_snapshot, table_cache_, |
| &event_logger_, c->mutable_cf_options()->paranoid_file_checks, |
| c->mutable_cf_options()->report_bg_io_stats, dbname_, |
| nullptr); // Here we pass a nullptr for CompactionJobStats because |
| // CompactFiles does not trigger OnCompactionCompleted(), |
| // which is the only place where CompactionJobStats is |
| // returned. The idea of not triggering OnCompationCompleted() |
| // is that CompactFiles runs in the caller thread, so the user |
| // should always know when it completes. As a result, it makes |
| // less sense to notify the users something they should already |
| // know. |
| // |
| // In the future, if we would like to add CompactionJobStats |
| // support for CompactFiles, we should have CompactFiles API |
| // pass a pointer of CompactionJobStats as the out-value |
| // instead of using EventListener. |
| |
| // Creating a compaction influences the compaction score because the score |
| // takes running compactions into account (by skipping files that are already |
| // being compacted). Since we just changed compaction score, we recalculate it |
| // here. |
| version->storage_info()->ComputeCompactionScore(*cfd->ioptions(), |
| *c->mutable_cf_options()); |
| |
| compaction_job.Prepare(); |
| |
| mutex_.Unlock(); |
| TEST_SYNC_POINT("CompactFilesImpl:0"); |
| TEST_SYNC_POINT("CompactFilesImpl:1"); |
| compaction_job.Run(); |
| TEST_SYNC_POINT("CompactFilesImpl:2"); |
| TEST_SYNC_POINT("CompactFilesImpl:3"); |
| mutex_.Lock(); |
| |
| Status status = compaction_job.Install(*c->mutable_cf_options()); |
| if (status.ok()) { |
| InstallSuperVersionAndScheduleWorkWrapper( |
| c->column_family_data(), job_context, *c->mutable_cf_options()); |
| } |
| c->ReleaseCompactionFiles(s); |
| |
| ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem); |
| |
| if (status.ok()) { |
| // Done |
| } else if (status.IsShutdownInProgress()) { |
| // Ignore compaction errors found during shutting down |
| } else { |
| ROCKS_LOG_WARN(immutable_db_options_.info_log, |
| "[%s] [JOB %d] Compaction error: %s", |
| c->column_family_data()->GetName().c_str(), |
| job_context->job_id, status.ToString().c_str()); |
| if (immutable_db_options_.paranoid_checks && bg_error_.ok()) { |
| Status new_bg_error = status; |
| // may temporarily unlock and lock the mutex. |
| EventHelpers::NotifyOnBackgroundError(immutable_db_options_.listeners, |
| BackgroundErrorReason::kCompaction, |
| &new_bg_error, &mutex_); |
| if (!new_bg_error.ok()) { |
| bg_error_ = new_bg_error; |
| } |
| } |
| } |
| |
| c.reset(); |
| |
| bg_compaction_scheduled_--; |
| if (bg_compaction_scheduled_ == 0) { |
| bg_cv_.SignalAll(); |
| } |
| |
| return status; |
| } |
| #endif // ROCKSDB_LITE |
| |
| Status DBImpl::PauseBackgroundWork() { |
| InstrumentedMutexLock guard_lock(&mutex_); |
| bg_compaction_paused_++; |
| while (bg_bottom_compaction_scheduled_ > 0 || bg_compaction_scheduled_ > 0 || |
| bg_flush_scheduled_ > 0) { |
| bg_cv_.Wait(); |
| } |
| bg_work_paused_++; |
| return Status::OK(); |
| } |
| |
| Status DBImpl::ContinueBackgroundWork() { |
| InstrumentedMutexLock guard_lock(&mutex_); |
| if (bg_work_paused_ == 0) { |
| return Status::InvalidArgument(); |
| } |
| assert(bg_work_paused_ > 0); |
| assert(bg_compaction_paused_ > 0); |
| bg_compaction_paused_--; |
| bg_work_paused_--; |
| // It's sufficient to check just bg_work_paused_ here since |
| // bg_work_paused_ is always no greater than bg_compaction_paused_ |
| if (bg_work_paused_ == 0) { |
| MaybeScheduleFlushOrCompaction(); |
| } |
| return Status::OK(); |
| } |
| |
| void DBImpl::NotifyOnCompactionCompleted( |
| ColumnFamilyData* cfd, Compaction *c, const Status &st, |
| const CompactionJobStats& compaction_job_stats, |
| const int job_id) { |
| #ifndef ROCKSDB_LITE |
| if (immutable_db_options_.listeners.size() == 0U) { |
| return; |
| } |
| mutex_.AssertHeld(); |
| if (shutting_down_.load(std::memory_order_acquire)) { |
| return; |
| } |
| // release lock while notifying events |
| mutex_.Unlock(); |
| TEST_SYNC_POINT("DBImpl::NotifyOnCompactionCompleted::UnlockMutex"); |
| { |
| CompactionJobInfo info; |
| info.cf_name = cfd->GetName(); |
| info.status = st; |
| info.thread_id = env_->GetThreadID(); |
| info.job_id = job_id; |
| info.base_input_level = c->start_level(); |
| info.output_level = c->output_level(); |
| info.stats = compaction_job_stats; |
| info.table_properties = c->GetOutputTableProperties(); |
| info.compaction_reason = c->compaction_reason(); |
| info.compression = c->output_compression(); |
| for (size_t i = 0; i < c->num_input_levels(); ++i) { |
| for (const auto fmd : *c->inputs(i)) { |
| auto fn = TableFileName(immutable_db_options_.db_paths, |
| fmd->fd.GetNumber(), fmd->fd.GetPathId()); |
| info.input_files.push_back(fn); |
| if (info.table_properties.count(fn) == 0) { |
| std::shared_ptr<const TableProperties> tp; |
| auto s = cfd->current()->GetTableProperties(&tp, fmd, &fn); |
| if (s.ok()) { |
| info.table_properties[fn] = tp; |
| } |
| } |
| } |
| } |
| for (const auto newf : c->edit()->GetNewFiles()) { |
| info.output_files.push_back(TableFileName(immutable_db_options_.db_paths, |
| newf.second.fd.GetNumber(), |
| newf.second.fd.GetPathId())); |
| } |
| for (auto listener : immutable_db_options_.listeners) { |
| listener->OnCompactionCompleted(this, info); |
| } |
| } |
| mutex_.Lock(); |
| // no need to signal bg_cv_ as it will be signaled at the end of the |
| // flush process. |
| #endif // ROCKSDB_LITE |
| } |
| |
| // REQUIREMENT: block all background work by calling PauseBackgroundWork() |
| // before calling this function |
| Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) { |
| assert(level < cfd->NumberLevels()); |
| if (target_level >= cfd->NumberLevels()) { |
| return Status::InvalidArgument("Target level exceeds number of levels"); |
| } |
| |
| std::unique_ptr<SuperVersion> superversion_to_free; |
| std::unique_ptr<SuperVersion> new_superversion(new SuperVersion()); |
| |
| Status status; |
| |
| InstrumentedMutexLock guard_lock(&mutex_); |
| |
| // only allow one thread refitting |
| if (refitting_level_) { |
| ROCKS_LOG_INFO(immutable_db_options_.info_log, |
| "[ReFitLevel] another thread is refitting"); |
| return Status::NotSupported("another thread is refitting"); |
| } |
| refitting_level_ = true; |
| |
| const MutableCFOptions mutable_cf_options = *cfd->GetLatestMutableCFOptions(); |
| // move to a smaller level |
| int to_level = target_level; |
| if (target_level < 0) { |
| to_level = FindMinimumEmptyLevelFitting(cfd, mutable_cf_options, level); |
| } |
| |
| auto* vstorage = cfd->current()->storage_info(); |
| if (to_level > level) { |
| if (level == 0) { |
| return Status::NotSupported( |
| "Cannot change from level 0 to other levels."); |
| } |
| // Check levels are empty for a trivial move |
| for (int l = level + 1; l <= to_level; l++) { |
| if (vstorage->NumLevelFiles(l) > 0) { |
| return Status::NotSupported( |
| "Levels between source and target are not empty for a move."); |
| } |
| } |
| } |
| if (to_level != level) { |
| ROCKS_LOG_DEBUG(immutable_db_options_.info_log, |
| "[%s] Before refitting:\n%s", cfd->GetName().c_str(), |
| cfd->current()->DebugString().data()); |
| |
| VersionEdit edit; |
| edit.SetColumnFamily(cfd->GetID()); |
| for (const auto& f : vstorage->LevelFiles(level)) { |
| edit.DeleteFile(level, f->fd.GetNumber()); |
| edit.AddFile(to_level, f->fd.GetNumber(), f->fd.GetPathId(), |
| f->fd.GetFileSize(), f->smallest, f->largest, |
| f->smallest_seqno, f->largest_seqno, |
| f->marked_for_compaction); |
| } |
| ROCKS_LOG_DEBUG(immutable_db_options_.info_log, |
| "[%s] Apply version edit:\n%s", cfd->GetName().c_str(), |
| edit.DebugString().data()); |
| |
| status = versions_->LogAndApply(cfd, mutable_cf_options, &edit, &mutex_, |
| directories_.GetDbDir()); |
| superversion_to_free.reset(InstallSuperVersionAndScheduleWork( |
| cfd, new_superversion.release(), mutable_cf_options)); |
| |
| ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] LogAndApply: %s\n", |
| cfd->GetName().c_str(), status.ToString().data()); |
| |
| if (status.ok()) { |
| ROCKS_LOG_DEBUG(immutable_db_options_.info_log, |
| "[%s] After refitting:\n%s", cfd->GetName().c_str(), |
| cfd->current()->DebugString().data()); |
| } |
| } |
| |
| refitting_level_ = false; |
| |
| return status; |
| } |
| |
| int DBImpl::NumberLevels(ColumnFamilyHandle* column_family) { |
| auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family); |
| return cfh->cfd()->NumberLevels(); |
| } |
| |
| int DBImpl::MaxMemCompactionLevel(ColumnFamilyHandle* column_family) { |
| return 0; |
| } |
| |
| int DBImpl::Level0StopWriteTrigger(ColumnFamilyHandle* column_family) { |
| auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family); |
| InstrumentedMutexLock l(&mutex_); |
| return cfh->cfd()->GetSuperVersion()-> |
| mutable_cf_options.level0_stop_writes_trigger; |
| } |
| |
| Status DBImpl::Flush(const FlushOptions& flush_options, |
| ColumnFamilyHandle* column_family) { |
| auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family); |
| return FlushMemTable(cfh->cfd(), flush_options); |
| } |
| |
| Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level, |
| int output_level, uint32_t output_path_id, |
| const Slice* begin, const Slice* end, |
| bool exclusive, bool disallow_trivial_move) { |
| assert(input_level == ColumnFamilyData::kCompactAllLevels || |
| input_level >= 0); |
| |
| InternalKey begin_storage, end_storage; |
| CompactionArg* ca; |
| |
| bool scheduled = false; |
| bool manual_conflict = false; |
| ManualCompactionState manual; |
| manual.cfd = cfd; |
| manual.input_level = input_level; |
| manual.output_level = output_level; |
| manual.output_path_id = output_path_id; |
| manual.done = false; |
| manual.in_progress = false; |
| manual.incomplete = false; |
| manual.exclusive = exclusive; |
| manual.disallow_trivial_move = disallow_trivial_move; |
| // For universal compaction, we enforce every manual compaction to compact |
| // all files. |
| if (begin == nullptr || |
| cfd->ioptions()->compaction_style == kCompactionStyleUniversal || |
| cfd->ioptions()->compaction_style == kCompactionStyleFIFO) { |
| manual.begin = nullptr; |
| } else { |
| begin_storage.SetMaxPossibleForUserKey(*begin); |
| manual.begin = &begin_storage; |
| } |
| if (end == nullptr || |
| cfd->ioptions()->compaction_style == kCompactionStyleUniversal || |
| cfd->ioptions()->compaction_style == kCompactionStyleFIFO) { |
| manual.end = nullptr; |
| } else { |
| end_storage.SetMinPossibleForUserKey(*end); |
| manual.end = &end_storage; |
| } |
| |
| TEST_SYNC_POINT("DBImpl::RunManualCompaction:0"); |
| TEST_SYNC_POINT("DBImpl::RunManualCompaction:1"); |
| InstrumentedMutexLock l(&mutex_); |
| |
| // When a manual compaction arrives, temporarily disable scheduling of |
| // non-manual compactions and wait until the number of scheduled compaction |
| // jobs drops to zero. This is needed to ensure that this manual compaction |
| // can compact any range of keys/files. |
| // |
| // HasPendingManualCompaction() is true when at least one thread is inside |
| // RunManualCompaction(), i.e. during that time no other compaction will |
| // get scheduled (see MaybeScheduleFlushOrCompaction). |
| // |
| // Note that the following loop doesn't stop more that one thread calling |
| // RunManualCompaction() from getting to the second while loop below. |
| // However, only one of them will actually schedule compaction, while |
| // others will wait on a condition variable until it completes. |
| |
| AddManualCompaction(&manual); |
| TEST_SYNC_POINT_CALLBACK("DBImpl::RunManualCompaction:NotScheduled", &mutex_); |
| if (exclusive) { |
| while (bg_bottom_compaction_scheduled_ > 0 || |
| bg_compaction_scheduled_ > 0) { |
| TEST_SYNC_POINT("DBImpl::RunManualCompaction:WaitScheduled"); |
| ROCKS_LOG_INFO( |
| immutable_db_options_.info_log, |
| "[%s] Manual compaction waiting for all other scheduled background " |
| "compactions to finish", |
| cfd->GetName().c_str()); |
| bg_cv_.Wait(); |
| } |
| } |
| |
| ROCKS_LOG_INFO(immutable_db_options_.info_log, |
| "[%s] Manual compaction starting", cfd->GetName().c_str()); |
| |
| // We don't check bg_error_ here, because if we get the error in compaction, |
| // the compaction will set manual.status to bg_error_ and set manual.done to |
| // true. |
| while (!manual.done) { |
| assert(HasPendingManualCompaction()); |
| manual_conflict = false; |
| Compaction* compaction; |
| if (ShouldntRunManualCompaction(&manual) || (manual.in_progress == true) || |
| scheduled || |
| ((manual.manual_end = &manual.tmp_storage1) && |
| ((compaction = manual.cfd->CompactRange( |
| *manual.cfd->GetLatestMutableCFOptions(), manual.input_level, |
| manual.output_level, manual.output_path_id, manual.begin, |
| manual.end, &manual.manual_end, &manual_conflict)) == nullptr) && |
| manual_conflict)) { |
| // exclusive manual compactions should not see a conflict during |
| // CompactRange |
| assert(!exclusive || !manual_conflict); |
| // Running either this or some other manual compaction |
| bg_cv_.Wait(); |
| if (scheduled && manual.incomplete == true) { |
| assert(!manual.in_progress); |
| scheduled = false; |
| manual.incomplete = false; |
| } |
| } else if (!scheduled) { |
| if (compaction == nullptr) { |
| manual.done = true; |
| bg_cv_.SignalAll(); |
| continue; |
| } |
| ca = new CompactionArg; |
| ca->db = this; |
| ca->prepicked_compaction = new PrepickedCompaction; |
| ca->prepicked_compaction->manual_compaction_state = &manual; |
| ca->prepicked_compaction->compaction = compaction; |
| manual.incomplete = false; |
| bg_compaction_scheduled_++; |
| env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this, |
| &DBImpl::UnscheduleCallback); |
| scheduled = true; |
| } |
| } |
| |
| assert(!manual.in_progress); |
| assert(HasPendingManualCompaction()); |
| RemoveManualCompaction(&manual); |
| bg_cv_.SignalAll(); |
| return manual.status; |
| } |
| |
| Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, |
| const FlushOptions& flush_options, |
| bool writes_stopped) { |
| Status s; |
| { |
| WriteContext context; |
| InstrumentedMutexLock guard_lock(&mutex_); |
| |
| if (cfd->imm()->NumNotFlushed() == 0 && cfd->mem()->IsEmpty()) { |
| // Nothing to flush |
| return Status::OK(); |
| } |
| |
| WriteThread::Writer w; |
| if (!writes_stopped) { |
| write_thread_.EnterUnbatched(&w, &mutex_); |
| } |
| |
| // SwitchMemtable() will release and reacquire mutex |
| // during execution |
| s = SwitchMemtable(cfd, &context); |
| |
| if (!writes_stopped) { |
| write_thread_.ExitUnbatched(&w); |
| } |
| |
| cfd->imm()->FlushRequested(); |
| |
| // schedule flush |
| SchedulePendingFlush(cfd); |
| MaybeScheduleFlushOrCompaction(); |
| } |
| |
| if (s.ok() && flush_options.wait) { |
| // Wait until the compaction completes |
| s = WaitForFlushMemTable(cfd); |
| } |
| return s; |
| } |
| |
| Status DBImpl::WaitForFlushMemTable(ColumnFamilyData* cfd) { |
| Status s; |
| // Wait until the compaction completes |
| InstrumentedMutexLock l(&mutex_); |
| while (cfd->imm()->NumNotFlushed() > 0 && bg_error_.ok()) { |
| if (shutting_down_.load(std::memory_order_acquire)) { |
| return Status::ShutdownInProgress(); |
| } |
| if (cfd->IsDropped()) { |
| // FlushJob cannot flush a dropped CF, if we did not break here |
| // we will loop forever since cfd->imm()->NumNotFlushed() will never |
| // drop to zero |
| return Status::InvalidArgument("Cannot flush a dropped CF"); |
| } |
| bg_cv_.Wait(); |
| } |
| if (!bg_error_.ok()) { |
| s = bg_error_; |
| } |
| return s; |
| } |
| |
| Status DBImpl::EnableAutoCompaction( |
| const std::vector<ColumnFamilyHandle*>& column_family_handles) { |
| Status s; |
| for (auto cf_ptr : column_family_handles) { |
| Status status = |
| this->SetOptions(cf_ptr, {{"disable_auto_compactions", "false"}}); |
| if (!status.ok()) { |
| s = status; |
| } |
| } |
| |
| return s; |
| } |
| |
| void DBImpl::MaybeScheduleFlushOrCompaction() { |
| mutex_.AssertHeld(); |
| if (!opened_successfully_) { |
| // Compaction may introduce data race to DB open |
| return; |
| } |
| if (bg_work_paused_ > 0) { |
| // we paused the background work |
| return; |
| } else if (shutting_down_.load(std::memory_order_acquire)) { |
| // DB is being deleted; no more background compactions |
| return; |
| } |
| auto bg_job_limits = GetBGJobLimits(); |
| bool is_flush_pool_empty = |
| env_->GetBackgroundThreads(Env::Priority::HIGH) == 0; |
| while (!is_flush_pool_empty && unscheduled_flushes_ > 0 && |
| bg_flush_scheduled_ < bg_job_limits.max_flushes) { |
| unscheduled_flushes_--; |
| bg_flush_scheduled_++; |
| env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::HIGH, this); |
| } |
| |
| // special case -- if high-pri (flush) thread pool is empty, then schedule |
| // flushes in low-pri (compaction) thread pool. |
| if (is_flush_pool_empty) { |
| while (unscheduled_flushes_ > 0 && |
| bg_flush_scheduled_ + bg_compaction_scheduled_ < |
| bg_job_limits.max_flushes) { |
| unscheduled_flushes_--; |
| bg_flush_scheduled_++; |
| env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::LOW, this); |
| } |
| } |
| |
| if (bg_compaction_paused_ > 0) { |
| // we paused the background compaction |
| return; |
| } |
| |
| if (HasExclusiveManualCompaction()) { |
| // only manual compactions are allowed to run. don't schedule automatic |
| // compactions |
| return; |
| } |
| |
| while (bg_compaction_scheduled_ < bg_job_limits.max_compactions && |
| unscheduled_compactions_ > 0) { |
| CompactionArg* ca = new CompactionArg; |
| ca->db = this; |
| ca->prepicked_compaction = nullptr; |
| bg_compaction_scheduled_++; |
| unscheduled_compactions_--; |
| env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this, |
| &DBImpl::UnscheduleCallback); |
| } |
| } |
| |
| DBImpl::BGJobLimits DBImpl::GetBGJobLimits() const { |
| mutex_.AssertHeld(); |
| return GetBGJobLimits(immutable_db_options_.max_background_flushes, |
| mutable_db_options_.max_background_compactions, |
| mutable_db_options_.max_background_jobs, |
| write_controller_.NeedSpeedupCompaction()); |
| } |
| |
| DBImpl::BGJobLimits DBImpl::GetBGJobLimits(int max_background_flushes, |
| int max_background_compactions, |
| int max_background_jobs, |
| bool parallelize_compactions) { |
| BGJobLimits res; |
| if (max_background_flushes == -1 && max_background_compactions == -1) { |
| // for our first stab implementing max_background_jobs, simply allocate a |
| // quarter of the threads to flushes. |
| res.max_flushes = std::max(1, max_background_jobs / 4); |
| res.max_compactions = std::max(1, max_background_jobs - res.max_flushes); |
| } else { |
| // compatibility code in case users haven't migrated to max_background_jobs, |
| // which automatically computes flush/compaction limits |
| res.max_flushes = std::max(1, max_background_flushes); |
| res.max_compactions = std::max(1, max_background_compactions); |
| } |
| if (!parallelize_compactions) { |
| // throttle background compactions until we deem necessary |
| res.max_compactions = 1; |
| } |
| return res; |
| } |
| |
| void DBImpl::AddToCompactionQueue(ColumnFamilyData* cfd) { |
| assert(!cfd->pending_compaction()); |
| cfd->Ref(); |
| compaction_queue_.push_back(cfd); |
| cfd->set_pending_compaction(true); |
| } |
| |
| ColumnFamilyData* DBImpl::PopFirstFromCompactionQueue() { |
| assert(!compaction_queue_.empty()); |
| auto cfd = *compaction_queue_.begin(); |
| compaction_queue_.pop_front(); |
| assert(cfd->pending_compaction()); |
| cfd->set_pending_compaction(false); |
| return cfd; |
| } |
| |
| void DBImpl::AddToFlushQueue(ColumnFamilyData* cfd) { |
| assert(!cfd->pending_flush()); |
| cfd->Ref(); |
| flush_queue_.push_back(cfd); |
| cfd->set_pending_flush(true); |
| } |
| |
| ColumnFamilyData* DBImpl::PopFirstFromFlushQueue() { |
| assert(!flush_queue_.empty()); |
| auto cfd = *flush_queue_.begin(); |
| flush_queue_.pop_front(); |
| assert(cfd->pending_flush()); |
| cfd->set_pending_flush(false); |
| return cfd; |
| } |
| |
| void DBImpl::SchedulePendingFlush(ColumnFamilyData* cfd) { |
| if (!cfd->pending_flush() && cfd->imm()->IsFlushPending()) { |
| AddToFlushQueue(cfd); |
| ++unscheduled_flushes_; |
| } |
| } |
| |
| void DBImpl::SchedulePendingCompaction(ColumnFamilyData* cfd) { |
| if (!cfd->pending_compaction() && cfd->NeedsCompaction()) { |
| AddToCompactionQueue(cfd); |
| ++unscheduled_compactions_; |
| } |
| } |
| |
| void DBImpl::SchedulePendingPurge(std::string fname, FileType type, |
| uint64_t number, uint32_t path_id, |
| int job_id) { |
| mutex_.AssertHeld(); |
| PurgeFileInfo file_info(fname, type, number, path_id, job_id); |
| purge_queue_.push_back(std::move(file_info)); |
| } |
| |
| void DBImpl::BGWorkFlush(void* db) { |
| IOSTATS_SET_THREAD_POOL_ID(Env::Priority::HIGH); |
| TEST_SYNC_POINT("DBImpl::BGWorkFlush"); |
| reinterpret_cast<DBImpl*>(db)->BackgroundCallFlush(); |
| TEST_SYNC_POINT("DBImpl::BGWorkFlush:done"); |
| } |
| |
| void DBImpl::BGWorkCompaction(void* arg) { |
| CompactionArg ca = *(reinterpret_cast<CompactionArg*>(arg)); |
| delete reinterpret_cast<CompactionArg*>(arg); |
| IOSTATS_SET_THREAD_POOL_ID(Env::Priority::LOW); |
| TEST_SYNC_POINT("DBImpl::BGWorkCompaction"); |
| auto prepicked_compaction = |
| static_cast<PrepickedCompaction*>(ca.prepicked_compaction); |
| reinterpret_cast<DBImpl*>(ca.db)->BackgroundCallCompaction( |
| prepicked_compaction, Env::Priority::LOW); |
| delete prepicked_compaction; |
| } |
| |
| void DBImpl::BGWorkBottomCompaction(void* arg) { |
| CompactionArg ca = *(static_cast<CompactionArg*>(arg)); |
| delete static_cast<CompactionArg*>(arg); |
| IOSTATS_SET_THREAD_POOL_ID(Env::Priority::BOTTOM); |
| TEST_SYNC_POINT("DBImpl::BGWorkBottomCompaction"); |
| auto* prepicked_compaction = ca.prepicked_compaction; |
| assert(prepicked_compaction && prepicked_compaction->compaction && |
| !prepicked_compaction->manual_compaction_state); |
| ca.db->BackgroundCallCompaction(prepicked_compaction, Env::Priority::BOTTOM); |
| delete prepicked_compaction; |
| } |
| |
| void DBImpl::BGWorkPurge(void* db) { |
| IOSTATS_SET_THREAD_POOL_ID(Env::Priority::HIGH); |
| TEST_SYNC_POINT("DBImpl::BGWorkPurge:start"); |
| reinterpret_cast<DBImpl*>(db)->BackgroundCallPurge(); |
| TEST_SYNC_POINT("DBImpl::BGWorkPurge:end"); |
| } |
| |
| void DBImpl::UnscheduleCallback(void* arg) { |
| CompactionArg ca = *(reinterpret_cast<CompactionArg*>(arg)); |
| delete reinterpret_cast<CompactionArg*>(arg); |
| if (ca.prepicked_compaction != nullptr) { |
| if (ca.prepicked_compaction->compaction != nullptr) { |
| delete ca.prepicked_compaction->compaction; |
| } |
| delete ca.prepicked_compaction; |
| } |
| TEST_SYNC_POINT("DBImpl::UnscheduleCallback"); |
| } |
| |
| Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context, |
| LogBuffer* log_buffer) { |
| mutex_.AssertHeld(); |
| |
| Status status = bg_error_; |
| if (status.ok() && shutting_down_.load(std::memory_order_acquire)) { |
| status = Status::ShutdownInProgress(); |
| } |
| |
| if (!status.ok()) { |
| return status; |
| } |
| |
| ColumnFamilyData* cfd = nullptr; |
| while (!flush_queue_.empty()) { |
| // This cfd is already referenced |
| auto first_cfd = PopFirstFromFlushQueue(); |
| |
| if (first_cfd->IsDropped() || !first_cfd->imm()->IsFlushPending()) { |
| // can't flush this CF, try next one |
| if (first_cfd->Unref()) { |
| delete first_cfd; |
| } |
| continue; |
| } |
| |
| // found a flush! |
| cfd = first_cfd; |
| break; |
| } |
| |
| if (cfd != nullptr) { |
| const MutableCFOptions mutable_cf_options = |
| *cfd->GetLatestMutableCFOptions(); |
| auto bg_job_limits = GetBGJobLimits(); |
| ROCKS_LOG_BUFFER( |
| log_buffer, |
| "Calling FlushMemTableToOutputFile with column " |
| "family [%s], flush slots available %d, compaction slots available %d, " |
| "flush slots scheduled %d, compaction slots scheduled %d", |
| cfd->GetName().c_str(), bg_job_limits.max_flushes, |
| bg_job_limits.max_compactions, bg_flush_scheduled_, |
| bg_compaction_scheduled_); |
| status = FlushMemTableToOutputFile(cfd, mutable_cf_options, made_progress, |
| job_context, log_buffer); |
| if (cfd->Unref()) { |
| delete cfd; |
| } |
| } |
| return status; |
| } |
| |
| void DBImpl::BackgroundCallFlush() { |
| bool made_progress = false; |
| JobContext job_context(next_job_id_.fetch_add(1), true); |
| |
| TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:start"); |
| |
| LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, |
| immutable_db_options_.info_log.get()); |
| { |
| InstrumentedMutexLock l(&mutex_); |
| assert(bg_flush_scheduled_); |
| num_running_flushes_++; |
| |
| auto pending_outputs_inserted_elem = |
| CaptureCurrentFileNumberInPendingOutputs(); |
| |
| Status s = BackgroundFlush(&made_progress, &job_context, &log_buffer); |
| if (!s.ok() && !s.IsShutdownInProgress()) { |
| // Wait a little bit before retrying background flush in |
| // case this is an environmental problem and we do not want to |
| // chew up resources for failed flushes for the duration of |
| // the problem. |
| uint64_t error_cnt = |
| default_cf_internal_stats_->BumpAndGetBackgroundErrorCount(); |
| bg_cv_.SignalAll(); // In case a waiter can proceed despite the error |
| mutex_.Unlock(); |
| ROCKS_LOG_ERROR(immutable_db_options_.info_log, |
| "Waiting after background flush error: %s" |
| "Accumulated background error counts: %" PRIu64, |
| s.ToString().c_str(), error_cnt); |
| log_buffer.FlushBufferToLog(); |
| LogFlush(immutable_db_options_.info_log); |
| env_->SleepForMicroseconds(1000000); |
| mutex_.Lock(); |
| } |
| |
| ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem); |
| |
| // If flush failed, we want to delete all temporary files that we might have |
| // created. Thus, we force full scan in FindObsoleteFiles() |
| FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress()); |
| // delete unnecessary files if any, this is done outside the mutex |
| if (job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) { |
| mutex_.Unlock(); |
| // Have to flush the info logs before bg_flush_scheduled_-- |
| // because if bg_flush_scheduled_ becomes 0 and the lock is |
| // released, the deconstructor of DB can kick in and destroy all the |
| // states of DB so info_log might not be available after that point. |
| // It also applies to access other states that DB owns. |
| log_buffer.FlushBufferToLog(); |
| if (job_context.HaveSomethingToDelete()) { |
| PurgeObsoleteFiles(job_context); |
| } |
| job_context.Clean(); |
| mutex_.Lock(); |
| } |
| |
| assert(num_running_flushes_ > 0); |
| num_running_flushes_--; |
| bg_flush_scheduled_--; |
| // See if there's more work to be done |
| MaybeScheduleFlushOrCompaction(); |
| bg_cv_.SignalAll(); |
| // IMPORTANT: there should be no code after calling SignalAll. This call may |
| // signal the DB destructor that it's OK to proceed with destruction. In |
| // that case, all DB variables will be dealloacated and referencing them |
| // will cause trouble. |
| } |
| } |
| |
| void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction, |
| Env::Priority bg_thread_pri) { |
| bool made_progress = false; |
| JobContext job_context(next_job_id_.fetch_add(1), true); |
| TEST_SYNC_POINT("BackgroundCallCompaction:0"); |
| MaybeDumpStats(); |
| LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, |
| immutable_db_options_.info_log.get()); |
| { |
| InstrumentedMutexLock l(&mutex_); |
| |
| // This call will unlock/lock the mutex to wait for current running |
| // IngestExternalFile() calls to finish. |
| WaitForIngestFile(); |
| |
| num_running_compactions_++; |
| |
| auto pending_outputs_inserted_elem = |
| CaptureCurrentFileNumberInPendingOutputs(); |
| |
| assert((bg_thread_pri == Env::Priority::BOTTOM && |
| bg_bottom_compaction_scheduled_) || |
| (bg_thread_pri == Env::Priority::LOW && bg_compaction_scheduled_)); |
| Status s = BackgroundCompaction(&made_progress, &job_context, &log_buffer, |
| prepicked_compaction); |
| TEST_SYNC_POINT("BackgroundCallCompaction:1"); |
| if (!s.ok() && !s.IsShutdownInProgress()) { |
| // Wait a little bit before retrying background compaction in |
| // case this is an environmental problem and we do not want to |
| // chew up resources for failed compactions for the duration of |
| // the problem. |
| uint64_t error_cnt = |
| default_cf_internal_stats_->BumpAndGetBackgroundErrorCount(); |
| bg_cv_.SignalAll(); // In case a waiter can proceed despite the error |
| mutex_.Unlock(); |
| log_buffer.FlushBufferToLog(); |
| ROCKS_LOG_ERROR(immutable_db_options_.info_log, |
| "Waiting after background compaction error: %s, " |
| "Accumulated background error counts: %" PRIu64, |
| s.ToString().c_str(), error_cnt); |
| LogFlush(immutable_db_options_.info_log); |
| env_->SleepForMicroseconds(1000000); |
| mutex_.Lock(); |
| } |
| |
| ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem); |
| |
| // If compaction failed, we want to delete all temporary files that we might |
| // have created (they might not be all recorded in job_context in case of a |
| // failure). Thus, we force full scan in FindObsoleteFiles() |
| FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress()); |
| |
| // delete unnecessary files if any, this is done outside the mutex |
| if (job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) { |
| mutex_.Unlock(); |
| // Have to flush the info logs before bg_compaction_scheduled_-- |
| // because if bg_flush_scheduled_ becomes 0 and the lock is |
| // released, the deconstructor of DB can kick in and destroy all the |
| // states of DB so info_log might not be available after that point. |
| // It also applies to access other states that DB owns. |
| log_buffer.FlushBufferToLog(); |
| if (job_context.HaveSomethingToDelete()) { |
| PurgeObsoleteFiles(job_context); |
| } |
| job_context.Clean(); |
| mutex_.Lock(); |
| } |
| |
| assert(num_running_compactions_ > 0); |
| num_running_compactions_--; |
| if (bg_thread_pri == Env::Priority::LOW) { |
| bg_compaction_scheduled_--; |
| } else { |
| assert(bg_thread_pri == Env::Priority::BOTTOM); |
| bg_bottom_compaction_scheduled_--; |
| } |
| |
| versions_->GetColumnFamilySet()->FreeDeadColumnFamilies(); |
| |
| // See if there's more work to be done |
| MaybeScheduleFlushOrCompaction(); |
| if (made_progress || |
| (bg_compaction_scheduled_ == 0 && |
| bg_bottom_compaction_scheduled_ == 0) || |
| HasPendingManualCompaction()) { |
| // signal if |
| // * made_progress -- need to wakeup DelayWrite |
| // * bg_{bottom,}_compaction_scheduled_ == 0 -- need to wakeup ~DBImpl |
| // * HasPendingManualCompaction -- need to wakeup RunManualCompaction |
| // If none of this is true, there is no need to signal since nobody is |
| // waiting for it |
| bg_cv_.SignalAll(); |
| } |
| // IMPORTANT: there should be no code after calling SignalAll. This call may |
| // signal the DB destructor that it's OK to proceed with destruction. In |
| // that case, all DB variables will be dealloacated and referencing them |
| // will cause trouble. |
| } |
| } |
| |
| Status DBImpl::BackgroundCompaction(bool* made_progress, |
| JobContext* job_context, |
| LogBuffer* log_buffer, |
| PrepickedCompaction* prepicked_compaction) { |
| ManualCompactionState* manual_compaction = |
| prepicked_compaction == nullptr |
| ? nullptr |
| : prepicked_compaction->manual_compaction_state; |
| *made_progress = false; |
| mutex_.AssertHeld(); |
| TEST_SYNC_POINT("DBImpl::BackgroundCompaction:Start"); |
| |
| bool is_manual = (manual_compaction != nullptr); |
| unique_ptr<Compaction> c; |
| if (prepicked_compaction != nullptr && |
| prepicked_compaction->compaction != nullptr) { |
| c.reset(prepicked_compaction->compaction); |
| } |
| bool is_prepicked = is_manual || c; |
| |
| // (manual_compaction->in_progress == false); |
| bool trivial_move_disallowed = |
| is_manual && manual_compaction->disallow_trivial_move; |
| |
| CompactionJobStats compaction_job_stats; |
| Status status = bg_error_; |
| if (status.ok() && shutting_down_.load(std::memory_order_acquire)) { |
| status = Status::ShutdownInProgress(); |
| } |
| |
| if (!status.ok()) { |
| if (is_manual) { |
| manual_compaction->status = status; |
| manual_compaction->done = true; |
| manual_compaction->in_progress = false; |
| manual_compaction = nullptr; |
| } |
| return status; |
| } |
| |
| if (is_manual) { |
| // another thread cannot pick up the same work |
| manual_compaction->in_progress = true; |
| } |
| |
| // InternalKey manual_end_storage; |
| // InternalKey* manual_end = &manual_end_storage; |
| if (is_manual) { |
| ManualCompactionState* m = manual_compaction; |
| assert(m->in_progress); |
| if (!c) { |
| m->done = true; |
| m->manual_end = nullptr; |
| ROCKS_LOG_BUFFER(log_buffer, |
| "[%s] Manual compaction from level-%d from %s .. " |
| "%s; nothing to do\n", |
| m->cfd->GetName().c_str(), m->input_level, |
| (m->begin ? m->begin->DebugString().c_str() : "(begin)"), |
| (m->end ? m->end->DebugString().c_str() : "(end)")); |
| } else { |
| ROCKS_LOG_BUFFER( |
| log_buffer, |
| "[%s] Manual compaction from level-%d to level-%d from %s .. " |
| "%s; will stop at %s\n", |
| m->cfd->GetName().c_str(), m->input_level, c->output_level(), |
| (m->begin ? m->begin->DebugString().c_str() : "(begin)"), |
| (m->end ? m->end->DebugString().c_str() : "(end)"), |
| ((m->done || m->manual_end == nullptr) |
| ? "(end)" |
| : m->manual_end->DebugString().c_str())); |
| } |
| } else if (!is_prepicked && !compaction_queue_.empty()) { |
| if (HaveManualCompaction(compaction_queue_.front())) { |
| // Can't compact right now, but try again later |
| TEST_SYNC_POINT("DBImpl::BackgroundCompaction()::Conflict"); |
| |
| // Stay in the compaction queue. |
| unscheduled_compactions_++; |
| |
| return Status::OK(); |
| } |
| |
| // cfd is referenced here |
| auto cfd = PopFirstFromCompactionQueue(); |
| // We unreference here because the following code will take a Ref() on |
| // this cfd if it is going to use it (Compaction class holds a |
| // reference). |
| // This will all happen under a mutex so we don't have to be afraid of |
| // somebody else deleting it. |
| if (cfd->Unref()) { |
| delete cfd; |
| // This was the last reference of the column family, so no need to |
| // compact. |
| return Status::OK(); |
| } |
| |
| // Pick up latest mutable CF Options and use it throughout the |
| // compaction job |
| // Compaction makes a copy of the latest MutableCFOptions. It should be used |
| // throughout the compaction procedure to make sure consistency. It will |
| // eventually be installed into SuperVersion |
| auto* mutable_cf_options = cfd->GetLatestMutableCFOptions(); |
| if (!mutable_cf_options->disable_auto_compactions && !cfd->IsDropped()) { |
| // NOTE: try to avoid unnecessary copy of MutableCFOptions if |
| // compaction is not necessary. Need to make sure mutex is held |
| // until we make a copy in the following code |
| TEST_SYNC_POINT("DBImpl::BackgroundCompaction():BeforePickCompaction"); |
| c.reset(cfd->PickCompaction(*mutable_cf_options, log_buffer)); |
| TEST_SYNC_POINT("DBImpl::BackgroundCompaction():AfterPickCompaction"); |
| if (c != nullptr) { |
| // update statistics |
| MeasureTime(stats_, NUM_FILES_IN_SINGLE_COMPACTION, |
| c->inputs(0)->size()); |
| // There are three things that can change compaction score: |
| // 1) When flush or compaction finish. This case is covered by |
| // InstallSuperVersionAndScheduleWork |
| // 2) When MutableCFOptions changes. This case is also covered by |
| // InstallSuperVersionAndScheduleWork, because this is when the new |
| // options take effect. |
| // 3) When we Pick a new compaction, we "remove" those files being |
| // compacted from the calculation, which then influences compaction |
| // score. Here we check if we need the new compaction even without the |
| // files that are currently being compacted. If we need another |
| // compaction, we might be able to execute it in parallel, so we add it |
| // to the queue and schedule a new thread. |
| if (cfd->NeedsCompaction()) { |
| // Yes, we need more compactions! |
| AddToCompactionQueue(cfd); |
| ++unscheduled_compactions_; |
| MaybeScheduleFlushOrCompaction(); |
| } |
| } |
| } |
| } |
| |
| if (!c) { |
| // Nothing to do |
| ROCKS_LOG_BUFFER(log_buffer, "Compaction nothing to do"); |
| } else if (c->deletion_compaction()) { |
| // TODO(icanadi) Do we want to honor snapshots here? i.e. not delete old |
| // file if there is alive snapshot pointing to it |
| assert(c->num_input_files(1) == 0); |
| assert(c->level() == 0); |
| assert(c->column_family_data()->ioptions()->compaction_style == |
| kCompactionStyleFIFO); |
| |
| compaction_job_stats.num_input_files = c->num_input_files(0); |
| |
| for (const auto& f : *c->inputs(0)) { |
| c->edit()->DeleteFile(c->level(), f->fd.GetNumber()); |
| } |
| status = versions_->LogAndApply(c->column_family_data(), |
| *c->mutable_cf_options(), c->edit(), |
| &mutex_, directories_.GetDbDir()); |
| InstallSuperVersionAndScheduleWorkWrapper( |
| c->column_family_data(), job_context, *c->mutable_cf_options()); |
| ROCKS_LOG_BUFFER(log_buffer, "[%s] Deleted %d files\n", |
| c->column_family_data()->GetName().c_str(), |
| c->num_input_files(0)); |
| *made_progress = true; |
| } else if (!trivial_move_disallowed && c->IsTrivialMove()) { |
| TEST_SYNC_POINT("DBImpl::BackgroundCompaction:TrivialMove"); |
| // Instrument for event update |
| // TODO(yhchiang): add op details for showing trivial-move. |
| ThreadStatusUtil::SetColumnFamily( |
| c->column_family_data(), c->column_family_data()->ioptions()->env, |
| immutable_db_options_.enable_thread_tracking); |
| ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION); |
| |
| compaction_job_stats.num_input_files = c->num_input_files(0); |
| |
| // Move files to next level |
| int32_t moved_files = 0; |
| int64_t moved_bytes = 0; |
| for (unsigned int l = 0; l < c->num_input_levels(); l++) { |
| if (c->level(l) == c->output_level()) { |
| continue; |
| } |
| for (size_t i = 0; i < c->num_input_files(l); i++) { |
| FileMetaData* f = c->input(l, i); |
| c->edit()->DeleteFile(c->level(l), f->fd.GetNumber()); |
| c->edit()->AddFile(c->output_level(), f->fd.GetNumber(), |
| f->fd.GetPathId(), f->fd.GetFileSize(), f->smallest, |
| f->largest, f->smallest_seqno, f->largest_seqno, |
| f->marked_for_compaction); |
| |
| ROCKS_LOG_BUFFER(log_buffer, "[%s] Moving #%" PRIu64 |
| " to level-%d %" PRIu64 " bytes\n", |
| c->column_family_data()->GetName().c_str(), |
| f->fd.GetNumber(), c->output_level(), |
| f->fd.GetFileSize()); |
| ++moved_files; |
| moved_bytes += f->fd.GetFileSize(); |
| } |
| } |
| |
| status = versions_->LogAndApply(c->column_family_data(), |
| *c->mutable_cf_options(), c->edit(), |
| &mutex_, directories_.GetDbDir()); |
| // Use latest MutableCFOptions |
| InstallSuperVersionAndScheduleWorkWrapper( |
| c->column_family_data(), job_context, *c->mutable_cf_options()); |
| |
| VersionStorageInfo::LevelSummaryStorage tmp; |
| c->column_family_data()->internal_stats()->IncBytesMoved(c->output_level(), |
| moved_bytes); |
| { |
| event_logger_.LogToBuffer(log_buffer) |
| << "job" << job_context->job_id << "event" |
| << "trivial_move" |
| << "destination_level" << c->output_level() << "files" << moved_files |
| << "total_files_size" << moved_bytes; |
| } |
| ROCKS_LOG_BUFFER( |
| log_buffer, |
| "[%s] Moved #%d files to level-%d %" PRIu64 " bytes %s: %s\n", |
| c->column_family_data()->GetName().c_str(), moved_files, |
| c->output_level(), moved_bytes, status.ToString().c_str(), |
| c->column_family_data()->current()->storage_info()->LevelSummary(&tmp)); |
| *made_progress = true; |
| |
| // Clear Instrument |
| ThreadStatusUtil::ResetThreadStatus(); |
| } else if (c->column_family_data()->ioptions()->compaction_style == |
| kCompactionStyleUniversal && |
| !is_prepicked && c->output_level() > 0 && |
| c->output_level() == |
| c->column_family_data() |
| ->current() |
| ->storage_info() |
| ->MaxOutputLevel( |
| immutable_db_options_.allow_ingest_behind) && |
| env_->GetBackgroundThreads(Env::Priority::BOTTOM) > 0) { |
| // Forward universal compactions involving last level to the bottom pool |
| // if it exists, such that long-running compactions can't block short- |
| // lived ones, like L0->L0s. |
| TEST_SYNC_POINT("DBImpl::BackgroundCompaction:ForwardToBottomPriPool"); |
| CompactionArg* ca = new CompactionArg; |
| ca->db = this; |
| ca->prepicked_compaction = new PrepickedCompaction; |
| ca->prepicked_compaction->compaction = c.release(); |
| ca->prepicked_compaction->manual_compaction_state = nullptr; |
| ++bg_bottom_compaction_scheduled_; |
| env_->Schedule(&DBImpl::BGWorkBottomCompaction, ca, Env::Priority::BOTTOM, |
| this, &DBImpl::UnscheduleCallback); |
| } else { |
| int output_level __attribute__((unused)) = c->output_level(); |
| TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:NonTrivial", |
| &output_level); |
| |
| SequenceNumber earliest_write_conflict_snapshot; |
| std::vector<SequenceNumber> snapshot_seqs = |
| snapshots_.GetAll(&earliest_write_conflict_snapshot); |
| |
| assert(is_snapshot_supported_ || snapshots_.empty()); |
| CompactionJob compaction_job( |
| job_context->job_id, c.get(), immutable_db_options_, env_options_, |
| versions_.get(), &shutting_down_, log_buffer, directories_.GetDbDir(), |
| directories_.GetDataDir(c->output_path_id()), stats_, &mutex_, |
| &bg_error_, snapshot_seqs, earliest_write_conflict_snapshot, |
| table_cache_, &event_logger_, |
| c->mutable_cf_options()->paranoid_file_checks, |
| c->mutable_cf_options()->report_bg_io_stats, dbname_, |
| &compaction_job_stats); |
| compaction_job.Prepare(); |
| |
| mutex_.Unlock(); |
| compaction_job.Run(); |
| TEST_SYNC_POINT("DBImpl::BackgroundCompaction:NonTrivial:AfterRun"); |
| mutex_.Lock(); |
| |
| status = compaction_job.Install(*c->mutable_cf_options()); |
| if (status.ok()) { |
| InstallSuperVersionAndScheduleWorkWrapper( |
| c->column_family_data(), job_context, *c->mutable_cf_options()); |
| } |
| *made_progress = true; |
| } |
| if (c != nullptr) { |
| c->ReleaseCompactionFiles(status); |
| *made_progress = true; |
| NotifyOnCompactionCompleted( |
| c->column_family_data(), c.get(), status, |
| compaction_job_stats, job_context->job_id); |
| } |
| // this will unref its input_version and column_family_data |
| c.reset(); |
| |
| if (status.ok()) { |
| // Done |
| } else if (status.IsShutdownInProgress()) { |
| // Ignore compaction errors found during shutting down |
| } else { |
| ROCKS_LOG_WARN(immutable_db_options_.info_log, "Compaction error: %s", |
| status.ToString().c_str()); |
| if (immutable_db_options_.paranoid_checks && bg_error_.ok()) { |
| Status new_bg_error = status; |
| // may temporarily unlock and lock the mutex. |
| EventHelpers::NotifyOnBackgroundError(immutable_db_options_.listeners, |
| BackgroundErrorReason::kCompaction, |
| &new_bg_error, &mutex_); |
| if (!new_bg_error.ok()) { |
| bg_error_ = new_bg_error; |
| } |
| } |
| } |
| |
| if (is_manual) { |
| ManualCompactionState* m = manual_compaction; |
| if (!status.ok()) { |
| m->status = status; |
| m->done = true; |
| } |
| // For universal compaction: |
| // Because universal compaction always happens at level 0, so one |
| // compaction will pick up all overlapped files. No files will be |
| // filtered out due to size limit and left for a successive compaction. |
| // So we can safely conclude the current compaction. |
| // |
| // Also note that, if we don't stop here, then the current compaction |
| // writes a new file back to level 0, which will be used in successive |
| // compaction. Hence the manual compaction will never finish. |
| // |
| // Stop the compaction if manual_end points to nullptr -- this means |
| // that we compacted the whole range. manual_end should always point |
| // to nullptr in case of universal compaction |
| if (m->manual_end == nullptr) { |
| m->done = true; |
| } |
| if (!m->done) { |
| // We only compacted part of the requested range. Update *m |
| // to the range that is left to be compacted. |
| // Universal and FIFO compactions should always compact the whole range |
| assert(m->cfd->ioptions()->compaction_style != |
| kCompactionStyleUniversal || |
| m->cfd->ioptions()->num_levels > 1); |
| assert(m->cfd->ioptions()->compaction_style != kCompactionStyleFIFO); |
| m->tmp_storage = *m->manual_end; |
| m->begin = &m->tmp_storage; |
| m->incomplete = true; |
| } |
| m->in_progress = false; // not being processed anymore |
| } |
| TEST_SYNC_POINT("DBImpl::BackgroundCompaction:Finish"); |
| return status; |
| } |
| |
| // Returns true when manual_compaction_dequeue_ holds at least one entry. |
| bool DBImpl::HasPendingManualCompaction() { |
|   const bool queue_is_empty = manual_compaction_dequeue_.empty(); |
|   return !queue_is_empty; |
| } |
| |
| // Appends the request `m` to the back of manual_compaction_dequeue_. Only the |
| // pointer is stored. |
| void DBImpl::AddManualCompaction(DBImpl::ManualCompactionState* m) { |
|   manual_compaction_dequeue_.emplace_back(m); |
| } |
| |
| // Erases the first entry of manual_compaction_dequeue_ that is equal to `m`. |
| // `m` is expected to be in the queue: walking off the end without a match |
| // trips assert(false) when assertions are enabled. |
| void DBImpl::RemoveManualCompaction(DBImpl::ManualCompactionState* m) { |
|   auto& queue = manual_compaction_dequeue_; |
|   for (auto pos = queue.begin(); pos != queue.end(); ++pos) { |
|     if (*pos == m) { |
|       queue.erase(pos); |
|       return; |
|     } |
|   } |
|   // No entry matched `m`. |
|   assert(false); |
| } |
| |
| // Decides whether the manual compaction `m` has to hold off for now. |
| // Returns true when: |
| //  * num_running_ingest_file_ is positive (IngestExternalFile() calls must |
| //    finish before a manual compaction runs), or |
| //  * `m` is exclusive and any background (bottom) compaction is scheduled, or |
| //  * `m` is not exclusive and some request queued ahead of it overlaps with |
| //    it (per MCOverlap) and is not yet in progress. |
| // Returns false otherwise. |
| bool DBImpl::ShouldntRunManualCompaction(ManualCompactionState* m) { |
|   if (num_running_ingest_file_ > 0) { |
|     // We need to wait for other IngestExternalFile() calls to finish |
|     // before running a manual compaction. |
|     return true; |
|   } |
|   if (m->exclusive) { |
|     const bool any_compaction_scheduled = |
|         bg_bottom_compaction_scheduled_ > 0 || bg_compaction_scheduled_ > 0; |
|     return any_compaction_scheduled; |
|   } |
|   // Becomes true once the walk has passed `m` itself; every request visited |
|   // after that point sits behind `m` in the queue and cannot block it. |
|   bool reached_self = false; |
|   for (ManualCompactionState* other : manual_compaction_dequeue_) { |
|     if (other == m) { |
|       reached_self = true; |
|       continue; |
|     } |
|     // `other` conflicts with m if it overlaps with m, is ahead of m in the |
|     // queue, and has not started running yet. |
|     const bool conflicts = |
|         MCOverlap(m, other) && !reached_self && !other->in_progress; |
|     if (conflicts) { |
|       return true; |
|     } |
|   } |
|   return false; |
| } |
| |
| // Scans manual_compaction_dequeue_ (read-only) and returns true if either |
| //  * any queued request is exclusive, or |
| //  * a queued request targets `cfd` and is neither in progress nor done. |
| // A request on `cfd` that is already in progress does not count, which |
| // allows automatic compaction while that manual compaction is running. |
| bool DBImpl::HaveManualCompaction(ColumnFamilyData* cfd) { |
|   for (ManualCompactionState* request : manual_compaction_dequeue_) { |
|     if (request->exclusive) { |
|       return true; |
|     } |
|     const bool same_cf = (cfd == request->cfd); |
|     if (same_cf && !request->in_progress && !request->done) { |
|       return true; |
|     } |
|   } |
|   return false; |
| } |
| |
| // Returns true if at least one request in manual_compaction_dequeue_ has its |
| // `exclusive` flag set. The queue is only read, never modified. |
| bool DBImpl::HasExclusiveManualCompaction() { |
|   for (ManualCompactionState* request : manual_compaction_dequeue_) { |
|     if (request->exclusive) { |
|       return true; |
|     } |
|   } |
|   return false; |
| } |
| |
| // Tells whether two manual compaction requests are considered overlapping: |
| // they are if either one is exclusive, or if both point at the same cfd. |
| // Key ranges are not compared here. |
| bool DBImpl::MCOverlap(ManualCompactionState* m, ManualCompactionState* m1) { |
|   const bool either_exclusive = m->exclusive || m1->exclusive; |
|   return either_exclusive || m->cfd == m1->cfd; |
| } |
| |
| // JobContext gets created and destructed outside of the lock -- we use this |
| // conveniently to: |
| // * malloc one SuperVersion() outside of the lock -- new_superversion |
| // * delete SuperVersion()s outside of the lock -- superversions_to_free |
| // |
| // However, if InstallSuperVersionAndScheduleWork() gets called twice with the |
| // same job_context, we can't reuse the SuperVersion() that got malloced |
| // because the first call already used it. In that rare case, we take a hit |
| // and create a new SuperVersion() inside of the mutex. We do a similar thing |
| // for superversions_to_free. |
| void DBImpl::InstallSuperVersionAndScheduleWorkWrapper( |
|     ColumnFamilyData* cfd, JobContext* job_context, |
|     const MutableCFOptions& mutable_cf_options) { |
|   mutex_.AssertHeld(); |
|   // Hand over whatever SuperVersion the job context is carrying; this may be |
|   // nullptr if it was already consumed by an earlier call. |
|   SuperVersion* const spare_sv = job_context->new_superversion; |
|   SuperVersion* const replaced_sv = |
|       InstallSuperVersionAndScheduleWork(cfd, spare_sv, mutable_cf_options); |
|   // The spare has been used up, so clear the slot. |
|   job_context->new_superversion = nullptr; |
|   // Queue the value returned by the install call on superversions_to_free. |
|   job_context->superversions_to_free.push_back(replaced_sv); |
| } |
| |
| // Installs a SuperVersion on `cfd`, schedules any pending flush/compaction |
| // work, and refreshes max_total_in_memory_state_. Asserts that mutex_ is |
| // held. |
| // |
| //  cfd                 column family receiving the new SuperVersion |
| //  new_sv              SuperVersion to install; when nullptr a fresh one is |
| //                      allocated here |
| //  mutable_cf_options  options passed to the install call and used for the |
| //                      new memtable-size contribution |
| // |
| // Returns whatever cfd->InstallSuperVersion() hands back. |
| SuperVersion* DBImpl::InstallSuperVersionAndScheduleWork( |
|     ColumnFamilyData* cfd, SuperVersion* new_sv, |
|     const MutableCFOptions& mutable_cf_options) { |
|   mutex_.AssertHeld(); |
|  |
|   // Contribution of the currently installed SuperVersion (if any) to |
|   // max_total_in_memory_state_. |
|   auto* current_sv = cfd->GetSuperVersion(); |
|   const size_t old_memtable_size = |
|       current_sv ? current_sv->mutable_cf_options.write_buffer_size * |
|                        current_sv->mutable_cf_options.max_write_buffer_number |
|                  : 0; |
|  |
|   SuperVersion* sv_to_install = new_sv; |
|   if (!sv_to_install) { |
|     // No SuperVersion was supplied, so create one here. |
|     sv_to_install = new SuperVersion(); |
|   } |
|   auto* replaced_sv = |
|       cfd->InstallSuperVersion(sv_to_install, &mutex_, mutable_cf_options); |
|  |
|   // Whenever we install new SuperVersion, we might need to issue new flushes |
|   // or compactions. |
|   SchedulePendingFlush(cfd); |
|   SchedulePendingCompaction(cfd); |
|   MaybeScheduleFlushOrCompaction(); |
|  |
|   // Swap the old contribution for the new one in max_total_in_memory_state_. |
|   const auto new_memtable_size = mutable_cf_options.write_buffer_size * |
|                                  mutable_cf_options.max_write_buffer_number; |
|   max_total_in_memory_state_ = |
|       max_total_in_memory_state_ - old_memtable_size + new_memtable_size; |
|   return replaced_sv; |
| } |
| } // namespace rocksdb |