| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| // |
| // A DiskRowSet is a horizontal slice of a Kudu tablet. |
| // Each DiskRowSet contains data for a disjoint set of keys. |
| // See src/kudu/tablet/README for a detailed description. |
| #pragma once |
| |
| #include <atomic> |
| #include <cstddef> |
| #include <cstdint> |
| #include <memory> |
| #include <mutex> |
| #include <string> |
| #include <vector> |
| |
| #include <glog/logging.h> |
| #include <gtest/gtest_prod.h> |
| |
| #include "kudu/common/rowid.h" |
| #include "kudu/common/schema.h" |
| #include "kudu/fs/block_id.h" |
| #include "kudu/fs/fs_manager.h" |
| #include "kudu/gutil/macros.h" |
| #include "kudu/gutil/strings/substitute.h" |
| #include "kudu/tablet/delta_key.h" |
| #include "kudu/tablet/delta_tracker.h" |
| #include "kudu/tablet/rowset.h" |
| #include "kudu/tablet/rowset_metadata.h" |
| #include "kudu/tablet/tablet_mem_trackers.h" |
| #include "kudu/tablet/tablet_metadata.h" |
| #include "kudu/util/bloom_filter.h" |
| #include "kudu/util/faststring.h" |
| #include "kudu/util/locks.h" |
| #include "kudu/util/make_shared.h" |
| #include "kudu/util/status.h" |
| |
| namespace kudu { |
| |
| class MonoTime; |
| class RowBlock; |
| class RowChangeList; |
| class RowwiseIterator; |
| class Timestamp; |
| |
| namespace cfile { |
| class BloomFileWriter; |
| class CFileWriter; |
| } |
| |
| namespace consensus { |
| class OpId; |
| } |
| |
| namespace fs { |
| class BlockCreationTransaction; |
| struct IOContext; |
| } |
| |
| namespace log { |
| class LogAnchorRegistry; |
| } |
| |
| namespace tablet { |
| |
| class CFileSet; |
| class CompactionInput; |
| class DeltaFileWriter; |
| class DeltaStats; |
| class HistoryGcOpts; |
| class MultiColumnWriter; |
| class Mutation; |
| class MvccSnapshot; |
| class OperationResultPB; |
| |
| class DiskRowSetWriter { |
| public: |
| // TODO: document ownership of rowset_metadata |
| DiskRowSetWriter(RowSetMetadata* rowset_metadata, const Schema* schema, |
| BloomFilterSizing bloom_sizing); |
| |
| ~DiskRowSetWriter(); |
| |
| Status Open(); |
| |
| // The block is written to all column writers as well as the bloom filter, |
| // if configured. |
| // Rows must be appended in ascending order. |
| // 'live_row_count' means the number of live rows in this input block. |
| Status AppendBlock(const RowBlock &block, int live_row_count = 0); |
| |
| // Closes the CFiles and their underlying writable blocks. |
| // If no rows were written, returns Status::Aborted(). |
| Status Finish(); |
| |
| // Closes the CFiles, finalizing the underlying blocks and releasing |
| // them to 'transaction'. If no rows were written, returns Status::Aborted(). |
| Status FinishAndReleaseBlocks(fs::BlockCreationTransaction* transaction); |
| |
| // The base DiskRowSetWriter never rolls. This method is necessary for tests |
| // which are templatized on the writer type. |
| Status RollIfNecessary() { return Status::OK(); } |
| |
| rowid_t written_count() const { |
| CHECK(finished_); |
| return written_count_; |
| } |
| |
| // Return the total number of bytes written so far to this DiskRowSet. |
| // Additional bytes may be written by "Finish()", but this should provide |
| // a reasonable estimate for the total data size. |
| size_t written_size() const; |
| |
| const Schema& schema() const { return *schema_; } |
| |
| private: |
| DISALLOW_COPY_AND_ASSIGN(DiskRowSetWriter); |
| |
| Status InitBloomFileWriter(); |
| |
| // Initializes the index writer required for compound keys |
| // this index is written to a new file instead of embedded in the col_* files |
| Status InitAdHocIndexWriter(); |
| |
| // Return the cfile::Writer responsible for writing the key index. |
| // (the ad-hoc writer for composite keys, otherwise the key column writer) |
| cfile::CFileWriter *key_index_writer(); |
| |
| RowSetMetadata* rowset_metadata_; |
| const Schema* const schema_; |
| |
| BloomFilterSizing bloom_sizing_; |
| |
| bool finished_; |
| rowid_t written_count_; |
| std::unique_ptr<MultiColumnWriter> col_writer_; |
| std::unique_ptr<cfile::BloomFileWriter> bloom_writer_; |
| std::unique_ptr<cfile::CFileWriter> ad_hoc_index_writer_; |
| |
| // The last encoded key written. |
| faststring last_encoded_key_; |
| }; |
| |
| |
| // Wrapper around DiskRowSetWriter which "rolls" to a new DiskRowSet after |
| // a certain amount of data has been written. Each output rowset is suffixed |
| // with ".N" where N starts at 0 and increases as new rowsets are generated. |
| // |
| // See AppendBlock(...) for important usage information. |
| class RollingDiskRowSetWriter { |
| public: |
| // Create a new rolling writer. The given 'tablet_metadata' must stay valid |
| // for the lifetime of this writer, and is used to construct the new rowsets |
| // that this RollingDiskRowSetWriter creates. |
| RollingDiskRowSetWriter(TabletMetadata* tablet_metadata, const Schema& schema, |
| BloomFilterSizing bloom_sizing, |
| size_t target_rowset_size); |
| ~RollingDiskRowSetWriter(); |
| |
| Status Open(); |
| |
| // The block is written to all column writers as well as the bloom filter, |
| // if configured. |
| // Rows must be appended in ascending order. |
| // |
| // NOTE: data must be appended in a particular order: for each set of rows |
| // you must append deltas using the APIs below *before* appending the block |
| // of rows that they correspond to. This ensures that the output delta files |
| // and data files are aligned. |
| // 'live_row_count' means the number of live rows in this input block. |
| Status AppendBlock(const RowBlock &block, int live_row_count = 0); |
| |
| // Appends a sequence of REDO deltas for the same row to the current redo |
| // delta file. 'row_idx_in_block' is the positional index after the last |
| // written block. The 'row_idx_in_drs' out parameter will be set with the row |
| // index from the start of the DiskRowSet currently being written. |
| Status AppendRedoDeltas(rowid_t row_idx_in_block, |
| Mutation* redo_delta_head, |
| rowid_t* row_idx_in_drs); |
| |
| // Appends a sequence of UNDO deltas for the same row to the current undo |
| // delta file. 'row_idx_in_block' is the positional index after the last |
| // written block. The 'row_idx_in_drs' out parameter will be set with the row |
| // index from the start of the DiskRowSet currently being written. |
| Status AppendUndoDeltas(rowid_t row_idx_in_block, |
| Mutation* undo_delta_head, |
| rowid_t* row_idx_in_drs); |
| |
| // Try to roll the output, if we've passed the configured threshold. This will |
| // only roll if called immediately after an AppendBlock() call. The implementation |
| // of AppendBlock() doesn't call it automatically, because it doesn't know if there |
| // is any more data to be appended. It is safe to call this in other circumstances -- |
| // it will be ignored if it is not a good time to roll. |
| Status RollIfNecessary(); |
| |
| Status Finish(); |
| |
| int64_t rows_written_count() const { return written_count_; } |
| |
| const Schema &schema() const { return schema_; } |
| |
| // Return the set of rowset paths that were written by this writer. |
| // This must only be called after Finish() returns an OK result. |
| void GetWrittenRowSetMetadata(RowSetMetadataVector* metas) const; |
| |
| uint64_t written_size() const { return written_size_; } |
| |
| int64_t drs_written_count() const { return written_drs_metas_.size(); } |
| |
| private: |
| Status RollWriter(); |
| |
| // Close the current DRS and delta writers, releasing their finished blocks |
| // into block_closer_. |
| Status FinishCurrentWriter(); |
| |
| template<DeltaType Type> |
| Status AppendDeltas(rowid_t row_idx_in_block, |
| Mutation* delta_head, |
| rowid_t* row_idx, |
| DeltaFileWriter* writer, |
| DeltaStats* delta_stats); |
| |
| enum State { |
| kInitialized, |
| kStarted, |
| kFinished |
| }; |
| State state_; |
| |
| TabletMetadata* tablet_metadata_; |
| const Schema schema_; |
| std::shared_ptr<RowSetMetadata> cur_drs_metadata_; |
| const BloomFilterSizing bloom_sizing_; |
| const size_t target_rowset_size_; |
| |
| std::unique_ptr<DiskRowSetWriter> cur_writer_; |
| |
| // A delta writer to store the undos for each DRS |
| std::unique_ptr<DeltaFileWriter> cur_undo_writer_; |
| std::unique_ptr<DeltaStats> cur_undo_delta_stats_; |
| // a delta writer to store the redos for each DRS |
| std::unique_ptr<DeltaFileWriter> cur_redo_writer_; |
| std::unique_ptr<DeltaStats> cur_redo_delta_stats_; |
| BlockId cur_undo_ds_block_id_; |
| BlockId cur_redo_ds_block_id_; |
| |
| uint64_t row_idx_in_cur_drs_; |
| |
| // True when we are allowed to roll. We can only roll when the delta writers |
| // and data writers are aligned (i.e. just after we've appended a new block of data). |
| bool can_roll_; |
| |
| // RowSetMetadata objects for diskrowsets which have been successfully |
| // written out. |
| RowSetMetadataVector written_drs_metas_; |
| |
| int64_t written_count_; |
| uint64_t written_size_; |
| |
| // Syncs and commits all writes of outstanding blocks when the rolling |
| // writer is destroyed. |
| std::unique_ptr<fs::BlockCreationTransaction> block_transaction_; |
| |
| DISALLOW_COPY_AND_ASSIGN(RollingDiskRowSetWriter); |
| }; |
| |
// A rowset's disk-space-occupying components are as follows:
// - cfile set
//   - base data
//   - bloom file
//   - ad hoc index
// - delta files
//   - UNDO deltas
//   - REDO deltas
// This struct is a container for the sizes of these components.
struct DiskRowSetSpace {
  uint64_t base_data_size;
  uint64_t bloom_size;
  uint64_t ad_hoc_index_size;
  uint64_t redo_deltas_size;
  uint64_t undo_deltas_size;

  // Helper method to compute the size of the diskrowset's underlying cfile
  // set: the sum of the base data, bloom file, and ad hoc index sizes. Delta
  // file sizes are not included.
  //
  // Declared const because it only reads the fields, so it can be called on a
  // const DiskRowSetSpace or through a const reference.
  uint64_t CFileSetOnDiskSize() const {
    return base_data_size + bloom_size + ad_hoc_index_size;
  }
};
| |
| //////////////////////////////////////////////////////////// |
| // DiskRowSet |
| //////////////////////////////////////////////////////////// |
| |
| class MajorDeltaCompaction; |
| |
| class DiskRowSet : |
| public RowSet, |
| public enable_make_shared<DiskRowSet> { |
| public: |
| static const char *kMinKeyMetaEntryName; |
| static const char *kMaxKeyMetaEntryName; |
| |
| // Open a rowset from disk. |
| // If successful, sets *rowset to the newly open rowset |
| static Status Open(const std::shared_ptr<RowSetMetadata>& rowset_metadata, |
| log::LogAnchorRegistry* log_anchor_registry, |
| const TabletMemTrackers& mem_trackers, |
| const fs::IOContext* io_context, |
| std::shared_ptr<DiskRowSet> *rowset); |
| |
| //////////////////////////////////////////////////////////// |
| // "Management" functions |
| //////////////////////////////////////////////////////////// |
| |
| // Flush all accumulated delta data to disk. |
| Status FlushDeltas(const fs::IOContext* io_context) override; |
| |
| // Perform delta store minor compaction. |
| // This compacts the delta files down to a single one. |
| // If there is already only a single delta file, this does nothing. |
| Status MinorCompactDeltaStores(const fs::IOContext* io_context) override; |
| |
| //////////////////////////////////////////////////////////// |
| // RowSet implementation |
| //////////////////////////////////////////////////////////// |
| |
| //////////////////// |
| // Updates |
| //////////////////// |
| |
| // Update the given row. |
| // 'key' should be the key portion of the row -- i.e a contiguous |
| // encoding of the key columns. |
| Status MutateRow(Timestamp timestamp, |
| const RowSetKeyProbe &probe, |
| const RowChangeList &update, |
| const consensus::OpId& op_id, |
| const fs::IOContext* io_context, |
| ProbeStats* stats, |
| OperationResultPB* result) override; |
| |
| Status CheckRowPresent(const RowSetKeyProbe &probe, const fs::IOContext* io_context, |
| bool *present, ProbeStats* stats) const override; |
| |
| //////////////////// |
| // Read functions. |
| //////////////////// |
| virtual Status NewRowIterator(const RowIteratorOptions& opts, |
| std::unique_ptr<RowwiseIterator>* out) const override; |
| |
| virtual Status NewCompactionInput(const Schema* projection, |
| const MvccSnapshot &snap, |
| const fs::IOContext* io_context, |
| std::unique_ptr<CompactionInput>* out) const override; |
| |
| // Gets the number of rows in this rowset, checking 'num_rows_' first. If not |
| // yet set, consults the base data and stores the result in 'num_rows_'. |
| Status CountRows(const fs::IOContext* io_context, rowid_t *count) const final override; |
| |
| // Count the number of live rows in this DRS. |
| virtual Status CountLiveRows(uint64_t* count) const override; |
| |
| // See RowSet::GetBounds(...) |
| virtual Status GetBounds(std::string* min_encoded_key, |
| std::string* max_encoded_key) const override; |
| |
| void GetDiskRowSetSpaceUsage(DiskRowSetSpace* drss) const; |
| |
| uint64_t OnDiskSize() const override; |
| |
| uint64_t OnDiskBaseDataSize() const override; |
| |
| uint64_t OnDiskBaseDataColumnSize(const ColumnId& col_id) const override; |
| |
| uint64_t OnDiskBaseDataSizeWithRedos() const override; |
| |
| size_t DeltaMemStoreSize() const override; |
| |
| bool DeltaMemStoreInfo(size_t* size_bytes, MonoTime* creation_time) const override; |
| |
| bool DeltaMemStoreEmpty() const override; |
| |
| int64_t MinUnflushedLogIndex() const override; |
| |
| size_t CountDeltaStores() const; |
| |
| double DeltaStoresCompactionPerfImprovementScore(DeltaCompactionType type) const override; |
| |
| Status EstimateBytesInPotentiallyAncientUndoDeltas(Timestamp ancient_history_mark, |
| int64_t* bytes) override; |
| |
| Status IsDeletedAndFullyAncient(Timestamp ancient_history_mark, |
| bool* deleted_and_ancient) override; |
| |
| Status InitUndoDeltas(Timestamp ancient_history_mark, |
| MonoTime deadline, |
| const fs::IOContext* io_context, |
| int64_t* delta_blocks_initialized, |
| int64_t* bytes_in_ancient_undos) override; |
| |
| Status DeleteAncientUndoDeltas(Timestamp ancient_history_mark, const fs::IOContext* io_context, |
| int64_t* blocks_deleted, int64_t* bytes_deleted) override; |
| |
| // Major compacts all the delta files for all the columns. |
| Status MajorCompactDeltaStores(const fs::IOContext* io_context, HistoryGcOpts history_gc_opts); |
| |
| std::mutex *compact_flush_lock() override { |
| return &compact_flush_lock_; |
| } |
| |
| bool has_been_compacted() const override { |
| return has_been_compacted_.load(); |
| } |
| |
| void set_has_been_compacted() override { |
| has_been_compacted_.store(true); |
| } |
| |
| DeltaTracker *delta_tracker() { |
| return DCHECK_NOTNULL(delta_tracker_.get()); |
| } |
| |
| std::shared_ptr<RowSetMetadata> metadata() override { |
| return rowset_metadata_; |
| } |
| |
| std::string ToString() const override { |
| return rowset_metadata_->ToString(); |
| } |
| |
| std::string LogPrefix() const { |
| return strings::Substitute("T $0 P $1: $2: ", |
| rowset_metadata_->tablet_metadata()->tablet_id(), |
| rowset_metadata_->fs_manager()->uuid(), |
| ToString()); |
| } |
| |
| virtual Status DebugDump(std::vector<std::string> *lines) override; |
| |
| protected: |
| DiskRowSet(std::shared_ptr<RowSetMetadata> rowset_metadata, |
| log::LogAnchorRegistry* log_anchor_registry, |
| TabletMemTrackers mem_trackers); |
| |
| private: |
| FRIEND_TEST(TabletHistoryGcTest, TestMajorDeltaCompactionOnSubsetOfColumns); |
| FRIEND_TEST(TestCompaction, TestOneToOne); |
| FRIEND_TEST(TestRowSet, TestRowSetUpdate); |
| FRIEND_TEST(TestRowSet, TestDMSFlush); |
| |
| friend class CompactionInput; |
| friend class Tablet; |
| |
| Status Open(const fs::IOContext* io_context); |
| |
| // Create a new major delta compaction object to compact the specified columns. |
| Status NewMajorDeltaCompaction(const std::vector<ColumnId>& col_ids, |
| HistoryGcOpts history_gc_opts, |
| const fs::IOContext* io_context, |
| std::unique_ptr<MajorDeltaCompaction>* out) const; |
| |
| // Major compacts all the delta files for the specified columns. |
| Status MajorCompactDeltaStoresWithColumnIds(const std::vector<ColumnId>& col_ids, |
| const fs::IOContext* io_context, |
| HistoryGcOpts history_gc_opts); |
| |
| std::shared_ptr<RowSetMetadata> rowset_metadata_; |
| |
| bool open_; |
| |
| log::LogAnchorRegistry* log_anchor_registry_; |
| |
| TabletMemTrackers mem_trackers_; |
| |
| // Base data for this rowset. |
| mutable rw_spinlock component_lock_; |
| std::shared_ptr<CFileSet> base_data_; |
| std::unique_ptr<DeltaTracker> delta_tracker_; |
| |
| // Number of rows in the rowset. This may be unset (-1) if the rows in the |
| // underlying cfile set have not been counted yet. |
| mutable std::atomic<int64_t> num_rows_; |
| |
| // Lock governing this rowset's inclusion in a compact/flush. If locked, |
| // no other compactor will attempt to include this rowset. |
| std::mutex compact_flush_lock_; |
| |
| // Flag indicating whether the rowset has been removed from a rowset tree, |
| // and thus should not be scheduled for further compactions. |
| std::atomic<bool> has_been_compacted_; |
| |
| DISALLOW_COPY_AND_ASSIGN(DiskRowSet); |
| }; |
| |
| } // namespace tablet |
| } // namespace kudu |