| // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
| // This source code is licensed under both the GPLv2 (found in the |
| // COPYING file in the root directory) and Apache 2.0 License |
| // (found in the LICENSE.Apache file in the root directory). |
| // |
| // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. See the AUTHORS file for names of contributors. |
| #include "db/db_impl.h" |
| |
| #ifndef __STDC_FORMAT_MACROS |
| #define __STDC_FORMAT_MACROS |
| #endif |
| #include <inttypes.h> |
| #include <stdint.h> |
| #ifdef OS_SOLARIS |
| #include <alloca.h> |
| #endif |
| |
| #include <algorithm> |
| #include <climits> |
| #include <cstdio> |
| #include <map> |
| #include <set> |
| #include <stdexcept> |
| #include <string> |
| #include <unordered_map> |
| #include <unordered_set> |
| #include <utility> |
| #include <vector> |
| |
| #include "db/builder.h" |
| #include "db/compaction_job.h" |
| #include "db/db_info_dumper.h" |
| #include "db/db_iter.h" |
| #include "db/dbformat.h" |
| #include "db/event_helpers.h" |
| #include "db/external_sst_file_ingestion_job.h" |
| #include "db/flush_job.h" |
| #include "db/forward_iterator.h" |
| #include "db/job_context.h" |
| #include "db/log_reader.h" |
| #include "db/log_writer.h" |
| #include "db/malloc_stats.h" |
| #include "db/managed_iterator.h" |
| #include "db/memtable.h" |
| #include "db/memtable_list.h" |
| #include "db/merge_context.h" |
| #include "db/merge_helper.h" |
| #include "db/range_del_aggregator.h" |
| #include "db/table_cache.h" |
| #include "db/table_properties_collector.h" |
| #include "db/transaction_log_impl.h" |
| #include "db/version_set.h" |
| #include "db/write_batch_internal.h" |
| #include "db/write_callback.h" |
| #include "memtable/hash_linklist_rep.h" |
| #include "memtable/hash_skiplist_rep.h" |
| #include "monitoring/iostats_context_imp.h" |
| #include "monitoring/perf_context_imp.h" |
| #include "monitoring/thread_status_updater.h" |
| #include "monitoring/thread_status_util.h" |
| #include "options/cf_options.h" |
| #include "options/options_helper.h" |
| #include "options/options_parser.h" |
| #include "port/likely.h" |
| #include "port/port.h" |
| #include "rocksdb/cache.h" |
| #include "rocksdb/compaction_filter.h" |
| #include "rocksdb/convenience.h" |
| #include "rocksdb/db.h" |
| #include "rocksdb/env.h" |
| #include "rocksdb/merge_operator.h" |
| #include "rocksdb/statistics.h" |
| #include "rocksdb/status.h" |
| #include "rocksdb/table.h" |
| #include "rocksdb/version.h" |
| #include "rocksdb/write_buffer_manager.h" |
| #include "table/block.h" |
| #include "table/block_based_table_factory.h" |
| #include "table/merging_iterator.h" |
| #include "table/table_builder.h" |
| #include "table/two_level_iterator.h" |
| #include "tools/sst_dump_tool_imp.h" |
| #include "util/auto_roll_logger.h" |
| #include "util/autovector.h" |
| #include "util/build_version.h" |
| #include "util/coding.h" |
| #include "util/compression.h" |
| #include "util/crc32c.h" |
| #include "util/file_reader_writer.h" |
| #include "util/file_util.h" |
| #include "util/filename.h" |
| #include "util/log_buffer.h" |
| #include "util/logging.h" |
| #include "util/mutexlock.h" |
| #include "util/sst_file_manager_impl.h" |
| #include "util/stop_watch.h" |
| #include "util/string_util.h" |
| #include "util/sync_point.h" |
| |
| namespace rocksdb { |
| const std::string kDefaultColumnFamilyName("default"); |
void DumpRocksDBBuildVersion(Logger* log);
| |
| CompressionType GetCompressionFlush( |
| const ImmutableCFOptions& ioptions, |
| const MutableCFOptions& mutable_cf_options) { |
| // Compressing memtable flushes might not help unless the sequential load |
| // optimization is used for leveled compaction. Otherwise the CPU and |
| // latency overhead is not offset by saving much space. |
| if (ioptions.compaction_style == kCompactionStyleUniversal) { |
| if (ioptions.compaction_options_universal.compression_size_percent < 0) { |
| return mutable_cf_options.compression; |
| } else { |
| return kNoCompression; |
| } |
| } else if (!ioptions.compression_per_level.empty()) { |
    // For leveled compaction, use the level-0 entry of compression_per_level
    // (this covers the min_level_to_compress != 0 case).
| return ioptions.compression_per_level[0]; |
| } else { |
| return mutable_cf_options.compression; |
| } |
| } |
| |
| namespace { |
| void DumpSupportInfo(Logger* logger) { |
| ROCKS_LOG_HEADER(logger, "Compression algorithms supported:"); |
| ROCKS_LOG_HEADER(logger, "\tSnappy supported: %d", Snappy_Supported()); |
| ROCKS_LOG_HEADER(logger, "\tZlib supported: %d", Zlib_Supported()); |
| ROCKS_LOG_HEADER(logger, "\tBzip supported: %d", BZip2_Supported()); |
| ROCKS_LOG_HEADER(logger, "\tLZ4 supported: %d", LZ4_Supported()); |
| ROCKS_LOG_HEADER(logger, "\tZSTD supported: %d", ZSTD_Supported()); |
| ROCKS_LOG_HEADER(logger, "Fast CRC32 supported: %d", |
| crc32c::IsFastCrc32Supported()); |
| } |
| |
| int64_t kDefaultLowPriThrottledRate = 2 * 1024 * 1024; |
| } // namespace |
| |
| DBImpl::DBImpl(const DBOptions& options, const std::string& dbname) |
| : env_(options.env), |
| dbname_(dbname), |
| initial_db_options_(SanitizeOptions(dbname, options)), |
| immutable_db_options_(initial_db_options_), |
| mutable_db_options_(initial_db_options_), |
| stats_(immutable_db_options_.statistics.get()), |
| db_lock_(nullptr), |
| mutex_(stats_, env_, DB_MUTEX_WAIT_MICROS, |
| immutable_db_options_.use_adaptive_mutex), |
| shutting_down_(false), |
| bg_cv_(&mutex_), |
| logfile_number_(0), |
| log_dir_synced_(false), |
| log_empty_(true), |
| default_cf_handle_(nullptr), |
| log_sync_cv_(&mutex_), |
| total_log_size_(0), |
| max_total_in_memory_state_(0), |
| is_snapshot_supported_(true), |
| write_buffer_manager_(immutable_db_options_.write_buffer_manager.get()), |
| write_thread_(immutable_db_options_), |
| nonmem_write_thread_(immutable_db_options_), |
| write_controller_(mutable_db_options_.delayed_write_rate), |
| // Use delayed_write_rate as a base line to determine the initial |
| // low pri write rate limit. It may be adjusted later. |
| low_pri_write_rate_limiter_(NewGenericRateLimiter(std::min( |
| static_cast<int64_t>(mutable_db_options_.delayed_write_rate / 8), |
| kDefaultLowPriThrottledRate))), |
| last_batch_group_size_(0), |
| unscheduled_flushes_(0), |
| unscheduled_compactions_(0), |
| bg_bottom_compaction_scheduled_(0), |
| bg_compaction_scheduled_(0), |
| num_running_compactions_(0), |
| bg_flush_scheduled_(0), |
| num_running_flushes_(0), |
| bg_purge_scheduled_(0), |
| disable_delete_obsolete_files_(0), |
| delete_obsolete_files_last_run_(env_->NowMicros()), |
| last_stats_dump_time_microsec_(0), |
| next_job_id_(1), |
| has_unpersisted_data_(false), |
| unable_to_flush_oldest_log_(false), |
| env_options_(BuildDBOptions(immutable_db_options_, mutable_db_options_)), |
| num_running_ingest_file_(0), |
| #ifndef ROCKSDB_LITE |
| wal_manager_(immutable_db_options_, env_options_), |
| #endif // ROCKSDB_LITE |
| event_logger_(immutable_db_options_.info_log.get()), |
| bg_work_paused_(0), |
| bg_compaction_paused_(0), |
| refitting_level_(false), |
| opened_successfully_(false), |
| concurrent_prepare_(options.concurrent_prepare), |
| manual_wal_flush_(options.manual_wal_flush) { |
| env_->GetAbsolutePath(dbname, &db_absolute_path_); |
| |
  // Reserve ten files or so for other uses and give the rest to TableCache.
  // Use a very large capacity when max_open_files is -1 ("infinite" open
  // files).
| const int table_cache_size = (mutable_db_options_.max_open_files == -1) |
| ? TableCache::kInfiniteCapacity |
| : mutable_db_options_.max_open_files - 10; |
| table_cache_ = NewLRUCache(table_cache_size, |
| immutable_db_options_.table_cache_numshardbits); |
| |
| versions_.reset(new VersionSet(dbname_, &immutable_db_options_, env_options_, |
| table_cache_.get(), write_buffer_manager_, |
| &write_controller_)); |
| column_family_memtables_.reset( |
| new ColumnFamilyMemTablesImpl(versions_->GetColumnFamilySet())); |
| |
| DumpRocksDBBuildVersion(immutable_db_options_.info_log.get()); |
| DumpDBFileSummary(immutable_db_options_, dbname_); |
| immutable_db_options_.Dump(immutable_db_options_.info_log.get()); |
| mutable_db_options_.Dump(immutable_db_options_.info_log.get()); |
| DumpSupportInfo(immutable_db_options_.info_log.get()); |
| } |
| |
// Locks mutex_; if wait is true, also waits for background work to finish.
| void DBImpl::CancelAllBackgroundWork(bool wait) { |
| InstrumentedMutexLock l(&mutex_); |
| |
| ROCKS_LOG_INFO(immutable_db_options_.info_log, |
| "Shutdown: canceling all background work"); |
| |
| if (!shutting_down_.load(std::memory_order_acquire) && |
| has_unpersisted_data_.load(std::memory_order_relaxed) && |
| !mutable_db_options_.avoid_flush_during_shutdown) { |
| for (auto cfd : *versions_->GetColumnFamilySet()) { |
| if (!cfd->IsDropped() && cfd->initialized() && !cfd->mem()->IsEmpty()) { |
| cfd->Ref(); |
| mutex_.Unlock(); |
| FlushMemTable(cfd, FlushOptions()); |
| mutex_.Lock(); |
| cfd->Unref(); |
| } |
| } |
| versions_->GetColumnFamilySet()->FreeDeadColumnFamilies(); |
| } |
| |
| shutting_down_.store(true, std::memory_order_release); |
| bg_cv_.SignalAll(); |
| if (!wait) { |
| return; |
| } |
| // Wait for background work to finish |
| while (bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ || |
| bg_flush_scheduled_) { |
| bg_cv_.Wait(); |
| } |
| } |
| |
| DBImpl::~DBImpl() { |
| // CancelAllBackgroundWork called with false means we just set the shutdown |
| // marker. After this we do a variant of the waiting and unschedule work |
| // (to consider: moving all the waiting into CancelAllBackgroundWork(true)) |
| CancelAllBackgroundWork(false); |
| int bottom_compactions_unscheduled = |
| env_->UnSchedule(this, Env::Priority::BOTTOM); |
| int compactions_unscheduled = env_->UnSchedule(this, Env::Priority::LOW); |
| int flushes_unscheduled = env_->UnSchedule(this, Env::Priority::HIGH); |
| mutex_.Lock(); |
| bg_bottom_compaction_scheduled_ -= bottom_compactions_unscheduled; |
| bg_compaction_scheduled_ -= compactions_unscheduled; |
| bg_flush_scheduled_ -= flushes_unscheduled; |
| |
| // Wait for background work to finish |
| while (bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ || |
| bg_flush_scheduled_ || bg_purge_scheduled_) { |
| TEST_SYNC_POINT("DBImpl::~DBImpl:WaitJob"); |
| bg_cv_.Wait(); |
| } |
| EraseThreadStatusDbInfo(); |
| flush_scheduler_.Clear(); |
| |
| while (!flush_queue_.empty()) { |
| auto cfd = PopFirstFromFlushQueue(); |
| if (cfd->Unref()) { |
| delete cfd; |
| } |
| } |
| while (!compaction_queue_.empty()) { |
| auto cfd = PopFirstFromCompactionQueue(); |
| if (cfd->Unref()) { |
| delete cfd; |
| } |
| } |
| |
| if (default_cf_handle_ != nullptr) { |
| // we need to delete handle outside of lock because it does its own locking |
| mutex_.Unlock(); |
| delete default_cf_handle_; |
| mutex_.Lock(); |
| } |
| |
| // Clean up obsolete files due to SuperVersion release. |
  // (1) Need to delete obsolete files before closing because RepairDB()
| // scans all existing files in the file system and builds manifest file. |
| // Keeping obsolete files confuses the repair process. |
| // (2) Need to check if we Open()/Recover() the DB successfully before |
| // deleting because if VersionSet recover fails (may be due to corrupted |
| // manifest file), it is not able to identify live files correctly. As a |
| // result, all "live" files can get deleted by accident. However, corrupted |
| // manifest is recoverable by RepairDB(). |
| if (opened_successfully_) { |
| JobContext job_context(next_job_id_.fetch_add(1)); |
| FindObsoleteFiles(&job_context, true); |
| |
| mutex_.Unlock(); |
| // manifest number starting from 2 |
| job_context.manifest_file_number = 1; |
| if (job_context.HaveSomethingToDelete()) { |
| PurgeObsoleteFiles(job_context); |
| } |
| job_context.Clean(); |
| mutex_.Lock(); |
| } |
| |
| for (auto l : logs_to_free_) { |
| delete l; |
| } |
| for (auto& log : logs_) { |
| log.ClearWriter(); |
| } |
| logs_.clear(); |
| |
| // Table cache may have table handles holding blocks from the block cache. |
| // We need to release them before the block cache is destroyed. The block |
| // cache may be destroyed inside versions_.reset(), when column family data |
| // list is destroyed, so leaving handles in table cache after |
| // versions_.reset() may cause issues. |
| // Here we clean all unreferenced handles in table cache. |
  // Now we assume all user queries have finished, so only the version set
  // itself can possibly hold blocks from the block cache. After releasing the
  // unreferenced handles here, the only remaining handles are those held by
  // the version set, and they are released inside versions_.reset(). There,
  // we make sure that every time a handle is released, it is also erased from
  // the cache. This guarantees that after versions_.reset() the table cache
  // is empty, so the cache can be safely destroyed.
| table_cache_->EraseUnRefEntries(); |
| |
| for (auto& txn_entry : recovered_transactions_) { |
| delete txn_entry.second; |
| } |
| |
| // versions need to be destroyed before table_cache since it can hold |
| // references to table_cache. |
| versions_.reset(); |
| mutex_.Unlock(); |
| if (db_lock_ != nullptr) { |
| env_->UnlockFile(db_lock_); |
| } |
| |
| ROCKS_LOG_INFO(immutable_db_options_.info_log, "Shutdown complete"); |
| LogFlush(immutable_db_options_.info_log); |
| } |
| |
| void DBImpl::MaybeIgnoreError(Status* s) const { |
| if (s->ok() || immutable_db_options_.paranoid_checks) { |
| // No change needed |
| } else { |
| ROCKS_LOG_WARN(immutable_db_options_.info_log, "Ignoring error %s", |
| s->ToString().c_str()); |
| *s = Status::OK(); |
| } |
| } |
| |
| const Status DBImpl::CreateArchivalDirectory() { |
| if (immutable_db_options_.wal_ttl_seconds > 0 || |
| immutable_db_options_.wal_size_limit_mb > 0) { |
| std::string archivalPath = ArchivalDirectory(immutable_db_options_.wal_dir); |
| return env_->CreateDirIfMissing(archivalPath); |
| } |
| return Status::OK(); |
| } |
| |
| void DBImpl::PrintStatistics() { |
| auto dbstats = immutable_db_options_.statistics.get(); |
| if (dbstats) { |
| ROCKS_LOG_WARN(immutable_db_options_.info_log, "STATISTICS:\n %s", |
| dbstats->ToString().c_str()); |
| } |
| } |
| |
| void DBImpl::MaybeDumpStats() { |
| mutex_.Lock(); |
| unsigned int stats_dump_period_sec = |
| mutable_db_options_.stats_dump_period_sec; |
| mutex_.Unlock(); |
| if (stats_dump_period_sec == 0) return; |
| |
| const uint64_t now_micros = env_->NowMicros(); |
| |
| if (last_stats_dump_time_microsec_ + stats_dump_period_sec * 1000000 <= |
| now_micros) { |
| // Multiple threads could race in here simultaneously. |
| // However, the last one will update last_stats_dump_time_microsec_ |
| // atomically. We could see more than one dump during one dump |
| // period in rare cases. |
| last_stats_dump_time_microsec_ = now_micros; |
| |
| #ifndef ROCKSDB_LITE |
| const DBPropertyInfo* cf_property_info = |
| GetPropertyInfo(DB::Properties::kCFStats); |
| assert(cf_property_info != nullptr); |
| const DBPropertyInfo* db_property_info = |
| GetPropertyInfo(DB::Properties::kDBStats); |
| assert(db_property_info != nullptr); |
| |
| std::string stats; |
| { |
| InstrumentedMutexLock l(&mutex_); |
| default_cf_internal_stats_->GetStringProperty( |
| *db_property_info, DB::Properties::kDBStats, &stats); |
| for (auto cfd : *versions_->GetColumnFamilySet()) { |
| if (cfd->initialized()) { |
| cfd->internal_stats()->GetStringProperty( |
| *cf_property_info, DB::Properties::kCFStatsNoFileHistogram, |
| &stats); |
| } |
| } |
| for (auto cfd : *versions_->GetColumnFamilySet()) { |
| if (cfd->initialized()) { |
| cfd->internal_stats()->GetStringProperty( |
| *cf_property_info, DB::Properties::kCFFileHistogram, &stats); |
| } |
| } |
| } |
| ROCKS_LOG_WARN(immutable_db_options_.info_log, |
| "------- DUMPING STATS -------"); |
| ROCKS_LOG_WARN(immutable_db_options_.info_log, "%s", stats.c_str()); |
| if (immutable_db_options_.dump_malloc_stats) { |
| stats.clear(); |
| DumpMallocStats(&stats); |
| if (!stats.empty()) { |
| ROCKS_LOG_WARN(immutable_db_options_.info_log, |
| "------- Malloc STATS -------"); |
| ROCKS_LOG_WARN(immutable_db_options_.info_log, "%s", stats.c_str()); |
| } |
| } |
| #endif // !ROCKSDB_LITE |
| |
| PrintStatistics(); |
| } |
| } |
| |
| void DBImpl::ScheduleBgLogWriterClose(JobContext* job_context) { |
| if (!job_context->logs_to_free.empty()) { |
| for (auto l : job_context->logs_to_free) { |
| AddToLogsToFreeQueue(l); |
| } |
| job_context->logs_to_free.clear(); |
| SchedulePurge(); |
| } |
| } |
| |
| Directory* DBImpl::Directories::GetDataDir(size_t path_id) { |
| assert(path_id < data_dirs_.size()); |
| Directory* ret_dir = data_dirs_[path_id].get(); |
| if (ret_dir == nullptr) { |
| // Should use db_dir_ |
| return db_dir_.get(); |
| } |
| return ret_dir; |
| } |
| |
| Status DBImpl::SetOptions(ColumnFamilyHandle* column_family, |
| const std::unordered_map<std::string, std::string>& options_map) { |
| #ifdef ROCKSDB_LITE |
| return Status::NotSupported("Not supported in ROCKSDB LITE"); |
| #else |
| auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd(); |
| if (options_map.empty()) { |
| ROCKS_LOG_WARN(immutable_db_options_.info_log, |
| "SetOptions() on column family [%s], empty input", |
| cfd->GetName().c_str()); |
| return Status::InvalidArgument("empty input"); |
| } |
| |
| MutableCFOptions new_options; |
| Status s; |
| Status persist_options_status; |
| WriteThread::Writer w; |
| { |
| InstrumentedMutexLock l(&mutex_); |
| s = cfd->SetOptions(options_map); |
| if (s.ok()) { |
| new_options = *cfd->GetLatestMutableCFOptions(); |
| // Append new version to recompute compaction score. |
| VersionEdit dummy_edit; |
| versions_->LogAndApply(cfd, new_options, &dummy_edit, &mutex_, |
| directories_.GetDbDir()); |
| // Trigger possible flush/compactions. This has to be before we persist |
| // options to file, otherwise there will be a deadlock with writer |
| // thread. |
| auto* old_sv = |
| InstallSuperVersionAndScheduleWork(cfd, nullptr, new_options); |
| delete old_sv; |
| |
| persist_options_status = WriteOptionsFile( |
| false /*need_mutex_lock*/, true /*need_enter_write_thread*/); |
| } |
| } |
| |
| ROCKS_LOG_INFO(immutable_db_options_.info_log, |
| "SetOptions() on column family [%s], inputs:", |
| cfd->GetName().c_str()); |
| for (const auto& o : options_map) { |
| ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s: %s\n", o.first.c_str(), |
| o.second.c_str()); |
| } |
| if (s.ok()) { |
| ROCKS_LOG_INFO(immutable_db_options_.info_log, |
| "[%s] SetOptions() succeeded", cfd->GetName().c_str()); |
| new_options.Dump(immutable_db_options_.info_log.get()); |
| if (!persist_options_status.ok()) { |
| s = persist_options_status; |
| } |
| } else { |
| ROCKS_LOG_WARN(immutable_db_options_.info_log, "[%s] SetOptions() failed", |
| cfd->GetName().c_str()); |
| } |
| LogFlush(immutable_db_options_.info_log); |
| return s; |
| #endif // ROCKSDB_LITE |
| } |
| |
| Status DBImpl::SetDBOptions( |
| const std::unordered_map<std::string, std::string>& options_map) { |
| #ifdef ROCKSDB_LITE |
| return Status::NotSupported("Not supported in ROCKSDB LITE"); |
| #else |
| if (options_map.empty()) { |
| ROCKS_LOG_WARN(immutable_db_options_.info_log, |
| "SetDBOptions(), empty input."); |
| return Status::InvalidArgument("empty input"); |
| } |
| |
| MutableDBOptions new_options; |
| Status s; |
| Status persist_options_status; |
| WriteThread::Writer w; |
| WriteContext write_context; |
| { |
| InstrumentedMutexLock l(&mutex_); |
| s = GetMutableDBOptionsFromStrings(mutable_db_options_, options_map, |
| &new_options); |
| if (s.ok()) { |
| if (new_options.max_background_compactions > |
| mutable_db_options_.max_background_compactions) { |
| env_->IncBackgroundThreadsIfNeeded( |
| new_options.max_background_compactions, Env::Priority::LOW); |
| MaybeScheduleFlushOrCompaction(); |
| } |
| |
| write_controller_.set_max_delayed_write_rate(new_options.delayed_write_rate); |
| table_cache_.get()->SetCapacity(new_options.max_open_files == -1 |
| ? TableCache::kInfiniteCapacity |
| : new_options.max_open_files - 10); |
| |
| mutable_db_options_ = new_options; |
| |
| write_thread_.EnterUnbatched(&w, &mutex_); |
| if (total_log_size_ > GetMaxTotalWalSize()) { |
| Status purge_wal_status = HandleWALFull(&write_context); |
| if (!purge_wal_status.ok()) { |
| ROCKS_LOG_WARN(immutable_db_options_.info_log, |
| "Unable to purge WAL files in SetDBOptions() -- %s", |
| purge_wal_status.ToString().c_str()); |
| } |
| } |
| persist_options_status = WriteOptionsFile( |
| false /*need_mutex_lock*/, false /*need_enter_write_thread*/); |
| write_thread_.ExitUnbatched(&w); |
| } |
| } |
| ROCKS_LOG_INFO(immutable_db_options_.info_log, "SetDBOptions(), inputs:"); |
| for (const auto& o : options_map) { |
| ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s: %s\n", o.first.c_str(), |
| o.second.c_str()); |
| } |
| if (s.ok()) { |
| ROCKS_LOG_INFO(immutable_db_options_.info_log, "SetDBOptions() succeeded"); |
| new_options.Dump(immutable_db_options_.info_log.get()); |
| if (!persist_options_status.ok()) { |
| if (immutable_db_options_.fail_if_options_file_error) { |
| s = Status::IOError( |
| "SetDBOptions() succeeded, but unable to persist options", |
| persist_options_status.ToString()); |
| } |
| ROCKS_LOG_WARN(immutable_db_options_.info_log, |
| "Unable to persist options in SetDBOptions() -- %s", |
| persist_options_status.ToString().c_str()); |
| } |
| } else { |
| ROCKS_LOG_WARN(immutable_db_options_.info_log, "SetDBOptions failed"); |
| } |
| LogFlush(immutable_db_options_.info_log); |
| return s; |
| #endif // ROCKSDB_LITE |
| } |
| |
| // return the same level if it cannot be moved |
| int DBImpl::FindMinimumEmptyLevelFitting(ColumnFamilyData* cfd, |
| const MutableCFOptions& mutable_cf_options, int level) { |
| mutex_.AssertHeld(); |
| const auto* vstorage = cfd->current()->storage_info(); |
| int minimum_level = level; |
| for (int i = level - 1; i > 0; --i) { |
| // stop if level i is not empty |
| if (vstorage->NumLevelFiles(i) > 0) break; |
| // stop if level i is too small (cannot fit the level files) |
| if (vstorage->MaxBytesForLevel(i) < vstorage->NumLevelBytes(level)) { |
| break; |
| } |
| |
| minimum_level = i; |
| } |
| return minimum_level; |
| } |
| |
| Status DBImpl::FlushWAL(bool sync) { |
| { |
| // We need to lock log_write_mutex_ since logs_ might change concurrently |
| InstrumentedMutexLock wl(&log_write_mutex_); |
| log::Writer* cur_log_writer = logs_.back().writer; |
| auto s = cur_log_writer->WriteBuffer(); |
| if (!s.ok()) { |
| ROCKS_LOG_ERROR(immutable_db_options_.info_log, "WAL flush error %s", |
| s.ToString().c_str()); |
| } |
| if (!sync) { |
| ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "FlushWAL sync=false"); |
| return s; |
| } |
| } |
| // sync = true |
| ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "FlushWAL sync=true"); |
| return SyncWAL(); |
| } |
| |
| Status DBImpl::SyncWAL() { |
| autovector<log::Writer*, 1> logs_to_sync; |
| bool need_log_dir_sync; |
| uint64_t current_log_number; |
| |
| { |
| InstrumentedMutexLock l(&mutex_); |
| assert(!logs_.empty()); |
| |
| // This SyncWAL() call only cares about logs up to this number. |
| current_log_number = logfile_number_; |
| |
| while (logs_.front().number <= current_log_number && |
| logs_.front().getting_synced) { |
| log_sync_cv_.Wait(); |
| } |
| // First check that logs are safe to sync in background. |
| for (auto it = logs_.begin(); |
| it != logs_.end() && it->number <= current_log_number; ++it) { |
| if (!it->writer->file()->writable_file()->IsSyncThreadSafe()) { |
| return Status::NotSupported( |
| "SyncWAL() is not supported for this implementation of WAL file", |
| immutable_db_options_.allow_mmap_writes |
| ? "try setting Options::allow_mmap_writes to false" |
| : Slice()); |
| } |
| } |
| for (auto it = logs_.begin(); |
| it != logs_.end() && it->number <= current_log_number; ++it) { |
| auto& log = *it; |
| assert(!log.getting_synced); |
| log.getting_synced = true; |
| logs_to_sync.push_back(log.writer); |
| } |
| |
| need_log_dir_sync = !log_dir_synced_; |
| } |
| |
| TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:1"); |
| RecordTick(stats_, WAL_FILE_SYNCED); |
| Status status; |
| for (log::Writer* log : logs_to_sync) { |
| status = log->file()->SyncWithoutFlush(immutable_db_options_.use_fsync); |
| if (!status.ok()) { |
| break; |
| } |
| } |
| if (status.ok() && need_log_dir_sync) { |
| status = directories_.GetWalDir()->Fsync(); |
| } |
| TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:2"); |
| |
| TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:1"); |
| { |
| InstrumentedMutexLock l(&mutex_); |
| MarkLogsSynced(current_log_number, need_log_dir_sync, status); |
| } |
| TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:2"); |
| |
| return status; |
| } |
| |
| void DBImpl::MarkLogsSynced( |
| uint64_t up_to, bool synced_dir, const Status& status) { |
| mutex_.AssertHeld(); |
| if (synced_dir && |
| logfile_number_ == up_to && |
| status.ok()) { |
| log_dir_synced_ = true; |
| } |
| for (auto it = logs_.begin(); it != logs_.end() && it->number <= up_to;) { |
| auto& log = *it; |
| assert(log.getting_synced); |
| if (status.ok() && logs_.size() > 1) { |
| logs_to_free_.push_back(log.ReleaseWriter()); |
| it = logs_.erase(it); |
| } else { |
| log.getting_synced = false; |
| ++it; |
| } |
| } |
| assert(!status.ok() || logs_.empty() || logs_[0].number > up_to || |
| (logs_.size() == 1 && !logs_[0].getting_synced)); |
| log_sync_cv_.SignalAll(); |
| } |
| |
| SequenceNumber DBImpl::GetLatestSequenceNumber() const { |
| return versions_->LastSequence(); |
| } |
| |
| InternalIterator* DBImpl::NewInternalIterator( |
| Arena* arena, RangeDelAggregator* range_del_agg, |
| ColumnFamilyHandle* column_family) { |
| ColumnFamilyData* cfd; |
| if (column_family == nullptr) { |
| cfd = default_cf_handle_->cfd(); |
| } else { |
| auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family); |
| cfd = cfh->cfd(); |
| } |
| |
| mutex_.Lock(); |
| SuperVersion* super_version = cfd->GetSuperVersion()->Ref(); |
| mutex_.Unlock(); |
| ReadOptions roptions; |
| return NewInternalIterator(roptions, cfd, super_version, arena, |
| range_del_agg); |
| } |
| |
| void DBImpl::SchedulePurge() { |
| mutex_.AssertHeld(); |
| assert(opened_successfully_); |
| |
| // Purge operations are put into High priority queue |
| bg_purge_scheduled_++; |
| env_->Schedule(&DBImpl::BGWorkPurge, this, Env::Priority::HIGH, nullptr); |
| } |
| |
| void DBImpl::BackgroundCallPurge() { |
| mutex_.Lock(); |
| |
  // We use a single loop to clear both queues so that after exiting the loop
  // both queues are empty. This is stricter than necessary, but makes it
  // easier to reason about correctness.
| while (!purge_queue_.empty() || !logs_to_free_queue_.empty()) { |
| if (!purge_queue_.empty()) { |
| auto purge_file = purge_queue_.begin(); |
| auto fname = purge_file->fname; |
| auto type = purge_file->type; |
| auto number = purge_file->number; |
| auto path_id = purge_file->path_id; |
| auto job_id = purge_file->job_id; |
| purge_queue_.pop_front(); |
| |
| mutex_.Unlock(); |
| Status file_deletion_status; |
| DeleteObsoleteFileImpl(file_deletion_status, job_id, fname, type, number, |
| path_id); |
| mutex_.Lock(); |
| } else { |
| assert(!logs_to_free_queue_.empty()); |
| log::Writer* log_writer = *(logs_to_free_queue_.begin()); |
| logs_to_free_queue_.pop_front(); |
| mutex_.Unlock(); |
| delete log_writer; |
| mutex_.Lock(); |
| } |
| } |
| bg_purge_scheduled_--; |
| |
| bg_cv_.SignalAll(); |
  // IMPORTANT: there should be no code after calling SignalAll. This call may
  // signal the DB destructor that it's OK to proceed with destruction. In
  // that case, all DB variables will be deallocated and referencing them
  // will cause trouble.
| mutex_.Unlock(); |
| } |
| |
| namespace { |
| struct IterState { |
| IterState(DBImpl* _db, InstrumentedMutex* _mu, SuperVersion* _super_version, |
| bool _background_purge) |
| : db(_db), |
| mu(_mu), |
| super_version(_super_version), |
| background_purge(_background_purge) {} |
| |
| DBImpl* db; |
| InstrumentedMutex* mu; |
| SuperVersion* super_version; |
| bool background_purge; |
| }; |
| |
| static void CleanupIteratorState(void* arg1, void* arg2) { |
| IterState* state = reinterpret_cast<IterState*>(arg1); |
| |
| if (state->super_version->Unref()) { |
    // A job id of 0 means that this is not a background job, but rather a
    // user thread.
| JobContext job_context(0); |
| |
| state->mu->Lock(); |
| state->super_version->Cleanup(); |
| state->db->FindObsoleteFiles(&job_context, false, true); |
| if (state->background_purge) { |
| state->db->ScheduleBgLogWriterClose(&job_context); |
| } |
| state->mu->Unlock(); |
| |
| delete state->super_version; |
| if (job_context.HaveSomethingToDelete()) { |
| if (state->background_purge) { |
        // PurgeObsoleteFiles here does not delete files. Instead, it adds
        // the files to be deleted to a job queue, and they are deleted in a
        // separate background thread.
| state->db->PurgeObsoleteFiles(job_context, true /* schedule only */); |
| state->mu->Lock(); |
| state->db->SchedulePurge(); |
| state->mu->Unlock(); |
| } else { |
| state->db->PurgeObsoleteFiles(job_context); |
| } |
| } |
| job_context.Clean(); |
| } |
| |
| delete state; |
| } |
| } // namespace |
| |
| InternalIterator* DBImpl::NewInternalIterator( |
| const ReadOptions& read_options, ColumnFamilyData* cfd, |
| SuperVersion* super_version, Arena* arena, |
| RangeDelAggregator* range_del_agg) { |
| InternalIterator* internal_iter; |
| assert(arena != nullptr); |
| assert(range_del_agg != nullptr); |
| // Need to create internal iterator from the arena. |
| MergeIteratorBuilder merge_iter_builder( |
| &cfd->internal_comparator(), arena, |
| !read_options.total_order_seek && |
| cfd->ioptions()->prefix_extractor != nullptr); |
| // Collect iterator for mutable mem |
| merge_iter_builder.AddIterator( |
| super_version->mem->NewIterator(read_options, arena)); |
| std::unique_ptr<InternalIterator> range_del_iter; |
| Status s; |
| if (!read_options.ignore_range_deletions) { |
| range_del_iter.reset( |
| super_version->mem->NewRangeTombstoneIterator(read_options)); |
| s = range_del_agg->AddTombstones(std::move(range_del_iter)); |
| } |
| // Collect all needed child iterators for immutable memtables |
| if (s.ok()) { |
| super_version->imm->AddIterators(read_options, &merge_iter_builder); |
| if (!read_options.ignore_range_deletions) { |
| s = super_version->imm->AddRangeTombstoneIterators(read_options, arena, |
| range_del_agg); |
| } |
| } |
| if (s.ok()) { |
| // Collect iterators for files in L0 - Ln |
| if (read_options.read_tier != kMemtableTier) { |
| super_version->current->AddIterators(read_options, env_options_, |
| &merge_iter_builder, range_del_agg); |
| } |
| internal_iter = merge_iter_builder.Finish(); |
| IterState* cleanup = |
| new IterState(this, &mutex_, super_version, |
| read_options.background_purge_on_iterator_cleanup); |
| internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, nullptr); |
| |
| return internal_iter; |
| } |
| return NewErrorInternalIterator(s); |
| } |
| |
| ColumnFamilyHandle* DBImpl::DefaultColumnFamily() const { |
| return default_cf_handle_; |
| } |
| |
| Status DBImpl::Get(const ReadOptions& read_options, |
| ColumnFamilyHandle* column_family, const Slice& key, |
| PinnableSlice* value) { |
| return GetImpl(read_options, column_family, key, value); |
| } |
| |
| Status DBImpl::GetImpl(const ReadOptions& read_options, |
| ColumnFamilyHandle* column_family, const Slice& key, |
| PinnableSlice* pinnable_val, bool* value_found, |
| bool* is_blob_index) { |
| assert(pinnable_val != nullptr); |
| StopWatch sw(env_, stats_, DB_GET); |
| PERF_TIMER_GUARD(get_snapshot_time); |
| |
| auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family); |
| auto cfd = cfh->cfd(); |
| |
| // Acquire SuperVersion |
| SuperVersion* sv = GetAndRefSuperVersion(cfd); |
| |
| TEST_SYNC_POINT("DBImpl::GetImpl:1"); |
| TEST_SYNC_POINT("DBImpl::GetImpl:2"); |
| |
| SequenceNumber snapshot; |
| if (read_options.snapshot != nullptr) { |
| snapshot = reinterpret_cast<const SnapshotImpl*>( |
| read_options.snapshot)->number_; |
| } else { |
| // Since we get and reference the super version before getting |
| // the snapshot number, without a mutex protection, it is possible |
| // that a memtable switch happened in the middle and not all the |
| // data for this snapshot is available. But it will contain all |
| // the data available in the super version we have, which is also |
| // a valid snapshot to read from. |
    // We shouldn't get the snapshot before finding and referencing the super
    // version, because a flush happening in between may compact away data
    // that belongs to the snapshot while newer writes overwrite it, so users
    // could see wrong results.
| snapshot = versions_->LastSequence(); |
| } |
| TEST_SYNC_POINT("DBImpl::GetImpl:3"); |
| TEST_SYNC_POINT("DBImpl::GetImpl:4"); |
| |
| // Prepare to store a list of merge operations if merge occurs. |
| MergeContext merge_context; |
| RangeDelAggregator range_del_agg(cfd->internal_comparator(), snapshot); |
| |
| Status s; |
| // First look in the memtable, then in the immutable memtable (if any). |
| // s is both in/out. When in, s could either be OK or MergeInProgress. |
| // merge_operands will contain the sequence of merges in the latter case. |
| LookupKey lkey(key, snapshot); |
| PERF_TIMER_STOP(get_snapshot_time); |
| |
| bool skip_memtable = (read_options.read_tier == kPersistedTier && |
| has_unpersisted_data_.load(std::memory_order_relaxed)); |
| bool done = false; |
| if (!skip_memtable) { |
| if (sv->mem->Get(lkey, pinnable_val->GetSelf(), &s, &merge_context, |
| &range_del_agg, read_options, is_blob_index)) { |
| done = true; |
| pinnable_val->PinSelf(); |
| RecordTick(stats_, MEMTABLE_HIT); |
| } else if ((s.ok() || s.IsMergeInProgress()) && |
| sv->imm->Get(lkey, pinnable_val->GetSelf(), &s, &merge_context, |
| &range_del_agg, read_options, is_blob_index)) { |
| done = true; |
| pinnable_val->PinSelf(); |
| RecordTick(stats_, MEMTABLE_HIT); |
| } |
| if (!done && !s.ok() && !s.IsMergeInProgress()) { |
| return s; |
| } |
| } |
| if (!done) { |
| PERF_TIMER_GUARD(get_from_output_files_time); |
| sv->current->Get(read_options, lkey, pinnable_val, &s, &merge_context, |
| &range_del_agg, value_found, nullptr, nullptr, |
| is_blob_index); |
| RecordTick(stats_, MEMTABLE_MISS); |
| } |
| |
| { |
| PERF_TIMER_GUARD(get_post_process_time); |
| |
| ReturnAndCleanupSuperVersion(cfd, sv); |
| |
| RecordTick(stats_, NUMBER_KEYS_READ); |
| size_t size = pinnable_val->size(); |
| RecordTick(stats_, BYTES_READ, size); |
| MeasureTime(stats_, BYTES_PER_READ, size); |
| PERF_COUNTER_ADD(get_read_bytes, size); |
| } |
| return s; |
| } |
| |
| std::vector<Status> DBImpl::MultiGet( |
| const ReadOptions& read_options, |
| const std::vector<ColumnFamilyHandle*>& column_family, |
| const std::vector<Slice>& keys, std::vector<std::string>* values) { |
| |
| StopWatch sw(env_, stats_, DB_MULTIGET); |
| PERF_TIMER_GUARD(get_snapshot_time); |
| |
| SequenceNumber snapshot; |
| |
| struct MultiGetColumnFamilyData { |
| ColumnFamilyData* cfd; |
| SuperVersion* super_version; |
| }; |
| std::unordered_map<uint32_t, MultiGetColumnFamilyData*> multiget_cf_data; |
| // fill up and allocate outside of mutex |
| for (auto cf : column_family) { |
| auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(cf); |
| auto cfd = cfh->cfd(); |
| if (multiget_cf_data.find(cfd->GetID()) == multiget_cf_data.end()) { |
| auto mgcfd = new MultiGetColumnFamilyData(); |
| mgcfd->cfd = cfd; |
| multiget_cf_data.insert({cfd->GetID(), mgcfd}); |
| } |
| } |
| |
| mutex_.Lock(); |
| if (read_options.snapshot != nullptr) { |
| snapshot = reinterpret_cast<const SnapshotImpl*>( |
| read_options.snapshot)->number_; |
| } else { |
| snapshot = versions_->LastSequence(); |
| } |
| for (auto mgd_iter : multiget_cf_data) { |
| mgd_iter.second->super_version = |
| mgd_iter.second->cfd->GetSuperVersion()->Ref(); |
| } |
| mutex_.Unlock(); |
| |
| // Contain a list of merge operations if merge occurs. |
| MergeContext merge_context; |
| |
| // Note: this always resizes the values array |
| size_t num_keys = keys.size(); |
| std::vector<Status> stat_list(num_keys); |
| values->resize(num_keys); |
| |
| // Keep track of bytes that we read for statistics-recording later |
| uint64_t bytes_read = 0; |
| PERF_TIMER_STOP(get_snapshot_time); |
| |
| // For each of the given keys, apply the entire "get" process as follows: |
| // First look in the memtable, then in the immutable memtable (if any). |
| // s is both in/out. When in, s could either be OK or MergeInProgress. |
| // merge_operands will contain the sequence of merges in the latter case. |
| for (size_t i = 0; i < num_keys; ++i) { |
| merge_context.Clear(); |
| Status& s = stat_list[i]; |
| std::string* value = &(*values)[i]; |
| |
| LookupKey lkey(keys[i], snapshot); |
| auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family[i]); |
| RangeDelAggregator range_del_agg(cfh->cfd()->internal_comparator(), |
| snapshot); |
| auto mgd_iter = multiget_cf_data.find(cfh->cfd()->GetID()); |
| assert(mgd_iter != multiget_cf_data.end()); |
| auto mgd = mgd_iter->second; |
| auto super_version = mgd->super_version; |
| bool skip_memtable = |
| (read_options.read_tier == kPersistedTier && |
| has_unpersisted_data_.load(std::memory_order_relaxed)); |
| bool done = false; |
| if (!skip_memtable) { |
| if (super_version->mem->Get(lkey, value, &s, &merge_context, |
| &range_del_agg, read_options)) { |
| done = true; |
| // TODO(?): RecordTick(stats_, MEMTABLE_HIT)? |
| } else if (super_version->imm->Get(lkey, value, &s, &merge_context, |
| &range_del_agg, read_options)) { |
| done = true; |
| // TODO(?): RecordTick(stats_, MEMTABLE_HIT)? |
| } |
| } |
| if (!done) { |
| PinnableSlice pinnable_val; |
| PERF_TIMER_GUARD(get_from_output_files_time); |
| super_version->current->Get(read_options, lkey, &pinnable_val, &s, |
| &merge_context, &range_del_agg); |
| value->assign(pinnable_val.data(), pinnable_val.size()); |
| // TODO(?): RecordTick(stats_, MEMTABLE_MISS)? |
| } |
| |
| if (s.ok()) { |
| bytes_read += value->size(); |
| } |
| } |
| |
| // Post processing (decrement reference counts and record statistics) |
| PERF_TIMER_GUARD(get_post_process_time); |
| autovector<SuperVersion*> superversions_to_delete; |
| |
| // TODO(icanadi) do we need lock here or just around Cleanup()? |
| mutex_.Lock(); |
| for (auto mgd_iter : multiget_cf_data) { |
| auto mgd = mgd_iter.second; |
| if (mgd->super_version->Unref()) { |
| mgd->super_version->Cleanup(); |
| superversions_to_delete.push_back(mgd->super_version); |
| } |
| } |
| mutex_.Unlock(); |
| |
| for (auto td : superversions_to_delete) { |
| delete td; |
| } |
| for (auto mgd : multiget_cf_data) { |
| delete mgd.second; |
| } |
| |
| RecordTick(stats_, NUMBER_MULTIGET_CALLS); |
| RecordTick(stats_, NUMBER_MULTIGET_KEYS_READ, num_keys); |
| RecordTick(stats_, NUMBER_MULTIGET_BYTES_READ, bytes_read); |
| MeasureTime(stats_, BYTES_PER_MULTIGET, bytes_read); |
| PERF_COUNTER_ADD(multiget_read_bytes, bytes_read); |
| PERF_TIMER_STOP(get_post_process_time); |
| |
| return stat_list; |
| } |
| |
| Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options, |
| const std::string& column_family, |
| ColumnFamilyHandle** handle) { |
| assert(handle != nullptr); |
| Status s = CreateColumnFamilyImpl(cf_options, column_family, handle); |
| if (s.ok()) { |
| s = WriteOptionsFile(true /*need_mutex_lock*/, |
| true /*need_enter_write_thread*/); |
| } |
| return s; |
| } |
| |
| Status DBImpl::CreateColumnFamilies( |
| const ColumnFamilyOptions& cf_options, |
| const std::vector<std::string>& column_family_names, |
| std::vector<ColumnFamilyHandle*>* handles) { |
| assert(handles != nullptr); |
| handles->clear(); |
| size_t num_cf = column_family_names.size(); |
| Status s; |
| bool success_once = false; |
| for (size_t i = 0; i < num_cf; i++) { |
| ColumnFamilyHandle* handle; |
| s = CreateColumnFamilyImpl(cf_options, column_family_names[i], &handle); |
| if (!s.ok()) { |
| break; |
| } |
| handles->push_back(handle); |
| success_once = true; |
| } |
| if (success_once) { |
| Status persist_options_status = WriteOptionsFile( |
| true /*need_mutex_lock*/, true /*need_enter_write_thread*/); |
| if (s.ok() && !persist_options_status.ok()) { |
| s = persist_options_status; |
| } |
| } |
| return s; |
| } |
| |
| Status DBImpl::CreateColumnFamilies( |
| const std::vector<ColumnFamilyDescriptor>& column_families, |
| std::vector<ColumnFamilyHandle*>* handles) { |
| assert(handles != nullptr); |
| handles->clear(); |
| size_t num_cf = column_families.size(); |
| Status s; |
| bool success_once = false; |
| for (size_t i = 0; i < num_cf; i++) { |
| ColumnFamilyHandle* handle; |
| s = CreateColumnFamilyImpl(column_families[i].options, |
| column_families[i].name, &handle); |
| if (!s.ok()) { |
| break; |
| } |
| handles->push_back(handle); |
| success_once = true; |
| } |
| if (success_once) { |
| Status persist_options_status = WriteOptionsFile( |
| true /*need_mutex_lock*/, true /*need_enter_write_thread*/); |
| if (s.ok() && !persist_options_status.ok()) { |
| s = persist_options_status; |
| } |
| } |
| return s; |
| } |
| |
| Status DBImpl::CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options, |
| const std::string& column_family_name, |
| ColumnFamilyHandle** handle) { |
| Status s; |
| Status persist_options_status; |
| *handle = nullptr; |
| |
| s = CheckCompressionSupported(cf_options); |
| if (s.ok() && immutable_db_options_.allow_concurrent_memtable_write) { |
| s = CheckConcurrentWritesSupported(cf_options); |
| } |
| if (!s.ok()) { |
| return s; |
| } |
| |
| { |
| InstrumentedMutexLock l(&mutex_); |
| |
| if (versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name) != |
| nullptr) { |
| return Status::InvalidArgument("Column family already exists"); |
| } |
| VersionEdit edit; |
| edit.AddColumnFamily(column_family_name); |
| uint32_t new_id = versions_->GetColumnFamilySet()->GetNextColumnFamilyID(); |
| edit.SetColumnFamily(new_id); |
| edit.SetLogNumber(logfile_number_); |
| edit.SetComparatorName(cf_options.comparator->Name()); |
| |
| // LogAndApply will both write the creation in MANIFEST and create |
| // ColumnFamilyData object |
| { // write thread |
| WriteThread::Writer w; |
| write_thread_.EnterUnbatched(&w, &mutex_); |
| s = versions_->LogAndApply(nullptr, MutableCFOptions(cf_options), &edit, |
| &mutex_, directories_.GetDbDir(), false, |
| &cf_options); |
| write_thread_.ExitUnbatched(&w); |
| } |
| if (s.ok()) { |
| single_column_family_mode_ = false; |
| auto* cfd = |
| versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name); |
| assert(cfd != nullptr); |
| delete InstallSuperVersionAndScheduleWork( |
| cfd, nullptr, *cfd->GetLatestMutableCFOptions()); |
| |
| if (!cfd->mem()->IsSnapshotSupported()) { |
| is_snapshot_supported_ = false; |
| } |
| |
| cfd->set_initialized(); |
| |
| *handle = new ColumnFamilyHandleImpl(cfd, this, &mutex_); |
| ROCKS_LOG_INFO(immutable_db_options_.info_log, |
| "Created column family [%s] (ID %u)", |
| column_family_name.c_str(), (unsigned)cfd->GetID()); |
| } else { |
| ROCKS_LOG_ERROR(immutable_db_options_.info_log, |
| "Creating column family [%s] FAILED -- %s", |
| column_family_name.c_str(), s.ToString().c_str()); |
| } |
| } // InstrumentedMutexLock l(&mutex_) |
| |
| // this is outside the mutex |
| if (s.ok()) { |
| NewThreadStatusCfInfo( |
| reinterpret_cast<ColumnFamilyHandleImpl*>(*handle)->cfd()); |
| } |
| return s; |
| } |
| |
| Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) { |
| assert(column_family != nullptr); |
| Status s = DropColumnFamilyImpl(column_family); |
| if (s.ok()) { |
| s = WriteOptionsFile(true /*need_mutex_lock*/, |
| true /*need_enter_write_thread*/); |
| } |
| return s; |
| } |
| |
| Status DBImpl::DropColumnFamilies( |
| const std::vector<ColumnFamilyHandle*>& column_families) { |
| Status s; |
| bool success_once = false; |
| for (auto* handle : column_families) { |
| s = DropColumnFamilyImpl(handle); |
| if (!s.ok()) { |
| break; |
| } |
| success_once = true; |
| } |
| if (success_once) { |
| Status persist_options_status = WriteOptionsFile( |
| true /*need_mutex_lock*/, true /*need_enter_write_thread*/); |
| if (s.ok() && !persist_options_status.ok()) { |
| s = persist_options_status; |
| } |
| } |
| return s; |
| } |
| |
| Status DBImpl::DropColumnFamilyImpl(ColumnFamilyHandle* column_family) { |
| auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family); |
| auto cfd = cfh->cfd(); |
| if (cfd->GetID() == 0) { |
| return Status::InvalidArgument("Can't drop default column family"); |
| } |
| |
| bool cf_support_snapshot = cfd->mem()->IsSnapshotSupported(); |
| |
| VersionEdit edit; |
| edit.DropColumnFamily(); |
| edit.SetColumnFamily(cfd->GetID()); |
| |
| Status s; |
| { |
| InstrumentedMutexLock l(&mutex_); |
| if (cfd->IsDropped()) { |
| s = Status::InvalidArgument("Column family already dropped!\n"); |
| } |
| if (s.ok()) { |
| // we drop column family from a single write thread |
| WriteThread::Writer w; |
| write_thread_.EnterUnbatched(&w, &mutex_); |
| s = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), |
| &edit, &mutex_); |
| write_thread_.ExitUnbatched(&w); |
| } |
| if (s.ok()) { |
| auto* mutable_cf_options = cfd->GetLatestMutableCFOptions(); |
| max_total_in_memory_state_ -= mutable_cf_options->write_buffer_size * |
| mutable_cf_options->max_write_buffer_number; |
| } |
| |
| if (!cf_support_snapshot) { |
| // Dropped Column Family doesn't support snapshot. Need to recalculate |
| // is_snapshot_supported_. |
| bool new_is_snapshot_supported = true; |
| for (auto c : *versions_->GetColumnFamilySet()) { |
| if (!c->IsDropped() && !c->mem()->IsSnapshotSupported()) { |
| new_is_snapshot_supported = false; |
| break; |
| } |
| } |
| is_snapshot_supported_ = new_is_snapshot_supported; |
| } |
| } |
| |
| if (s.ok()) { |
| // Note that here we erase the associated cf_info of the to-be-dropped |
| // cfd before its ref-count goes to zero to avoid having to erase cf_info |
| // later inside db_mutex. |
| EraseThreadStatusCfInfo(cfd); |
| assert(cfd->IsDropped()); |
| ROCKS_LOG_INFO(immutable_db_options_.info_log, |
| "Dropped column family with id %u\n", cfd->GetID()); |
| } else { |
| ROCKS_LOG_ERROR(immutable_db_options_.info_log, |
| "Dropping column family with id %u FAILED -- %s\n", |
| cfd->GetID(), s.ToString().c_str()); |
| } |
| |
| return s; |
| } |
| |
| bool DBImpl::KeyMayExist(const ReadOptions& read_options, |
| ColumnFamilyHandle* column_family, const Slice& key, |
| std::string* value, bool* value_found) { |
| assert(value != nullptr); |
| if (value_found != nullptr) { |
| // falsify later if key-may-exist but can't fetch value |
| *value_found = true; |
| } |
| ReadOptions roptions = read_options; |
| roptions.read_tier = kBlockCacheTier; // read from block cache only |
| PinnableSlice pinnable_val; |
| auto s = GetImpl(roptions, column_family, key, &pinnable_val, value_found); |
| value->assign(pinnable_val.data(), pinnable_val.size()); |
| |
  // If block_cache is enabled and the index block of the table is not present
  // in block_cache, the return value will be Status::Incomplete. In this
  // case, the key may still exist in the table.
| return s.ok() || s.IsIncomplete(); |
| } |
| |
| Iterator* DBImpl::NewIterator(const ReadOptions& read_options, |
| ColumnFamilyHandle* column_family) { |
| if (read_options.read_tier == kPersistedTier) { |
| return NewErrorIterator(Status::NotSupported( |
| "ReadTier::kPersistedData is not yet supported in iterators.")); |
| } |
| auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family); |
| auto cfd = cfh->cfd(); |
| if (read_options.managed) { |
| #ifdef ROCKSDB_LITE |
| // not supported in lite version |
| return NewErrorIterator(Status::InvalidArgument( |
| "Managed Iterators not supported in RocksDBLite.")); |
| #else |
| if ((read_options.tailing) || (read_options.snapshot != nullptr) || |
| (is_snapshot_supported_)) { |
| return new ManagedIterator(this, read_options, cfd); |
| } |
| // Managed iter not supported |
| return NewErrorIterator(Status::InvalidArgument( |
| "Managed Iterators not supported without snapshots.")); |
| #endif |
| } else if (read_options.tailing) { |
| #ifdef ROCKSDB_LITE |
| // not supported in lite version |
| return nullptr; |
| #else |
| SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_); |
| auto iter = new ForwardIterator(this, read_options, cfd, sv); |
| return NewDBIterator( |
| env_, read_options, *cfd->ioptions(), cfd->user_comparator(), iter, |
| kMaxSequenceNumber, |
| sv->mutable_cf_options.max_sequential_skip_in_iterations); |
| #endif |
| } else { |
| SequenceNumber latest_snapshot = versions_->LastSequence(); |
| auto snapshot = |
| read_options.snapshot != nullptr |
| ? reinterpret_cast<const SnapshotImpl*>(read_options.snapshot) |
| ->number_ |
| : latest_snapshot; |
| return NewIteratorImpl(read_options, cfd, snapshot); |
| } |
| // To stop compiler from complaining |
| return nullptr; |
| } |
| |
| ArenaWrappedDBIter* DBImpl::NewIteratorImpl(const ReadOptions& read_options, |
| ColumnFamilyData* cfd, |
| SequenceNumber snapshot, |
| bool allow_blob) { |
| SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_); |
| |
  // Try to generate a DB iterator tree in a contiguous memory area to be
  // cache friendly. Here is an example of the resulting layout:
