| // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
| // This source code is licensed under both the GPLv2 (found in the |
| // COPYING file in the root directory) and Apache 2.0 License |
| // (found in the LICENSE.Apache file in the root directory). |
| // |
| // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. See the AUTHORS file for names of contributors. |
| |
| #include "db/builder.h" |
| |
| #include <algorithm> |
| #include <deque> |
| #include <vector> |
| |
| #include "db/compaction_iterator.h" |
| #include "db/dbformat.h" |
| #include "db/event_helpers.h" |
| #include "db/internal_stats.h" |
| #include "db/merge_helper.h" |
| #include "db/table_cache.h" |
| #include "db/version_edit.h" |
| #include "monitoring/iostats_context_imp.h" |
| #include "monitoring/thread_status_util.h" |
| #include "rocksdb/db.h" |
| #include "rocksdb/env.h" |
| #include "rocksdb/iterator.h" |
| #include "rocksdb/options.h" |
| #include "rocksdb/table.h" |
| #include "table/block_based_table_builder.h" |
| #include "table/internal_iterator.h" |
| #include "util/file_reader_writer.h" |
| #include "util/filename.h" |
| #include "util/stop_watch.h" |
| #include "util/sync_point.h" |
| |
| namespace rocksdb { |
| |
| class TableFactory; |
| |
| TableBuilder* NewTableBuilder( |
| const ImmutableCFOptions& ioptions, |
| const InternalKeyComparator& internal_comparator, |
| const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>* |
| int_tbl_prop_collector_factories, |
| uint32_t column_family_id, const std::string& column_family_name, |
| WritableFileWriter* file, const CompressionType compression_type, |
| const CompressionOptions& compression_opts, int level, |
| const std::string* compression_dict, const bool skip_filters, |
| const uint64_t creation_time, const uint64_t oldest_key_time) { |
| assert((column_family_id == |
| TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) == |
| column_family_name.empty()); |
| return ioptions.table_factory->NewTableBuilder( |
| TableBuilderOptions( |
| ioptions, internal_comparator, int_tbl_prop_collector_factories, |
| compression_type, compression_opts, compression_dict, skip_filters, |
| column_family_name, level, creation_time, oldest_key_time), |
| column_family_id, file); |
| } |
| |
| Status BuildTable( |
| const std::string& dbname, Env* env, const ImmutableCFOptions& ioptions, |
| const MutableCFOptions& mutable_cf_options, const EnvOptions& env_options, |
| TableCache* table_cache, InternalIterator* iter, |
| std::unique_ptr<InternalIterator> range_del_iter, FileMetaData* meta, |
| const InternalKeyComparator& internal_comparator, |
| const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>* |
| int_tbl_prop_collector_factories, |
| uint32_t column_family_id, const std::string& column_family_name, |
| std::vector<SequenceNumber> snapshots, |
| SequenceNumber earliest_write_conflict_snapshot, |
| const CompressionType compression, |
| const CompressionOptions& compression_opts, bool paranoid_file_checks, |
| InternalStats* internal_stats, TableFileCreationReason reason, |
| EventLogger* event_logger, int job_id, const Env::IOPriority io_priority, |
| TableProperties* table_properties, int level, const uint64_t creation_time, |
| const uint64_t oldest_key_time) { |
| assert((column_family_id == |
| TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) == |
| column_family_name.empty()); |
| // Reports the IOStats for flush for every following bytes. |
| const size_t kReportFlushIOStatsEvery = 1048576; |
| Status s; |
| meta->fd.file_size = 0; |
| iter->SeekToFirst(); |
| std::unique_ptr<RangeDelAggregator> range_del_agg( |
| new RangeDelAggregator(internal_comparator, snapshots)); |
| s = range_del_agg->AddTombstones(std::move(range_del_iter)); |
| if (!s.ok()) { |
| // may be non-ok if a range tombstone key is unparsable |
| return s; |
| } |
| |
| std::string fname = TableFileName(ioptions.db_paths, meta->fd.GetNumber(), |
| meta->fd.GetPathId()); |
| #ifndef ROCKSDB_LITE |
| EventHelpers::NotifyTableFileCreationStarted( |
| ioptions.listeners, dbname, column_family_name, fname, job_id, reason); |
| #endif // !ROCKSDB_LITE |
| TableProperties tp; |
| |
| if (iter->Valid() || range_del_agg->ShouldAddTombstones()) { |
| TableBuilder* builder; |
| unique_ptr<WritableFileWriter> file_writer; |
| { |
| unique_ptr<WritableFile> file; |
| #ifndef NDEBUG |
| bool use_direct_writes = env_options.use_direct_writes; |
| TEST_SYNC_POINT_CALLBACK("BuildTable:create_file", &use_direct_writes); |
| #endif // !NDEBUG |
| s = NewWritableFile(env, fname, &file, env_options); |
| if (!s.ok()) { |
| EventHelpers::LogAndNotifyTableFileCreationFinished( |
| event_logger, ioptions.listeners, dbname, column_family_name, fname, |
| job_id, meta->fd, tp, reason, s); |
| return s; |
| } |
| file->SetIOPriority(io_priority); |
| |
| file_writer.reset(new WritableFileWriter(std::move(file), env_options, |
| ioptions.statistics)); |
| builder = NewTableBuilder( |
| ioptions, internal_comparator, int_tbl_prop_collector_factories, |
| column_family_id, column_family_name, file_writer.get(), compression, |
| compression_opts, level, nullptr /* compression_dict */, |
| false /* skip_filters */, creation_time, oldest_key_time); |
| } |
| |
| MergeHelper merge(env, internal_comparator.user_comparator(), |
| ioptions.merge_operator, nullptr, ioptions.info_log, |
| true /* internal key corruption is not ok */, |
| snapshots.empty() ? 0 : snapshots.back()); |
| |
| CompactionIterator c_iter( |
| iter, internal_comparator.user_comparator(), &merge, kMaxSequenceNumber, |
| &snapshots, earliest_write_conflict_snapshot, env, |
| true /* internal key corruption is not ok */, range_del_agg.get()); |
| c_iter.SeekToFirst(); |
| for (; c_iter.Valid(); c_iter.Next()) { |
| const Slice& key = c_iter.key(); |
| const Slice& value = c_iter.value(); |
| builder->Add(key, value); |
| meta->UpdateBoundaries(key, c_iter.ikey().sequence); |
| |
| // TODO(noetzli): Update stats after flush, too. |
| if (io_priority == Env::IO_HIGH && |
| IOSTATS(bytes_written) >= kReportFlushIOStatsEvery) { |
| ThreadStatusUtil::SetThreadOperationProperty( |
| ThreadStatus::FLUSH_BYTES_WRITTEN, IOSTATS(bytes_written)); |
| } |
| } |
| // nullptr for table_{min,max} so all range tombstones will be flushed |
| range_del_agg->AddToBuilder(builder, nullptr /* lower_bound */, |
| nullptr /* upper_bound */, meta); |
| |
| // Finish and check for builder errors |
| bool empty = builder->NumEntries() == 0; |
| s = c_iter.status(); |
| if (!s.ok() || empty) { |
| builder->Abandon(); |
| } else { |
| s = builder->Finish(); |
| } |
| |
| if (s.ok() && !empty) { |
| uint64_t file_size = builder->FileSize(); |
| meta->fd.file_size = file_size; |
| meta->marked_for_compaction = builder->NeedCompact(); |
| assert(meta->fd.GetFileSize() > 0); |
| tp = builder->GetTableProperties(); |
| if (table_properties) { |
| *table_properties = tp; |
| } |
| } |
| delete builder; |
| |
| // Finish and check for file errors |
| if (s.ok() && !empty) { |
| StopWatch sw(env, ioptions.statistics, TABLE_SYNC_MICROS); |
| s = file_writer->Sync(ioptions.use_fsync); |
| } |
| if (s.ok() && !empty) { |
| s = file_writer->Close(); |
| } |
| |
| if (s.ok() && !empty) { |
| // Verify that the table is usable |
| // We set for_compaction to false and don't OptimizeForCompactionTableRead |
| // here because this is a special case after we finish the table building |
| // No matter whether use_direct_io_for_flush_and_compaction is true, |
| // we will regrad this verification as user reads since the goal is |
| // to cache it here for further user reads |
| std::unique_ptr<InternalIterator> it(table_cache->NewIterator( |
| ReadOptions(), env_options, internal_comparator, meta->fd, |
| nullptr /* range_del_agg */, nullptr, |
| (internal_stats == nullptr) ? nullptr |
| : internal_stats->GetFileReadHist(0), |
| false /* for_compaction */, nullptr /* arena */, |
| false /* skip_filter */, level)); |
| s = it->status(); |
| if (s.ok() && paranoid_file_checks) { |
| for (it->SeekToFirst(); it->Valid(); it->Next()) { |
| } |
| s = it->status(); |
| } |
| } |
| } |
| |
| // Check for input iterator errors |
| if (!iter->status().ok()) { |
| s = iter->status(); |
| } |
| |
| if (!s.ok() || meta->fd.GetFileSize() == 0) { |
| env->DeleteFile(fname); |
| } |
| |
| // Output to event logger and fire events. |
| EventHelpers::LogAndNotifyTableFileCreationFinished( |
| event_logger, ioptions.listeners, dbname, column_family_name, fname, |
| job_id, meta->fd, tp, reason, s); |
| |
| return s; |
| } |
| |
| } // namespace rocksdb |