| // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
| // This source code is licensed under both the GPLv2 (found in the |
| // COPYING file in the root directory) and Apache 2.0 License |
| // (found in the LICENSE.Apache file in the root directory). |
| // |
| // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. See the AUTHORS file for names of contributors. |
| |
| #include "db/flush_job.h" |
| |
| #ifndef __STDC_FORMAT_MACROS |
| #define __STDC_FORMAT_MACROS |
| #endif |
| |
| #include <inttypes.h> |
| |
| #include <algorithm> |
| #include <vector> |
| |
| #include "db/builder.h" |
| #include "db/db_iter.h" |
| #include "db/dbformat.h" |
| #include "db/event_helpers.h" |
| #include "db/log_reader.h" |
| #include "db/log_writer.h" |
| #include "db/memtable_list.h" |
| #include "db/merge_context.h" |
| #include "db/version_set.h" |
| #include "monitoring/iostats_context_imp.h" |
| #include "monitoring/perf_context_imp.h" |
| #include "monitoring/thread_status_util.h" |
| #include "port/likely.h" |
| #include "port/port.h" |
| #include "db/memtable.h" |
| #include "rocksdb/db.h" |
| #include "rocksdb/env.h" |
| #include "rocksdb/statistics.h" |
| #include "rocksdb/status.h" |
| #include "rocksdb/table.h" |
| #include "table/block.h" |
| #include "table/block_based_table_factory.h" |
| #include "table/merging_iterator.h" |
| #include "table/table_builder.h" |
| #include "table/two_level_iterator.h" |
| #include "util/coding.h" |
| #include "util/event_logger.h" |
| #include "util/file_util.h" |
| #include "util/filename.h" |
| #include "util/log_buffer.h" |
| #include "util/logging.h" |
| #include "util/mutexlock.h" |
| #include "util/stop_watch.h" |
| #include "util/sync_point.h" |
| |
| namespace rocksdb { |
| |
| FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd, |
| const ImmutableDBOptions& db_options, |
| const MutableCFOptions& mutable_cf_options, |
| const EnvOptions& env_options, VersionSet* versions, |
| InstrumentedMutex* db_mutex, |
| std::atomic<bool>* shutting_down, |
| std::vector<SequenceNumber> existing_snapshots, |
| SequenceNumber earliest_write_conflict_snapshot, |
| JobContext* job_context, LogBuffer* log_buffer, |
| Directory* db_directory, Directory* output_file_directory, |
| CompressionType output_compression, Statistics* stats, |
| EventLogger* event_logger, bool measure_io_stats) |
| : dbname_(dbname), |
| cfd_(cfd), |
| db_options_(db_options), |
| mutable_cf_options_(mutable_cf_options), |
| env_options_(env_options), |
| versions_(versions), |
| db_mutex_(db_mutex), |
| shutting_down_(shutting_down), |
| existing_snapshots_(std::move(existing_snapshots)), |
| earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot), |
| job_context_(job_context), |
| log_buffer_(log_buffer), |
| db_directory_(db_directory), |
| output_file_directory_(output_file_directory), |
| output_compression_(output_compression), |
| stats_(stats), |
| event_logger_(event_logger), |
| measure_io_stats_(measure_io_stats), |
| pick_memtable_called(false) { |
| // Update the thread status to indicate flush. |
| ReportStartedFlush(); |
| TEST_SYNC_POINT("FlushJob::FlushJob()"); |
| } |
| |
| FlushJob::~FlushJob() { |
| ThreadStatusUtil::ResetThreadStatus(); |
| } |
| |
| void FlushJob::ReportStartedFlush() { |
| ThreadStatusUtil::SetColumnFamily(cfd_, cfd_->ioptions()->env, |
| db_options_.enable_thread_tracking); |
| ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_FLUSH); |
| ThreadStatusUtil::SetThreadOperationProperty( |
| ThreadStatus::COMPACTION_JOB_ID, |
| job_context_->job_id); |
| IOSTATS_RESET(bytes_written); |
| } |
| |
| void FlushJob::ReportFlushInputSize(const autovector<MemTable*>& mems) { |
| uint64_t input_size = 0; |
| for (auto* mem : mems) { |
| input_size += mem->ApproximateMemoryUsage(); |
| } |
| ThreadStatusUtil::IncreaseThreadOperationProperty( |
| ThreadStatus::FLUSH_BYTES_MEMTABLES, |
| input_size); |
| } |
| |
| void FlushJob::RecordFlushIOStats() { |
| RecordTick(stats_, FLUSH_WRITE_BYTES, IOSTATS(bytes_written)); |
| ThreadStatusUtil::IncreaseThreadOperationProperty( |
| ThreadStatus::FLUSH_BYTES_WRITTEN, IOSTATS(bytes_written)); |
| IOSTATS_RESET(bytes_written); |
| } |
| |
| void FlushJob::PickMemTable() { |
| db_mutex_->AssertHeld(); |
| assert(!pick_memtable_called); |
| pick_memtable_called = true; |
| // Save the contents of the earliest memtable as a new Table |
| cfd_->imm()->PickMemtablesToFlush(&mems_); |
| if (mems_.empty()) { |
| return; |
| } |
| |
| ReportFlushInputSize(mems_); |
| |
| // entries mems are (implicitly) sorted in ascending order by their created |
| // time. We will use the first memtable's `edit` to keep the meta info for |
| // this flush. |
| MemTable* m = mems_[0]; |
| edit_ = m->GetEdits(); |
| edit_->SetPrevLogNumber(0); |
| // SetLogNumber(log_num) indicates logs with number smaller than log_num |
| // will no longer be picked up for recovery. |
| edit_->SetLogNumber(mems_.back()->GetNextLogNumber()); |
| edit_->SetColumnFamily(cfd_->GetID()); |
| |
| // path 0 for level 0 file. |
| meta_.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0); |
| |
| base_ = cfd_->current(); |
| base_->Ref(); // it is likely that we do not need this reference |
| } |
| |
| Status FlushJob::Run(FileMetaData* file_meta) { |
| db_mutex_->AssertHeld(); |
| assert(pick_memtable_called); |
| AutoThreadOperationStageUpdater stage_run( |
| ThreadStatus::STAGE_FLUSH_RUN); |
| if (mems_.empty()) { |
| ROCKS_LOG_BUFFER(log_buffer_, "[%s] Nothing in memtable to flush", |
| cfd_->GetName().c_str()); |
| return Status::OK(); |
| } |
| |
| // I/O measurement variables |
| PerfLevel prev_perf_level = PerfLevel::kEnableTime; |
| uint64_t prev_write_nanos = 0; |
| uint64_t prev_fsync_nanos = 0; |
| uint64_t prev_range_sync_nanos = 0; |
| uint64_t prev_prepare_write_nanos = 0; |
| if (measure_io_stats_) { |
| prev_perf_level = GetPerfLevel(); |
| SetPerfLevel(PerfLevel::kEnableTime); |
| prev_write_nanos = IOSTATS(write_nanos); |
| prev_fsync_nanos = IOSTATS(fsync_nanos); |
| prev_range_sync_nanos = IOSTATS(range_sync_nanos); |
| prev_prepare_write_nanos = IOSTATS(prepare_write_nanos); |
| } |
| |
| // This will release and re-acquire the mutex. |
| Status s = WriteLevel0Table(); |
| |
| if (s.ok() && |
| (shutting_down_->load(std::memory_order_acquire) || cfd_->IsDropped())) { |
| s = Status::ShutdownInProgress( |
| "Database shutdown or Column family drop during flush"); |
| } |
| |
| if (!s.ok()) { |
| cfd_->imm()->RollbackMemtableFlush(mems_, meta_.fd.GetNumber()); |
| } else { |
| TEST_SYNC_POINT("FlushJob::InstallResults"); |
| // Replace immutable memtable with the generated Table |
| s = cfd_->imm()->InstallMemtableFlushResults( |
| cfd_, mutable_cf_options_, mems_, versions_, db_mutex_, |
| meta_.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_, |
| log_buffer_); |
| } |
| |
| if (s.ok() && file_meta != nullptr) { |
| *file_meta = meta_; |
| } |
| RecordFlushIOStats(); |
| |
| auto stream = event_logger_->LogToBuffer(log_buffer_); |
| stream << "job" << job_context_->job_id << "event" |
| << "flush_finished"; |
| stream << "lsm_state"; |
| stream.StartArray(); |
| auto vstorage = cfd_->current()->storage_info(); |
| for (int level = 0; level < vstorage->num_levels(); ++level) { |
| stream << vstorage->NumLevelFiles(level); |
| } |
| stream.EndArray(); |
| stream << "immutable_memtables" << cfd_->imm()->NumNotFlushed(); |
| |
| if (measure_io_stats_) { |
| if (prev_perf_level != PerfLevel::kEnableTime) { |
| SetPerfLevel(prev_perf_level); |
| } |
| stream << "file_write_nanos" << (IOSTATS(write_nanos) - prev_write_nanos); |
| stream << "file_range_sync_nanos" |
| << (IOSTATS(range_sync_nanos) - prev_range_sync_nanos); |
| stream << "file_fsync_nanos" << (IOSTATS(fsync_nanos) - prev_fsync_nanos); |
| stream << "file_prepare_write_nanos" |
| << (IOSTATS(prepare_write_nanos) - prev_prepare_write_nanos); |
| } |
| |
| return s; |
| } |
| |
| void FlushJob::Cancel() { |
| db_mutex_->AssertHeld(); |
| assert(base_ != nullptr); |
| base_->Unref(); |
| } |
| |
| Status FlushJob::WriteLevel0Table() { |
| AutoThreadOperationStageUpdater stage_updater( |
| ThreadStatus::STAGE_FLUSH_WRITE_L0); |
| db_mutex_->AssertHeld(); |
| const uint64_t start_micros = db_options_.env->NowMicros(); |
| Status s; |
| { |
| db_mutex_->Unlock(); |
| if (log_buffer_) { |
| log_buffer_->FlushBufferToLog(); |
| } |
| // memtables and range_del_iters store internal iterators over each data |
| // memtable and its associated range deletion memtable, respectively, at |
| // corresponding indexes. |
| std::vector<InternalIterator*> memtables; |
| std::vector<InternalIterator*> range_del_iters; |
| ReadOptions ro; |
| ro.total_order_seek = true; |
| Arena arena; |
| uint64_t total_num_entries = 0, total_num_deletes = 0; |
| size_t total_memory_usage = 0; |
| for (MemTable* m : mems_) { |
| ROCKS_LOG_INFO( |
| db_options_.info_log, |
| "[%s] [JOB %d] Flushing memtable with next log file: %" PRIu64 "\n", |
| cfd_->GetName().c_str(), job_context_->job_id, m->GetNextLogNumber()); |
| memtables.push_back(m->NewIterator(ro, &arena)); |
| auto* range_del_iter = m->NewRangeTombstoneIterator(ro); |
| if (range_del_iter != nullptr) { |
| range_del_iters.push_back(range_del_iter); |
| } |
| total_num_entries += m->num_entries(); |
| total_num_deletes += m->num_deletes(); |
| total_memory_usage += m->ApproximateMemoryUsage(); |
| } |
| |
| event_logger_->Log() << "job" << job_context_->job_id << "event" |
| << "flush_started" |
| << "num_memtables" << mems_.size() << "num_entries" |
| << total_num_entries << "num_deletes" |
| << total_num_deletes << "memory_usage" |
| << total_memory_usage; |
| |
| { |
| ScopedArenaIterator iter( |
| NewMergingIterator(&cfd_->internal_comparator(), &memtables[0], |
| static_cast<int>(memtables.size()), &arena)); |
| std::unique_ptr<InternalIterator> range_del_iter(NewMergingIterator( |
| &cfd_->internal_comparator(), |
| range_del_iters.empty() ? nullptr : &range_del_iters[0], |
| static_cast<int>(range_del_iters.size()))); |
| ROCKS_LOG_INFO(db_options_.info_log, |
| "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": started", |
| cfd_->GetName().c_str(), job_context_->job_id, |
| meta_.fd.GetNumber()); |
| |
| TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:output_compression", |
| &output_compression_); |
| EnvOptions optimized_env_options = |
| db_options_.env->OptimizeForCompactionTableWrite(env_options_, db_options_); |
| |
| int64_t _current_time = 0; |
| db_options_.env->GetCurrentTime(&_current_time); // ignore error |
| const uint64_t current_time = static_cast<uint64_t>(_current_time); |
| |
| uint64_t oldest_key_time = mems_.front()->ApproximateOldestKeyTime(); |
| |
| s = BuildTable( |
| dbname_, db_options_.env, *cfd_->ioptions(), mutable_cf_options_, |
| optimized_env_options, cfd_->table_cache(), iter.get(), |
| std::move(range_del_iter), &meta_, cfd_->internal_comparator(), |
| cfd_->int_tbl_prop_collector_factories(), cfd_->GetID(), |
| cfd_->GetName(), existing_snapshots_, |
| earliest_write_conflict_snapshot_, output_compression_, |
| cfd_->ioptions()->compression_opts, |
| mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(), |
| TableFileCreationReason::kFlush, event_logger_, job_context_->job_id, |
| Env::IO_HIGH, &table_properties_, 0 /* level */, current_time, |
| oldest_key_time); |
| LogFlush(db_options_.info_log); |
| } |
| ROCKS_LOG_INFO(db_options_.info_log, |
| "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": %" PRIu64 |
| " bytes %s" |
| "%s", |
| cfd_->GetName().c_str(), job_context_->job_id, |
| meta_.fd.GetNumber(), meta_.fd.GetFileSize(), |
| s.ToString().c_str(), |
| meta_.marked_for_compaction ? " (needs compaction)" : ""); |
| |
| if (output_file_directory_ != nullptr) { |
| output_file_directory_->Fsync(); |
| } |
| TEST_SYNC_POINT("FlushJob::WriteLevel0Table"); |
| db_mutex_->Lock(); |
| } |
| base_->Unref(); |
| |
| // Note that if file_size is zero, the file has been deleted and |
| // should not be added to the manifest. |
| if (s.ok() && meta_.fd.GetFileSize() > 0) { |
| // if we have more than 1 background thread, then we cannot |
| // insert files directly into higher levels because some other |
| // threads could be concurrently producing compacted files for |
| // that key range. |
| // Add file to L0 |
| edit_->AddFile(0 /* level */, meta_.fd.GetNumber(), meta_.fd.GetPathId(), |
| meta_.fd.GetFileSize(), meta_.smallest, meta_.largest, |
| meta_.smallest_seqno, meta_.largest_seqno, |
| meta_.marked_for_compaction); |
| } |
| |
| // Note that here we treat flush as level 0 compaction in internal stats |
| InternalStats::CompactionStats stats(1); |
| stats.micros = db_options_.env->NowMicros() - start_micros; |
| stats.bytes_written = meta_.fd.GetFileSize(); |
| cfd_->internal_stats()->AddCompactionStats(0 /* level */, stats); |
| cfd_->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED, |
| meta_.fd.GetFileSize()); |
| RecordFlushIOStats(); |
| return s; |
| } |
| |
| } // namespace rocksdb |