| // +-------------------------------+ |
| // | | |
| // | ArenaWrappedDBIter | |
| // | + | |
| // | +---> Inner Iterator ------------+ |
| // | | | | |
| // | | +-- -- -- -- -- -- -- --+ | |
| // | +--- | Arena | | |
| // | | | | |
| // | Allocated Memory: | | |
| // | | +-------------------+ | |
| // | | | DBIter | <---+ |
| // | | + | |
| // | | | +-> iter_ ------------+ |
| // | | | | | |
| // | | +-------------------+ | |
| // | | | MergingIterator | <---+ |
| // | | + | |
| // | | | +->child iter1 ------------+ |
| // | | | | | | |
| // | | +->child iter2 ----------+ | |
| // | | | | | | | |
| // | | | +->child iter3 --------+ | | |
| // | | | | | | |
| // | | +-------------------+ | | | |
| // | | | Iterator1 | <--------+ |
| // | | +-------------------+ | | |
| // | | | Iterator2 | <------+ |
| // | | +-------------------+ | |
| // | | | Iterator3 | <----+ |
| // | | +-------------------+ |
| // | | | |
| // +-------+-----------------------+ |
| // |
| // ArenaWrappedDBIter inlines an arena area where all the iterators in |
| // the iterator tree are allocated in the order of being accessed when |
| // querying. |
| // Laying out the iterators in the order of being accessed makes it more |
| // likely that any iterator pointer is close to the iterator it points to so |
| // that they are likely to be in the same cache line and/or page. |
| ArenaWrappedDBIter* db_iter = NewArenaWrappedDbIterator( |
| env_, read_options, *cfd->ioptions(), snapshot, |
| sv->mutable_cf_options.max_sequential_skip_in_iterations, |
| sv->version_number, ((read_options.snapshot != nullptr) ? nullptr : this), |
| cfd, allow_blob); |
| |
| InternalIterator* internal_iter = |
| NewInternalIterator(read_options, cfd, sv, db_iter->GetArena(), |
| db_iter->GetRangeDelAggregator()); |
| db_iter->SetIterUnderDBIter(internal_iter); |
| |
| return db_iter; |
| } |
| |
| Status DBImpl::NewIterators( |
| const ReadOptions& read_options, |
| const std::vector<ColumnFamilyHandle*>& column_families, |
| std::vector<Iterator*>* iterators) { |
| if (read_options.read_tier == kPersistedTier) { |
| return Status::NotSupported( |
| "ReadTier::kPersistedData is not yet supported in iterators."); |
| } |
| iterators->clear(); |
| iterators->reserve(column_families.size()); |
| if (read_options.managed) { |
| #ifdef ROCKSDB_LITE |
| return Status::InvalidArgument( |
| "Managed interator not supported in RocksDB lite"); |
| #else |
| if ((!read_options.tailing) && (read_options.snapshot == nullptr) && |
| (!is_snapshot_supported_)) { |
| return Status::InvalidArgument( |
| "Managed interator not supported without snapshots"); |
| } |
| for (auto cfh : column_families) { |
| auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(cfh)->cfd(); |
| auto iter = new ManagedIterator(this, read_options, cfd); |
| iterators->push_back(iter); |
| } |
| #endif |
| } else if (read_options.tailing) { |
| #ifdef ROCKSDB_LITE |
| return Status::InvalidArgument( |
| "Tailing interator not supported in RocksDB lite"); |
| #else |
| for (auto cfh : column_families) { |
| auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(cfh)->cfd(); |
| SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_); |
| auto iter = new ForwardIterator(this, read_options, cfd, sv); |
| iterators->push_back(NewDBIterator( |
| env_, read_options, *cfd->ioptions(), cfd->user_comparator(), iter, |
| kMaxSequenceNumber, |
| sv->mutable_cf_options.max_sequential_skip_in_iterations)); |
| } |
| #endif |
| } else { |
| SequenceNumber latest_snapshot = versions_->LastSequence(); |
| auto snapshot = |
| read_options.snapshot != nullptr |
| ? reinterpret_cast<const SnapshotImpl*>(read_options.snapshot) |
| ->number_ |
| : latest_snapshot; |
| |
| for (size_t i = 0; i < column_families.size(); ++i) { |
| auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>( |
| column_families[i])->cfd(); |
| iterators->push_back(NewIteratorImpl(read_options, cfd, snapshot)); |
| } |
| } |
| |
| return Status::OK(); |
| } |
| |
| const Snapshot* DBImpl::GetSnapshot() { return GetSnapshotImpl(false); } |
| |
| #ifndef ROCKSDB_LITE |
| const Snapshot* DBImpl::GetSnapshotForWriteConflictBoundary() { |
| return GetSnapshotImpl(true); |
| } |
| #endif // ROCKSDB_LITE |
| |
| const Snapshot* DBImpl::GetSnapshotImpl(bool is_write_conflict_boundary) { |
| int64_t unix_time = 0; |
| env_->GetCurrentTime(&unix_time); // Ignore error |
| SnapshotImpl* s = new SnapshotImpl; |
| |
| InstrumentedMutexLock l(&mutex_); |
| // returns null if the underlying memtable does not support snapshot. |
| if (!is_snapshot_supported_) { |
| delete s; |
| return nullptr; |
| } |
| return snapshots_.New(s, versions_->LastSequence(), unix_time, |
| is_write_conflict_boundary); |
| } |
| |
| void DBImpl::ReleaseSnapshot(const Snapshot* s) { |
| const SnapshotImpl* casted_s = reinterpret_cast<const SnapshotImpl*>(s); |
| { |
| InstrumentedMutexLock l(&mutex_); |
| snapshots_.Delete(casted_s); |
| } |
| delete casted_s; |
| } |
| |
| bool DBImpl::HasActiveSnapshotInRange(SequenceNumber lower_bound, |
| SequenceNumber upper_bound) { |
| InstrumentedMutexLock l(&mutex_); |
| return snapshots_.HasSnapshotInRange(lower_bound, upper_bound); |
| } |
| |
| #ifndef ROCKSDB_LITE |
| Status DBImpl::GetPropertiesOfAllTables(ColumnFamilyHandle* column_family, |
| TablePropertiesCollection* props) { |
| auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family); |
| auto cfd = cfh->cfd(); |
| |
| // Increment the ref count |
| mutex_.Lock(); |
| auto version = cfd->current(); |
| version->Ref(); |
| mutex_.Unlock(); |
| |
| auto s = version->GetPropertiesOfAllTables(props); |
| |
| // Decrement the ref count |
| mutex_.Lock(); |
| version->Unref(); |
| mutex_.Unlock(); |
| |
| return s; |
| } |
| |
| Status DBImpl::GetPropertiesOfTablesInRange(ColumnFamilyHandle* column_family, |
| const Range* range, std::size_t n, |
| TablePropertiesCollection* props) { |
| auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family); |
| auto cfd = cfh->cfd(); |
| |
| // Increment the ref count |
| mutex_.Lock(); |
| auto version = cfd->current(); |
| version->Ref(); |
| mutex_.Unlock(); |
| |
| auto s = version->GetPropertiesOfTablesInRange(range, n, props); |
| |
| // Decrement the ref count |
| mutex_.Lock(); |
| version->Unref(); |
| mutex_.Unlock(); |
| |
| return s; |
| } |
| |
| #endif // ROCKSDB_LITE |
| |
| const std::string& DBImpl::GetName() const { |
| return dbname_; |
| } |
| |
| Env* DBImpl::GetEnv() const { |
| return env_; |
| } |
| |
| Options DBImpl::GetOptions(ColumnFamilyHandle* column_family) const { |
| InstrumentedMutexLock l(&mutex_); |
| auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family); |
| return Options(BuildDBOptions(immutable_db_options_, mutable_db_options_), |
| cfh->cfd()->GetLatestCFOptions()); |
| } |
| |
| DBOptions DBImpl::GetDBOptions() const { |
| InstrumentedMutexLock l(&mutex_); |
| return BuildDBOptions(immutable_db_options_, mutable_db_options_); |
| } |
| |
| bool DBImpl::GetProperty(ColumnFamilyHandle* column_family, |
| const Slice& property, std::string* value) { |
| const DBPropertyInfo* property_info = GetPropertyInfo(property); |
| value->clear(); |
| auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd(); |
| if (property_info == nullptr) { |
| return false; |
| } else if (property_info->handle_int) { |
| uint64_t int_value; |
| bool ret_value = |
| GetIntPropertyInternal(cfd, *property_info, false, &int_value); |
| if (ret_value) { |
| *value = ToString(int_value); |
| } |
| return ret_value; |
| } else if (property_info->handle_string) { |
| InstrumentedMutexLock l(&mutex_); |
| return cfd->internal_stats()->GetStringProperty(*property_info, property, |
| value); |
| } |
| // Shouldn't reach here since exactly one of handle_string and handle_int |
| // should be non-nullptr. |
| assert(false); |
| return false; |
| } |
| |
| bool DBImpl::GetMapProperty(ColumnFamilyHandle* column_family, |
| const Slice& property, |
| std::map<std::string, double>* value) { |
| const DBPropertyInfo* property_info = GetPropertyInfo(property); |
| value->clear(); |
| auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd(); |
| if (property_info == nullptr) { |
| return false; |
| } else if (property_info->handle_map) { |
| InstrumentedMutexLock l(&mutex_); |
| return cfd->internal_stats()->GetMapProperty(*property_info, property, |
| value); |
| } |
  // If we reach this point, it means that handle_map is not provided for the
  // requested property.
| return false; |
| } |
| |
| bool DBImpl::GetIntProperty(ColumnFamilyHandle* column_family, |
| const Slice& property, uint64_t* value) { |
| const DBPropertyInfo* property_info = GetPropertyInfo(property); |
| if (property_info == nullptr || property_info->handle_int == nullptr) { |
| return false; |
| } |
| auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd(); |
| return GetIntPropertyInternal(cfd, *property_info, false, value); |
| } |
| |
| bool DBImpl::GetIntPropertyInternal(ColumnFamilyData* cfd, |
| const DBPropertyInfo& property_info, |
| bool is_locked, uint64_t* value) { |
| assert(property_info.handle_int != nullptr); |
| if (!property_info.need_out_of_mutex) { |
| if (is_locked) { |
| mutex_.AssertHeld(); |
| return cfd->internal_stats()->GetIntProperty(property_info, value, this); |
| } else { |
| InstrumentedMutexLock l(&mutex_); |
| return cfd->internal_stats()->GetIntProperty(property_info, value, this); |
| } |
| } else { |
| SuperVersion* sv = nullptr; |
| if (!is_locked) { |
| sv = GetAndRefSuperVersion(cfd); |
| } else { |
| sv = cfd->GetSuperVersion(); |
| } |
| |
| bool ret = cfd->internal_stats()->GetIntPropertyOutOfMutex( |
| property_info, sv->current, value); |
| |
| if (!is_locked) { |
| ReturnAndCleanupSuperVersion(cfd, sv); |
| } |
| |
| return ret; |
| } |
| } |
| |
| #ifndef ROCKSDB_LITE |
| Status DBImpl::ResetStats() { |
| InstrumentedMutexLock l(&mutex_); |
| for (auto* cfd : *versions_->GetColumnFamilySet()) { |
| if (cfd->initialized()) { |
| cfd->internal_stats()->Clear(); |
| } |
| } |
| return Status::OK(); |
| } |
| #endif // ROCKSDB_LITE |
| |
| bool DBImpl::GetAggregatedIntProperty(const Slice& property, |
| uint64_t* aggregated_value) { |
| const DBPropertyInfo* property_info = GetPropertyInfo(property); |
| if (property_info == nullptr || property_info->handle_int == nullptr) { |
| return false; |
| } |
| |
| uint64_t sum = 0; |
| { |
| // Needs mutex to protect the list of column families. |
| InstrumentedMutexLock l(&mutex_); |
| uint64_t value; |
| for (auto* cfd : *versions_->GetColumnFamilySet()) { |
| if (!cfd->initialized()) { |
| continue; |
| } |
| if (GetIntPropertyInternal(cfd, *property_info, true, &value)) { |
| sum += value; |
| } else { |
| return false; |
| } |
| } |
| } |
| *aggregated_value = sum; |
| return true; |
| } |
| |
| SuperVersion* DBImpl::GetAndRefSuperVersion(ColumnFamilyData* cfd) { |
| // TODO(ljin): consider using GetReferencedSuperVersion() directly |
| return cfd->GetThreadLocalSuperVersion(&mutex_); |
| } |
| |
| // REQUIRED: this function should only be called on the write thread or if the |
| // mutex is held. |
| SuperVersion* DBImpl::GetAndRefSuperVersion(uint32_t column_family_id) { |
| auto column_family_set = versions_->GetColumnFamilySet(); |
| auto cfd = column_family_set->GetColumnFamily(column_family_id); |
| if (!cfd) { |
| return nullptr; |
| } |
| |
| return GetAndRefSuperVersion(cfd); |
| } |
| |
| void DBImpl::ReturnAndCleanupSuperVersion(ColumnFamilyData* cfd, |
| SuperVersion* sv) { |
| bool unref_sv = !cfd->ReturnThreadLocalSuperVersion(sv); |
| |
| if (unref_sv) { |
| // Release SuperVersion |
| if (sv->Unref()) { |
| { |
| InstrumentedMutexLock l(&mutex_); |
| sv->Cleanup(); |
| } |
| delete sv; |
| RecordTick(stats_, NUMBER_SUPERVERSION_CLEANUPS); |
| } |
| RecordTick(stats_, NUMBER_SUPERVERSION_RELEASES); |
| } |
| } |
| |
| // REQUIRED: this function should only be called on the write thread. |
| void DBImpl::ReturnAndCleanupSuperVersion(uint32_t column_family_id, |
| SuperVersion* sv) { |
| auto column_family_set = versions_->GetColumnFamilySet(); |
| auto cfd = column_family_set->GetColumnFamily(column_family_id); |
| |
| // If SuperVersion is held, and we successfully fetched a cfd using |
| // GetAndRefSuperVersion(), it must still exist. |
| assert(cfd != nullptr); |
| ReturnAndCleanupSuperVersion(cfd, sv); |
| } |
| |
| // REQUIRED: this function should only be called on the write thread or if the |
| // mutex is held. |
| ColumnFamilyHandle* DBImpl::GetColumnFamilyHandle(uint32_t column_family_id) { |
| ColumnFamilyMemTables* cf_memtables = column_family_memtables_.get(); |
| |
| if (!cf_memtables->Seek(column_family_id)) { |
| return nullptr; |
| } |
| |
| return cf_memtables->GetColumnFamilyHandle(); |
| } |
| |
| // REQUIRED: mutex is NOT held. |
| ColumnFamilyHandle* DBImpl::GetColumnFamilyHandleUnlocked( |
| uint32_t column_family_id) { |
| ColumnFamilyMemTables* cf_memtables = column_family_memtables_.get(); |
| |
| InstrumentedMutexLock l(&mutex_); |
| |
| if (!cf_memtables->Seek(column_family_id)) { |
| return nullptr; |
| } |
| |
| return cf_memtables->GetColumnFamilyHandle(); |
| } |
| |
| void DBImpl::GetApproximateMemTableStats(ColumnFamilyHandle* column_family, |
| const Range& range, |
| uint64_t* const count, |
| uint64_t* const size) { |
| ColumnFamilyHandleImpl* cfh = |
| reinterpret_cast<ColumnFamilyHandleImpl*>(column_family); |
| ColumnFamilyData* cfd = cfh->cfd(); |
| SuperVersion* sv = GetAndRefSuperVersion(cfd); |
| |
| // Convert user_key into a corresponding internal key. |
| InternalKey k1(range.start, kMaxSequenceNumber, kValueTypeForSeek); |
| InternalKey k2(range.limit, kMaxSequenceNumber, kValueTypeForSeek); |
| MemTable::MemTableStats memStats = |
| sv->mem->ApproximateStats(k1.Encode(), k2.Encode()); |
| MemTable::MemTableStats immStats = |
| sv->imm->ApproximateStats(k1.Encode(), k2.Encode()); |
| *count = memStats.count + immStats.count; |
| *size = memStats.size + immStats.size; |
| |
| ReturnAndCleanupSuperVersion(cfd, sv); |
| } |
| |
| void DBImpl::GetApproximateSizes(ColumnFamilyHandle* column_family, |
| const Range* range, int n, uint64_t* sizes, |
| uint8_t include_flags) { |
| assert(include_flags & DB::SizeApproximationFlags::INCLUDE_FILES || |
| include_flags & DB::SizeApproximationFlags::INCLUDE_MEMTABLES); |
| Version* v; |
| auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family); |
| auto cfd = cfh->cfd(); |
| SuperVersion* sv = GetAndRefSuperVersion(cfd); |
| v = sv->current; |
| |
| for (int i = 0; i < n; i++) { |
| // Convert user_key into a corresponding internal key. |
| InternalKey k1(range[i].start, kMaxSequenceNumber, kValueTypeForSeek); |
| InternalKey k2(range[i].limit, kMaxSequenceNumber, kValueTypeForSeek); |
| sizes[i] = 0; |
| if (include_flags & DB::SizeApproximationFlags::INCLUDE_FILES) { |
| sizes[i] += versions_->ApproximateSize(v, k1.Encode(), k2.Encode()); |
| } |
| if (include_flags & DB::SizeApproximationFlags::INCLUDE_MEMTABLES) { |
| sizes[i] += sv->mem->ApproximateStats(k1.Encode(), k2.Encode()).size; |
| sizes[i] += sv->imm->ApproximateStats(k1.Encode(), k2.Encode()).size; |
| } |
| } |
| |
| ReturnAndCleanupSuperVersion(cfd, sv); |
| } |
| |
| std::list<uint64_t>::iterator |
| DBImpl::CaptureCurrentFileNumberInPendingOutputs() { |
| // We need to remember the iterator of our insert, because after the |
| // background job is done, we need to remove that element from |
| // pending_outputs_. |
| pending_outputs_.push_back(versions_->current_next_file_number()); |
| auto pending_outputs_inserted_elem = pending_outputs_.end(); |
| --pending_outputs_inserted_elem; |
| return pending_outputs_inserted_elem; |
| } |
| |
| void DBImpl::ReleaseFileNumberFromPendingOutputs( |
| std::list<uint64_t>::iterator v) { |
| pending_outputs_.erase(v); |
| } |
| |
| #ifndef ROCKSDB_LITE |
| Status DBImpl::GetUpdatesSince( |
| SequenceNumber seq, unique_ptr<TransactionLogIterator>* iter, |
| const TransactionLogIterator::ReadOptions& read_options) { |
| |
| RecordTick(stats_, GET_UPDATES_SINCE_CALLS); |
| if (seq > versions_->LastSequence()) { |
| return Status::NotFound("Requested sequence not yet written in the db"); |
| } |
| return wal_manager_.GetUpdatesSince(seq, iter, read_options, versions_.get()); |
| } |
| |
| Status DBImpl::DeleteFile(std::string name) { |
| uint64_t number; |
| FileType type; |
| WalFileType log_type; |
| if (!ParseFileName(name, &number, &type, &log_type) || |
| (type != kTableFile && type != kLogFile)) { |
| ROCKS_LOG_ERROR(immutable_db_options_.info_log, "DeleteFile %s failed.\n", |
| name.c_str()); |
| return Status::InvalidArgument("Invalid file name"); |
| } |
| |
| Status status; |
| if (type == kLogFile) { |
| // Only allow deleting archived log files |
| if (log_type != kArchivedLogFile) { |
| ROCKS_LOG_ERROR(immutable_db_options_.info_log, |
| "DeleteFile %s failed - not archived log.\n", |
| name.c_str()); |
| return Status::NotSupported("Delete only supported for archived logs"); |
| } |
| status = |
| env_->DeleteFile(immutable_db_options_.wal_dir + "/" + name.c_str()); |
| if (!status.ok()) { |
| ROCKS_LOG_ERROR(immutable_db_options_.info_log, |
| "DeleteFile %s failed -- %s.\n", name.c_str(), |
| status.ToString().c_str()); |
| } |
| return status; |
| } |
| |
| int level; |
| FileMetaData* metadata; |
| ColumnFamilyData* cfd; |
| VersionEdit edit; |
| JobContext job_context(next_job_id_.fetch_add(1), true); |
| { |
| InstrumentedMutexLock l(&mutex_); |
| status = versions_->GetMetadataForFile(number, &level, &metadata, &cfd); |
| if (!status.ok()) { |
| ROCKS_LOG_WARN(immutable_db_options_.info_log, |
| "DeleteFile %s failed. File not found\n", name.c_str()); |
| job_context.Clean(); |
| return Status::InvalidArgument("File not found"); |
| } |
| assert(level < cfd->NumberLevels()); |
| |
    // If the file is being compacted, there is no need to delete it.
| if (metadata->being_compacted) { |
| ROCKS_LOG_INFO(immutable_db_options_.info_log, |
| "DeleteFile %s Skipped. File about to be compacted\n", |
| name.c_str()); |
| job_context.Clean(); |
| return Status::OK(); |
| } |
| |
    // Only files in the bottom-most non-empty level can be deleted
    // externally; otherwise deletion tombstones covering their keys could be
    // lost. Verify that no level below the file's level contains any files.
    auto* vstorage = cfd->current()->storage_info();
    for (int i = level + 1; i < cfd->NumberLevels(); i++) {
      if (vstorage->NumLevelFiles(i) != 0) {
        ROCKS_LOG_WARN(immutable_db_options_.info_log,
                       "DeleteFile %s FAILED. File not in last level\n",
                       name.c_str());
        job_context.Clean();
        return Status::InvalidArgument("File not in last level");
      }
    }
    // If level == 0, the file must be the oldest one in that level.
    if (level == 0 &&
        vstorage->LevelFiles(0).back()->fd.GetNumber() != number) {
| ROCKS_LOG_WARN(immutable_db_options_.info_log, |
| "DeleteFile %s failed ---" |
| " target file in level 0 must be the oldest.", |
| name.c_str()); |
| job_context.Clean(); |
| return Status::InvalidArgument("File in level 0, but not oldest"); |
| } |
| edit.SetColumnFamily(cfd->GetID()); |
| edit.DeleteFile(level, number); |
| status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), |
| &edit, &mutex_, directories_.GetDbDir()); |
| if (status.ok()) { |
| InstallSuperVersionAndScheduleWorkWrapper( |
| cfd, &job_context, *cfd->GetLatestMutableCFOptions()); |
| } |
| FindObsoleteFiles(&job_context, false); |
| } // lock released here |
| |
| LogFlush(immutable_db_options_.info_log); |
  // Remove the obsolete files outside of the DB mutex.
| if (job_context.HaveSomethingToDelete()) { |
| // Call PurgeObsoleteFiles() without holding mutex. |
| PurgeObsoleteFiles(job_context); |
| } |
| job_context.Clean(); |
| return status; |
| } |
| |
| Status DBImpl::DeleteFilesInRange(ColumnFamilyHandle* column_family, |
| const Slice* begin, const Slice* end) { |
| Status status; |
| auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family); |
| ColumnFamilyData* cfd = cfh->cfd(); |
| VersionEdit edit; |
| std::vector<FileMetaData*> deleted_files; |
| JobContext job_context(next_job_id_.fetch_add(1), true); |
| { |
| InstrumentedMutexLock l(&mutex_); |
| Version* input_version = cfd->current(); |
| |
| auto* vstorage = input_version->storage_info(); |
| for (int i = 1; i < cfd->NumberLevels(); i++) { |
| if (vstorage->LevelFiles(i).empty() || |
| !vstorage->OverlapInLevel(i, begin, end)) { |
| continue; |
| } |
| std::vector<FileMetaData*> level_files; |
| InternalKey begin_storage, end_storage, *begin_key, *end_key; |
| if (begin == nullptr) { |
| begin_key = nullptr; |
| } else { |
| begin_storage.SetMaxPossibleForUserKey(*begin); |
| begin_key = &begin_storage; |
| } |
| if (end == nullptr) { |
| end_key = nullptr; |
| } else { |
| end_storage.SetMinPossibleForUserKey(*end); |
| end_key = &end_storage; |
| } |
| |
| vstorage->GetOverlappingInputs(i, begin_key, end_key, &level_files, -1, |
| nullptr, false); |
| FileMetaData* level_file; |
| for (uint32_t j = 0; j < level_files.size(); j++) { |
| level_file = level_files[j]; |
| if (((begin == nullptr) || |
| (cfd->internal_comparator().user_comparator()->Compare( |
| level_file->smallest.user_key(), *begin) >= 0)) && |
| ((end == nullptr) || |
| (cfd->internal_comparator().user_comparator()->Compare( |
| level_file->largest.user_key(), *end) <= 0))) { |
| if (level_file->being_compacted) { |
| continue; |
| } |
| edit.SetColumnFamily(cfd->GetID()); |
| edit.DeleteFile(i, level_file->fd.GetNumber()); |
| deleted_files.push_back(level_file); |
| level_file->being_compacted = true; |
| } |
| } |
| } |
| if (edit.GetDeletedFiles().empty()) { |
| job_context.Clean(); |
| return Status::OK(); |
| } |
| input_version->Ref(); |
| status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), |
| &edit, &mutex_, directories_.GetDbDir()); |
| if (status.ok()) { |
| InstallSuperVersionAndScheduleWorkWrapper( |
| cfd, &job_context, *cfd->GetLatestMutableCFOptions()); |
| } |
| for (auto* deleted_file : deleted_files) { |
| deleted_file->being_compacted = false; |
| } |
| input_version->Unref(); |
| FindObsoleteFiles(&job_context, false); |
| } // lock released here |
| |
| LogFlush(immutable_db_options_.info_log); |
  // Remove the obsolete files outside of the DB mutex.
| if (job_context.HaveSomethingToDelete()) { |
| // Call PurgeObsoleteFiles() without holding mutex. |
| PurgeObsoleteFiles(job_context); |
| } |
| job_context.Clean(); |
| return status; |
| } |
| |
| void DBImpl::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) { |
| InstrumentedMutexLock l(&mutex_); |
| versions_->GetLiveFilesMetaData(metadata); |
| } |
| |
| void DBImpl::GetColumnFamilyMetaData( |
| ColumnFamilyHandle* column_family, |
| ColumnFamilyMetaData* cf_meta) { |
| assert(column_family); |
| auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd(); |
| auto* sv = GetAndRefSuperVersion(cfd); |
| sv->current->GetColumnFamilyMetaData(cf_meta); |
| ReturnAndCleanupSuperVersion(cfd, sv); |
| } |
| |
| #endif // ROCKSDB_LITE |
| |
| Status DBImpl::CheckConsistency() { |
| mutex_.AssertHeld(); |
| std::vector<LiveFileMetaData> metadata; |
| versions_->GetLiveFilesMetaData(&metadata); |
| |
| std::string corruption_messages; |
| for (const auto& md : metadata) { |
| // md.name has a leading "/". |
| std::string file_path = md.db_path + md.name; |
| |
| uint64_t fsize = 0; |
| Status s = env_->GetFileSize(file_path, &fsize); |
| if (!s.ok() && |
| env_->GetFileSize(Rocks2LevelTableFileName(file_path), &fsize).ok()) { |
| s = Status::OK(); |
| } |
| if (!s.ok()) { |
| corruption_messages += |
| "Can't access " + md.name + ": " + s.ToString() + "\n"; |
| } else if (fsize != md.size) { |
| corruption_messages += "Sst file size mismatch: " + file_path + |
| ". Size recorded in manifest " + |
| ToString(md.size) + ", actual size " + |
| ToString(fsize) + "\n"; |
| } |
| } |
| if (corruption_messages.size() == 0) { |
| return Status::OK(); |
| } else { |
| return Status::Corruption(corruption_messages); |
| } |
| } |
| |
| Status DBImpl::GetDbIdentity(std::string& identity) const { |
| std::string idfilename = IdentityFileName(dbname_); |
| const EnvOptions soptions; |
| unique_ptr<SequentialFileReader> id_file_reader; |
| Status s; |
| { |
| unique_ptr<SequentialFile> idfile; |
| s = env_->NewSequentialFile(idfilename, &idfile, soptions); |
| if (!s.ok()) { |
| return s; |
| } |
| id_file_reader.reset(new SequentialFileReader(std::move(idfile))); |
| } |
| |
| uint64_t file_size; |
| s = env_->GetFileSize(idfilename, &file_size); |
| if (!s.ok()) { |
| return s; |
| } |
| char* buffer = reinterpret_cast<char*>(alloca(file_size)); |
| Slice id; |
| s = id_file_reader->Read(static_cast<size_t>(file_size), &id, buffer); |
| if (!s.ok()) { |
| return s; |
| } |
| identity.assign(id.ToString()); |
  // If the last character is '\n', remove it from the identity.
| if (identity.size() > 0 && identity.back() == '\n') { |
| identity.pop_back(); |
| } |
| return s; |
| } |
| |
| // Default implementation -- returns not supported status |
| Status DB::CreateColumnFamily(const ColumnFamilyOptions& cf_options, |
| const std::string& column_family_name, |
| ColumnFamilyHandle** handle) { |
| return Status::NotSupported(""); |
| } |
| |
| Status DB::CreateColumnFamilies( |
| const ColumnFamilyOptions& cf_options, |
| const std::vector<std::string>& column_family_names, |
| std::vector<ColumnFamilyHandle*>* handles) { |
| return Status::NotSupported(""); |
| } |
| |
| Status DB::CreateColumnFamilies( |
| const std::vector<ColumnFamilyDescriptor>& column_families, |
| std::vector<ColumnFamilyHandle*>* handles) { |
| return Status::NotSupported(""); |
| } |
| |
| Status DB::DropColumnFamily(ColumnFamilyHandle* column_family) { |
| return Status::NotSupported(""); |
| } |
| |
| Status DB::DropColumnFamilies( |
| const std::vector<ColumnFamilyHandle*>& column_families) { |
| return Status::NotSupported(""); |
| } |
| |
| Status DB::DestroyColumnFamilyHandle(ColumnFamilyHandle* column_family) { |
| delete column_family; |
| return Status::OK(); |
| } |
| |
| DB::~DB() { } |
| |
| Status DB::ListColumnFamilies(const DBOptions& db_options, |
| const std::string& name, |
| std::vector<std::string>* column_families) { |
| return VersionSet::ListColumnFamilies(column_families, name, db_options.env); |
| } |
| |
| Snapshot::~Snapshot() { |
| } |
| |
| Status DestroyDB(const std::string& dbname, const Options& options) { |
| const ImmutableDBOptions soptions(SanitizeOptions(dbname, options)); |
| Env* env = soptions.env; |
| std::vector<std::string> filenames; |
| |
| // Ignore error in case directory does not exist |
| env->GetChildren(dbname, &filenames); |
| |
| FileLock* lock; |
| const std::string lockname = LockFileName(dbname); |
| Status result = env->LockFile(lockname, &lock); |
| if (result.ok()) { |
| uint64_t number; |
| FileType type; |
| InfoLogPrefix info_log_prefix(!soptions.db_log_dir.empty(), dbname); |
| for (size_t i = 0; i < filenames.size(); i++) { |
| if (ParseFileName(filenames[i], &number, info_log_prefix.prefix, &type) && |
| type != kDBLockFile) { // Lock file will be deleted at end |
| Status del; |
| std::string path_to_delete = dbname + "/" + filenames[i]; |
| if (type == kMetaDatabase) { |
| del = DestroyDB(path_to_delete, options); |
| } else if (type == kTableFile) { |
| del = DeleteSSTFile(&soptions, path_to_delete, 0); |
| } else { |
| del = env->DeleteFile(path_to_delete); |
| } |
| if (result.ok() && !del.ok()) { |
| result = del; |
| } |
| } |
| } |
| |
| for (size_t path_id = 0; path_id < options.db_paths.size(); path_id++) { |
| const auto& db_path = options.db_paths[path_id]; |
| env->GetChildren(db_path.path, &filenames); |
| for (size_t i = 0; i < filenames.size(); i++) { |
| if (ParseFileName(filenames[i], &number, &type) && |
            type == kTableFile) {  // Only table files live in extra db_paths
| std::string table_path = db_path.path + "/" + filenames[i]; |
| Status del = DeleteSSTFile(&soptions, table_path, |
| static_cast<uint32_t>(path_id)); |
| if (result.ok() && !del.ok()) { |
| result = del; |
| } |
| } |
| } |
| } |
| |
| std::vector<std::string> walDirFiles; |
| std::string archivedir = ArchivalDirectory(dbname); |
| if (dbname != soptions.wal_dir) { |
| env->GetChildren(soptions.wal_dir, &walDirFiles); |
| archivedir = ArchivalDirectory(soptions.wal_dir); |
| } |
| |
| // Delete log files in the WAL dir |
| for (const auto& file : walDirFiles) { |
| if (ParseFileName(file, &number, &type) && type == kLogFile) { |
| Status del = env->DeleteFile(LogFileName(soptions.wal_dir, number)); |
| if (result.ok() && !del.ok()) { |
| result = del; |
| } |
| } |
| } |
| |
| std::vector<std::string> archiveFiles; |
| env->GetChildren(archivedir, &archiveFiles); |
| // Delete archival files. |
| for (size_t i = 0; i < archiveFiles.size(); ++i) { |
| if (ParseFileName(archiveFiles[i], &number, &type) && |
| type == kLogFile) { |
| Status del = env->DeleteFile(archivedir + "/" + archiveFiles[i]); |
| if (result.ok() && !del.ok()) { |
| result = del; |
| } |
| } |
| } |
| |
    // Ignore the case where no archival directory is present.
| env->DeleteDir(archivedir); |
| |
| env->UnlockFile(lock); // Ignore error since state is already gone |
| env->DeleteFile(lockname); |
| env->DeleteDir(dbname); // Ignore error in case dir contains other files |
| env->DeleteDir(soptions.wal_dir); |
| } |
| return result; |
| } |
| |
| Status DBImpl::WriteOptionsFile(bool need_mutex_lock, |
| bool need_enter_write_thread) { |
| #ifndef ROCKSDB_LITE |
| WriteThread::Writer w; |
| if (need_mutex_lock) { |
| mutex_.Lock(); |
| } else { |
| mutex_.AssertHeld(); |
| } |
| if (need_enter_write_thread) { |
| write_thread_.EnterUnbatched(&w, &mutex_); |
| } |
| |
| std::vector<std::string> cf_names; |
| std::vector<ColumnFamilyOptions> cf_opts; |
| |
| // This part requires mutex to protect the column family options |
| for (auto cfd : *versions_->GetColumnFamilySet()) { |
| if (cfd->IsDropped()) { |
| continue; |
| } |
| cf_names.push_back(cfd->GetName()); |
| cf_opts.push_back(cfd->GetLatestCFOptions()); |
| } |
| |
| // Unlock during expensive operations. New writes cannot get here |
| // because the single write thread ensures all new writes get queued. |
| DBOptions db_options = |
| BuildDBOptions(immutable_db_options_, mutable_db_options_); |
| mutex_.Unlock(); |
| |
| TEST_SYNC_POINT("DBImpl::WriteOptionsFile:1"); |
| TEST_SYNC_POINT("DBImpl::WriteOptionsFile:2"); |
| |
| std::string file_name = |
| TempOptionsFileName(GetName(), versions_->NewFileNumber()); |
| Status s = |
| PersistRocksDBOptions(db_options, cf_names, cf_opts, file_name, GetEnv()); |
| |
| if (s.ok()) { |
| s = RenameTempFileToOptionsFile(file_name); |
| } |
| // restore lock |
| if (!need_mutex_lock) { |
| mutex_.Lock(); |
| } |
| if (need_enter_write_thread) { |
| write_thread_.ExitUnbatched(&w); |
| } |
| if (!s.ok()) { |
| ROCKS_LOG_WARN(immutable_db_options_.info_log, |
| "Unnable to persist options -- %s", s.ToString().c_str()); |
| if (immutable_db_options_.fail_if_options_file_error) { |
| return Status::IOError("Unable to persist options.", |
| s.ToString().c_str()); |
| } |
| } |
| #endif // !ROCKSDB_LITE |
| return Status::OK(); |
| } |
| |
| #ifndef ROCKSDB_LITE |
| namespace { |
| void DeleteOptionsFilesHelper(const std::map<uint64_t, std::string>& filenames, |
| const size_t num_files_to_keep, |
| const std::shared_ptr<Logger>& info_log, |
| Env* env) { |
| if (filenames.size() <= num_files_to_keep) { |
| return; |
| } |
| for (auto iter = std::next(filenames.begin(), num_files_to_keep); |
| iter != filenames.end(); ++iter) { |
| if (!env->DeleteFile(iter->second).ok()) { |
| ROCKS_LOG_WARN(info_log, "Unable to delete options file %s", |
| iter->second.c_str()); |
| } |
| } |
| } |
| } // namespace |
| #endif // !ROCKSDB_LITE |
| |
| Status DBImpl::DeleteObsoleteOptionsFiles() { |
| #ifndef ROCKSDB_LITE |
| std::vector<std::string> filenames; |
  // Use an ordered map to keep the filenames sorted from the newest
  // to the oldest.
| std::map<uint64_t, std::string> options_filenames; |
| Status s; |
| s = GetEnv()->GetChildren(GetName(), &filenames); |
| if (!s.ok()) { |
| return s; |
| } |
| for (auto& filename : filenames) { |
| uint64_t file_number; |
| FileType type; |
| if (ParseFileName(filename, &file_number, &type) && type == kOptionsFile) { |
| options_filenames.insert( |
| {std::numeric_limits<uint64_t>::max() - file_number, |
| GetName() + "/" + filename}); |
| } |
| } |
| |
  // Keep the latest 2 options files.
| const size_t kNumOptionsFilesKept = 2; |
| DeleteOptionsFilesHelper(options_filenames, kNumOptionsFilesKept, |
| immutable_db_options_.info_log, GetEnv()); |
| return Status::OK(); |
| #else |
| return Status::OK(); |
| #endif // !ROCKSDB_LITE |
| } |
| |
| Status DBImpl::RenameTempFileToOptionsFile(const std::string& file_name) { |
| #ifndef ROCKSDB_LITE |
| Status s; |
| |
| versions_->options_file_number_ = versions_->NewFileNumber(); |
| std::string options_file_name = |
| OptionsFileName(GetName(), versions_->options_file_number_); |
  // Retry if the file name happens to conflict with an existing one.
| s = GetEnv()->RenameFile(file_name, options_file_name); |
| |
| DeleteObsoleteOptionsFiles(); |
| return s; |
| #else |
| return Status::OK(); |
| #endif // !ROCKSDB_LITE |
| } |
| |
| #ifdef ROCKSDB_USING_THREAD_STATUS |
| |
| void DBImpl::NewThreadStatusCfInfo( |
| ColumnFamilyData* cfd) const { |
| if (immutable_db_options_.enable_thread_tracking) { |
| ThreadStatusUtil::NewColumnFamilyInfo(this, cfd, cfd->GetName(), |
| cfd->ioptions()->env); |
| } |
| } |
| |
| void DBImpl::EraseThreadStatusCfInfo( |
| ColumnFamilyData* cfd) const { |
| if (immutable_db_options_.enable_thread_tracking) { |
| ThreadStatusUtil::EraseColumnFamilyInfo(cfd); |
| } |
| } |
| |
| void DBImpl::EraseThreadStatusDbInfo() const { |
| if (immutable_db_options_.enable_thread_tracking) { |
| ThreadStatusUtil::EraseDatabaseInfo(this); |
| } |
| } |
| |
| #else |
| void DBImpl::NewThreadStatusCfInfo( |
| ColumnFamilyData* cfd) const { |
| } |
| |
| void DBImpl::EraseThreadStatusCfInfo( |
| ColumnFamilyData* cfd) const { |
| } |
| |
| void DBImpl::EraseThreadStatusDbInfo() const { |
| } |
| #endif // ROCKSDB_USING_THREAD_STATUS |
| |
| // |
| // A global method that can dump out the build version |
| void DumpRocksDBBuildVersion(Logger * log) { |
| #if !defined(IOS_CROSS_COMPILE) |
| // if we compile with Xcode, we don't run build_detect_version, so we don't |
| // generate util/build_version.cc |
| ROCKS_LOG_HEADER(log, "RocksDB version: %d.%d.%d\n", ROCKSDB_MAJOR, |
| ROCKSDB_MINOR, ROCKSDB_PATCH); |
| ROCKS_LOG_HEADER(log, "Git sha %s", rocksdb_build_git_sha); |
| ROCKS_LOG_HEADER(log, "Compile date %s", rocksdb_build_compile_date); |
| #endif |
| } |
| |
| #ifndef ROCKSDB_LITE |
| SequenceNumber DBImpl::GetEarliestMemTableSequenceNumber(SuperVersion* sv, |
| bool include_history) { |
| // Find the earliest sequence number that we know we can rely on reading |
| // from the memtable without needing to check sst files. |
| SequenceNumber earliest_seq = |
| sv->imm->GetEarliestSequenceNumber(include_history); |
| if (earliest_seq == kMaxSequenceNumber) { |
| earliest_seq = sv->mem->GetEarliestSequenceNumber(); |
| } |
| assert(sv->mem->GetEarliestSequenceNumber() >= earliest_seq); |
| |
| return earliest_seq; |
| } |
| #endif // ROCKSDB_LITE |
| |
| #ifndef ROCKSDB_LITE |
| Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key, |
| bool cache_only, SequenceNumber* seq, |
| bool* found_record_for_key, |
| bool* is_blob_index) { |
| Status s; |
| MergeContext merge_context; |
| RangeDelAggregator range_del_agg(sv->mem->GetInternalKeyComparator(), |
| kMaxSequenceNumber); |
| |
| ReadOptions read_options; |
| SequenceNumber current_seq = versions_->LastSequence(); |
| LookupKey lkey(key, current_seq); |
| |
| *seq = kMaxSequenceNumber; |
| *found_record_for_key = false; |
| |
| // Check if there is a record for this key in the latest memtable |
| sv->mem->Get(lkey, nullptr, &s, &merge_context, &range_del_agg, seq, |
| read_options, is_blob_index); |
| |
| if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) { |
| // unexpected error reading memtable. |
| ROCKS_LOG_ERROR(immutable_db_options_.info_log, |
| "Unexpected status returned from MemTable::Get: %s\n", |
| s.ToString().c_str()); |
| |
| return s; |
| } |
| |
| if (*seq != kMaxSequenceNumber) { |
| // Found a sequence number, no need to check immutable memtables |
| *found_record_for_key = true; |
| return Status::OK(); |
| } |
| |
| // Check if there is a record for this key in the immutable memtables |
| sv->imm->Get(lkey, nullptr, &s, &merge_context, &range_del_agg, seq, |
| read_options, is_blob_index); |
| |
| if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) { |
| // unexpected error reading memtable. |
| ROCKS_LOG_ERROR(immutable_db_options_.info_log, |
| "Unexpected status returned from MemTableList::Get: %s\n", |
| s.ToString().c_str()); |
| |
| return s; |
| } |
| |
| if (*seq != kMaxSequenceNumber) { |
| // Found a sequence number, no need to check memtable history |
| *found_record_for_key = true; |
| return Status::OK(); |
| } |
| |
  // Check if there is a record for this key in the memtable history
| sv->imm->GetFromHistory(lkey, nullptr, &s, &merge_context, &range_del_agg, |
| seq, read_options, is_blob_index); |
| |
| if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) { |
| // unexpected error reading memtable. |
| ROCKS_LOG_ERROR( |
| immutable_db_options_.info_log, |
| "Unexpected status returned from MemTableList::GetFromHistory: %s\n", |
| s.ToString().c_str()); |
| |
| return s; |
| } |
| |
| if (*seq != kMaxSequenceNumber) { |
| // Found a sequence number, no need to check SST files |
| *found_record_for_key = true; |
| return Status::OK(); |
| } |
| |
| // TODO(agiardullo): possible optimization: consider checking cached |
| // SST files if cache_only=true? |
| if (!cache_only) { |
| // Check tables |
| sv->current->Get(read_options, lkey, nullptr, &s, &merge_context, |
| &range_del_agg, nullptr /* value_found */, |
| found_record_for_key, seq, is_blob_index); |
| |
| if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) { |
| // unexpected error reading SST files |
| ROCKS_LOG_ERROR(immutable_db_options_.info_log, |
| "Unexpected status returned from Version::Get: %s\n", |
| s.ToString().c_str()); |
| |
| return s; |
| } |
| } |
| |
| return Status::OK(); |
| } |
| |
| Status DBImpl::IngestExternalFile( |
| ColumnFamilyHandle* column_family, |
| const std::vector<std::string>& external_files, |
| const IngestExternalFileOptions& ingestion_options) { |
| Status status; |
| auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family); |
| auto cfd = cfh->cfd(); |
| |
| // Ingest should immediately fail if ingest_behind is requested, |
| // but the DB doesn't support it. |
| if (ingestion_options.ingest_behind) { |
| if (!immutable_db_options_.allow_ingest_behind) { |
| return Status::InvalidArgument( |
| "Can't ingest_behind file in DB with allow_ingest_behind=false"); |
| } |
| } |
| |
| ExternalSstFileIngestionJob ingestion_job(env_, versions_.get(), cfd, |
| immutable_db_options_, env_options_, |
| &snapshots_, ingestion_options); |
| |
| std::list<uint64_t>::iterator pending_output_elem; |
| { |
| InstrumentedMutexLock l(&mutex_); |
| if (!bg_error_.ok()) { |
| // Don't ingest files when there is a bg_error |
| return bg_error_; |
| } |
| |
    // Make sure that bg cleanup won't delete the files that we are ingesting
| pending_output_elem = CaptureCurrentFileNumberInPendingOutputs(); |
| } |
| |
| status = ingestion_job.Prepare(external_files); |
| if (!status.ok()) { |
| return status; |
| } |
| |
| TEST_SYNC_POINT("DBImpl::AddFile:Start"); |
| { |
| // Lock db mutex |
| InstrumentedMutexLock l(&mutex_); |
| TEST_SYNC_POINT("DBImpl::AddFile:MutexLock"); |
| |
| // Stop writes to the DB by entering both write threads |
| WriteThread::Writer w; |
| write_thread_.EnterUnbatched(&w, &mutex_); |
| WriteThread::Writer nonmem_w; |
| if (concurrent_prepare_) { |
| nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_); |
| } |
| |
| num_running_ingest_file_++; |
| |
| // We cannot ingest a file into a dropped CF |
| if (cfd->IsDropped()) { |
| status = Status::InvalidArgument( |
| "Cannot ingest an external file into a dropped CF"); |
| } |
| |
| // Figure out if we need to flush the memtable first |
| if (status.ok()) { |
| bool need_flush = false; |
| status = ingestion_job.NeedsFlush(&need_flush); |
| TEST_SYNC_POINT_CALLBACK("DBImpl::IngestExternalFile:NeedFlush", |
| &need_flush); |
| if (status.ok() && need_flush) { |
| mutex_.Unlock(); |
| status = FlushMemTable(cfd, FlushOptions(), true /* writes_stopped */); |
| mutex_.Lock(); |
| } |
| } |
| |
| // Run the ingestion job |
| if (status.ok()) { |
| status = ingestion_job.Run(); |
| } |
| |
| // Install job edit [Mutex will be unlocked here] |
| auto mutable_cf_options = cfd->GetLatestMutableCFOptions(); |
| if (status.ok()) { |
| status = |
| versions_->LogAndApply(cfd, *mutable_cf_options, ingestion_job.edit(), |
| &mutex_, directories_.GetDbDir()); |
| } |
| if (status.ok()) { |
| delete InstallSuperVersionAndScheduleWork(cfd, nullptr, |
| *mutable_cf_options); |
| } |
| |
| // Resume writes to the DB |
| if (concurrent_prepare_) { |
| nonmem_write_thread_.ExitUnbatched(&nonmem_w); |
| } |
| write_thread_.ExitUnbatched(&w); |
| |
| // Update stats |
| if (status.ok()) { |
| ingestion_job.UpdateStats(); |
| } |
| |
| ReleaseFileNumberFromPendingOutputs(pending_output_elem); |
| |
| num_running_ingest_file_--; |
| if (num_running_ingest_file_ == 0) { |
| bg_cv_.SignalAll(); |
| } |
| |
| TEST_SYNC_POINT("DBImpl::AddFile:MutexUnlock"); |
| } |
| // mutex_ is unlocked here |
| |
| // Cleanup |
| ingestion_job.Cleanup(status); |
| |
| if (status.ok()) { |
| NotifyOnExternalFileIngested(cfd, ingestion_job); |
| } |
| |
| return status; |
| } |
| |
| Status DBImpl::VerifyChecksum() { |
| Status s; |
| Options options; |
| EnvOptions env_options; |
| std::vector<ColumnFamilyData*> cfd_list; |
| { |
| InstrumentedMutexLock l(&mutex_); |
| for (auto cfd : *versions_->GetColumnFamilySet()) { |
| if (!cfd->IsDropped() && cfd->initialized()) { |
| cfd->Ref(); |
| cfd_list.push_back(cfd); |
| } |
| } |
| } |
| std::vector<SuperVersion*> sv_list; |
| for (auto cfd : cfd_list) { |
| sv_list.push_back(cfd->GetReferencedSuperVersion(&mutex_)); |
| } |
| for (auto& sv : sv_list) { |
| VersionStorageInfo* vstorage = sv->current->storage_info(); |
| for (int i = 0; i < vstorage->num_non_empty_levels() && s.ok(); i++) { |
| for (size_t j = 0; j < vstorage->LevelFilesBrief(i).num_files && s.ok(); |
| j++) { |
| const auto& fd = vstorage->LevelFilesBrief(i).files[j].fd; |
| std::string fname = TableFileName(immutable_db_options_.db_paths, |
| fd.GetNumber(), fd.GetPathId()); |
| s = rocksdb::VerifySstFileChecksum(options, env_options, fname); |
| } |
| } |
| if (!s.ok()) { |
| break; |
| } |
| } |
| { |
| InstrumentedMutexLock l(&mutex_); |
| for (auto sv : sv_list) { |
| if (sv && sv->Unref()) { |
| sv->Cleanup(); |
| delete sv; |
| } |
| } |
| for (auto cfd : cfd_list) { |
| cfd->Unref(); |
| } |
| } |
| return s; |
| } |
| |
| void DBImpl::NotifyOnExternalFileIngested( |
| ColumnFamilyData* cfd, const ExternalSstFileIngestionJob& ingestion_job) { |
| #ifndef ROCKSDB_LITE |
| if (immutable_db_options_.listeners.empty()) { |
| return; |
| } |
| |
| for (const IngestedFileInfo& f : ingestion_job.files_to_ingest()) { |
| ExternalFileIngestionInfo info; |
| info.cf_name = cfd->GetName(); |
| info.external_file_path = f.external_file_path; |
| info.internal_file_path = f.internal_file_path; |
| info.global_seqno = f.assigned_seqno; |
| info.table_properties = f.table_properties; |
| for (auto listener : immutable_db_options_.listeners) { |
| listener->OnExternalFileIngested(this, info); |
| } |
| } |
| |
| #endif |
| } |
| |
| void DBImpl::WaitForIngestFile() { |
| mutex_.AssertHeld(); |
| while (num_running_ingest_file_ > 0) { |
| bg_cv_.Wait(); |
| } |
| } |
| |
| #endif // ROCKSDB_LITE |
| |
| } // namespace rocksdb |