| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "kudu/tablet/delta_compaction.h" |
| |
| #include <cstdint> |
| #include <map> |
| #include <ostream> |
| #include <string> |
| #include <utility> |
| #include <vector> |
| |
| #include <glog/logging.h> |
| |
| #include "kudu/common/generic_iterators.h" |
| #include "kudu/common/iterator.h" |
| #include "kudu/common/row.h" |
| #include "kudu/common/row_changelist.h" |
| #include "kudu/common/rowblock.h" |
| #include "kudu/common/rowblock_memory.h" |
| #include "kudu/common/rowid.h" |
| #include "kudu/common/scan_spec.h" |
| #include "kudu/fs/block_manager.h" |
| #include "kudu/fs/fs_manager.h" |
| #include "kudu/gutil/casts.h" |
| #include "kudu/gutil/map-util.h" |
| #include "kudu/gutil/strings/join.h" |
| #include "kudu/gutil/strings/substitute.h" |
| #include "kudu/tablet/cfile_set.h" |
| #include "kudu/tablet/compaction.h" |
| #include "kudu/tablet/delta_key.h" |
| #include "kudu/tablet/delta_stats.h" |
| #include "kudu/tablet/delta_tracker.h" |
| #include "kudu/tablet/deltafile.h" |
| #include "kudu/tablet/multi_column_writer.h" |
| #include "kudu/tablet/mutation.h" |
| #include "kudu/tablet/mvcc.h" |
| #include "kudu/tablet/rowset_metadata.h" |
| #include "kudu/util/memory/arena.h" |
| #include "kudu/util/trace.h" |
| |
| using kudu::fs::BlockCreationTransaction; |
| using kudu::fs::BlockManager; |
| using kudu::fs::CreateBlockOptions; |
| using kudu::fs::IOContext; |
| using kudu::fs::WritableBlock; |
| using std::shared_ptr; |
| using std::string; |
| using std::unique_ptr; |
| using std::vector; |
| using strings::Substitute; |
| |
| namespace kudu { |
| namespace tablet { |
namespace {

// Number of rows processed per block of columns during the compaction scan.
// constexpr: this is a compile-time constant, not a runtime-initialized const.
constexpr size_t kRowsPerBlock = 100;

} // anonymous namespace
| |
| // TODO: can you major-delta-compact a new column after an alter table in order |
| // to materialize it? should write a test for this. |
// Constructs a major delta compaction over 'base_data' for the columns in
// 'col_ids', consuming the REDO deltas exposed by 'delta_iter' (which was
// built from 'included_stores').
//
// Ownership: 'fs_manager' and 'base_data' are borrowed raw pointers and must
// outlive this object; 'delta_iter', 'included_stores', 'col_ids',
// 'history_gc_opts' and 'tablet_id' are taken by move.
MajorDeltaCompaction::MajorDeltaCompaction(
    FsManager* fs_manager, const Schema& base_schema, CFileSet* base_data,
    unique_ptr<DeltaIterator> delta_iter,
    vector<shared_ptr<DeltaStore> > included_stores,
    vector<ColumnId> col_ids,
    HistoryGcOpts history_gc_opts,
    string tablet_id)
  : fs_manager_(fs_manager),
    base_schema_(base_schema),
    column_ids_(std::move(col_ids)),
    history_gc_opts_(std::move(history_gc_opts)),
    base_data_(base_data),
    included_stores_(std::move(included_stores)),
    delta_iter_(std::move(delta_iter)),
    tablet_id_(std::move(tablet_id)),
    redo_delta_mutations_written_(0),
    undo_delta_mutations_written_(0),
    state_(kInitialized) {
  // A compaction over zero columns is meaningless; fail fast.
  CHECK(!column_ids_.empty());
}
| |
| MajorDeltaCompaction::~MajorDeltaCompaction() { |
| } |
| |
| string MajorDeltaCompaction::ColumnNamesToString() const { |
| vector<string> col_names; |
| col_names.reserve(column_ids_.size()); |
| for (ColumnId col_id : column_ids_) { |
| int col_idx = base_schema_.find_column_by_id(col_id); |
| if (col_idx != Schema::kColumnNotFound) { |
| col_names.push_back(base_schema_.column_by_id(col_id).ToString()); |
| } else { |
| col_names.push_back(Substitute("[deleted column id $0]", col_id)); |
| } |
| } |
| return JoinStrings(col_names, ", "); |
| } |
| |
| Status MajorDeltaCompaction::FlushRowSetAndDeltas(const IOContext* io_context) { |
| CHECK_EQ(state_, kInitialized); |
| |
| unique_ptr<ColumnwiseIterator> old_base_data_cwise(base_data_->NewIterator(&partial_schema_, |
| io_context)); |
| unique_ptr<RowwiseIterator> old_base_data_rwise( |
| NewMaterializingIterator(std::move(old_base_data_cwise))); |
| |
| ScanSpec spec; |
| spec.set_cache_blocks(false); |
| RETURN_NOT_OK_PREPEND( |
| old_base_data_rwise->Init(&spec), |
| "Unable to open iterator for specified columns (" + partial_schema_.ToString() + ")"); |
| |
| RETURN_NOT_OK(delta_iter_->Init(&spec)); |
| RETURN_NOT_OK(delta_iter_->SeekToOrdinal(0)); |
| |
| RowBlockMemory mem(32 * 1024); |
| RowBlock block(&partial_schema_, kRowsPerBlock, &mem); |
| |
| DVLOG(1) << "Applying deltas and rewriting columns (" << partial_schema_.ToString() << ")"; |
| unique_ptr<DeltaStats> redo_stats(new DeltaStats); |
| unique_ptr<DeltaStats> undo_stats(new DeltaStats); |
| size_t nrows = 0; |
| // We know that we're reading everything from disk so we're including all transactions. |
| MvccSnapshot snap = MvccSnapshot::CreateSnapshotIncludingAllOps(); |
| while (old_base_data_rwise->HasNext()) { |
| |
| // 1) Get the next batch of base data for the columns we're compacting. |
| mem.Reset(); |
| RETURN_NOT_OK(old_base_data_rwise->NextBlock(&block)); |
| size_t n = block.nrows(); |
| |
| // 2) Fetch all the REDO mutations. |
| vector<Mutation *> redo_mutation_block(kRowsPerBlock, static_cast<Mutation *>(nullptr)); |
| RETURN_NOT_OK(delta_iter_->PrepareBatch(n, DeltaIterator::PREPARE_FOR_COLLECT)); |
| RETURN_NOT_OK(delta_iter_->CollectMutations(&redo_mutation_block, block.arena())); |
| |
| // 3) Write new UNDO mutations for the current block. The REDO mutations |
| // are written out in step 6. |
| vector<CompactionInputRow> input_rows; |
| input_rows.resize(block.nrows()); |
| for (int i = 0; i < block.nrows(); i++) { |
| CompactionInputRow* input_row = &input_rows[i]; |
| input_row->row.Reset(&block, i); |
| input_row->redo_head = redo_mutation_block[i]; |
| Mutation::ReverseMutationList(&input_row->redo_head); |
| input_row->undo_head = nullptr; |
| |
| RowBlockRow dst_row = block.row(i); |
| RETURN_NOT_OK(CopyRow(input_row->row, &dst_row, static_cast<Arena*>(nullptr))); |
| |
| Mutation* new_undos_head = nullptr; |
| // We're ignoring the result from new_redos_head because we'll find them |
| // later at step 5. |
| Mutation* new_redos_head = nullptr; |
| |
| // Since this is a delta compaction the input and output row id's are the same. |
| rowid_t row_id = nrows + input_row->row.row_index(); |
| |
| DVLOG(3) << "MDC Input Row - RowId: " << row_id << " " |
| << CompactionInputRowToString(*input_row); |
| |
| // NOTE: This is presently ignored. |
| bool is_garbage_collected; |
| |
| RETURN_NOT_OK(ApplyMutationsAndGenerateUndos( |
| snap, *input_row, &new_undos_head, &new_redos_head, &mem.arena, &dst_row)); |
| |
| RemoveAncientUndos(history_gc_opts_, |
| &new_undos_head, |
| new_redos_head, |
| &is_garbage_collected); |
| |
| DVLOG(3) << "MDC Output Row - RowId: " << row_id << " " |
| << RowToString(dst_row, new_undos_head, new_redos_head); |
| |
| // We only create a new undo delta file if we need to. |
| if (new_undos_head != nullptr && !new_undo_delta_writer_) { |
| RETURN_NOT_OK(OpenUndoDeltaFileWriter()); |
| } |
| for (const Mutation *mut = new_undos_head; mut != nullptr; mut = mut->next()) { |
| DeltaKey undo_key(nrows + dst_row.row_index(), mut->timestamp()); |
| RETURN_NOT_OK(new_undo_delta_writer_->AppendDelta<UNDO>(undo_key, mut->changelist())); |
| undo_stats->UpdateStats(mut->timestamp(), mut->changelist()); |
| undo_delta_mutations_written_++; |
| } |
| } |
| |
| // 4) Write the new base data. |
| RETURN_NOT_OK(base_data_writer_->AppendBlock(block)); |
| |
| // 5) Remove the columns that we've done our major REDO delta compaction on |
| // from this delta flush, except keep all the delete and reinsert |
| // mutations. |
| mem.Reset(); |
| vector<DeltaKeyAndUpdate> out; |
| RETURN_NOT_OK(delta_iter_->FilterColumnIdsAndCollectDeltas(column_ids_, &out, &mem.arena)); |
| |
| // We only create a new redo delta file if we need to. |
| if (!out.empty() && !new_redo_delta_writer_) { |
| RETURN_NOT_OK(OpenRedoDeltaFileWriter()); |
| } |
| |
| // 6) Write the remaining REDO deltas that we haven't compacted away back |
| // into a REDO delta file. |
| for (const DeltaKeyAndUpdate& key_and_update : out) { |
| RowChangeList update(key_and_update.cell); |
| DVLOG(4) << "Keeping delta as REDO: " |
| << key_and_update.Stringify(DeltaType::REDO, base_schema_); |
| RETURN_NOT_OK_PREPEND(new_redo_delta_writer_->AppendDelta<REDO>(key_and_update.key, update), |
| "Failed to append a delta"); |
| WARN_NOT_OK(redo_stats->UpdateStats(key_and_update.key.timestamp(), update), |
| "Failed to update stats"); |
| } |
| redo_delta_mutations_written_ += out.size(); |
| nrows += n; |
| } |
| |
| BlockManager* bm = fs_manager_->block_manager(); |
| unique_ptr<BlockCreationTransaction> transaction = bm->NewCreationTransaction(); |
| RETURN_NOT_OK(base_data_writer_->FinishAndReleaseBlocks(transaction.get())); |
| |
| if (redo_delta_mutations_written_ > 0) { |
| new_redo_delta_writer_->WriteDeltaStats(std::move(redo_stats)); |
| RETURN_NOT_OK(new_redo_delta_writer_->FinishAndReleaseBlock(transaction.get())); |
| } |
| |
| if (undo_delta_mutations_written_ > 0) { |
| new_undo_delta_writer_->WriteDeltaStats(std::move(undo_stats)); |
| RETURN_NOT_OK(new_undo_delta_writer_->FinishAndReleaseBlock(transaction.get())); |
| } |
| transaction->CommitCreatedBlocks(); |
| |
| DVLOG(1) << "Applied all outstanding deltas for columns " |
| << partial_schema_.ToString() |
| << ", and flushed the resulting rowsets and a total of " |
| << redo_delta_mutations_written_ |
| << " REDO delta mutations and " |
| << undo_delta_mutations_written_ |
| << " UNDO delta mutations to disk."; |
| |
| state_ = kFinished; |
| return Status::OK(); |
| } |
| |
| Status MajorDeltaCompaction::OpenBaseDataWriter() { |
| CHECK(!base_data_writer_); |
| |
| unique_ptr<MultiColumnWriter> w(new MultiColumnWriter(fs_manager_, |
| &partial_schema_, |
| tablet_id_)); |
| RETURN_NOT_OK(w->Open()); |
| base_data_writer_ = std::move(w); |
| return Status::OK(); |
| } |
| |
| Status MajorDeltaCompaction::OpenRedoDeltaFileWriter() { |
| unique_ptr<WritableBlock> block; |
| CreateBlockOptions opts({ tablet_id_ }); |
| RETURN_NOT_OK_PREPEND(fs_manager_->CreateNewBlock(opts, &block), |
| "Unable to create REDO delta output block"); |
| new_redo_delta_block_ = block->id(); |
| new_redo_delta_writer_.reset(new DeltaFileWriter(std::move(block))); |
| return new_redo_delta_writer_->Start(); |
| } |
| |
| Status MajorDeltaCompaction::OpenUndoDeltaFileWriter() { |
| unique_ptr<WritableBlock> block; |
| CreateBlockOptions opts({ tablet_id_ }); |
| RETURN_NOT_OK_PREPEND(fs_manager_->CreateNewBlock(opts, &block), |
| "Unable to create UNDO delta output block"); |
| new_undo_delta_block_ = block->id(); |
| new_undo_delta_writer_.reset(new DeltaFileWriter(std::move(block))); |
| return new_undo_delta_writer_->Start(); |
| } |
| |
| namespace { |
| struct DeltaStoreStats { |
| int64_t delete_count; |
| int64_t reinsert_count; |
| int64_t update_count; |
| |
| public: |
| string ToString() const { |
| return Substitute("delete_count=$0, reinsert_count=$1, update_count=$2", |
| delete_count, reinsert_count, update_count); |
| } |
| }; |
| |
| DeltaStoreStats ComputeDeltaStoreStats(const SharedDeltaStoreVector& stores) { |
| int64_t delete_count = 0; |
| int64_t reinsert_count = 0; |
| int64_t update_count = 0; |
| for (const auto& store : stores) { |
| if (!store->Initted()) { |
| continue; |
| } |
| const auto& stats = store->delta_stats(); |
| delete_count += stats.delete_count(); |
| reinsert_count += stats.reinsert_count(); |
| update_count += stats.UpdateCount(); |
| } |
| return DeltaStoreStats{delete_count, reinsert_count, update_count}; |
| } |
| } // anonymous namespace |
| |
| Status MajorDeltaCompaction::Compact(const IOContext* io_context) { |
| CHECK_EQ(state_, kInitialized); |
| |
| VLOG(1) << "Starting major delta compaction for columns " << ColumnNamesToString(); |
| RETURN_NOT_OK(base_schema_.CreateProjectionByIdsIgnoreMissing(column_ids_, &partial_schema_)); |
| |
| if (VLOG_IS_ON(1)) { |
| for (const auto& ds : included_stores_) { |
| VLOG(1) << "Preparing to major compact delta file: " << ds->ToString(); |
| } |
| } |
| |
| // We defer calling OpenRedoDeltaFileWriter() since we might not need to flush. |
| RETURN_NOT_OK(OpenBaseDataWriter()); |
| RETURN_NOT_OK(FlushRowSetAndDeltas(io_context)); |
| |
| TRACE_COUNTER_INCREMENT("delta_blocks_compacted", included_stores_.size()); |
| const auto stats = ComputeDeltaStoreStats(included_stores_); |
| TRACE_COUNTER_INCREMENT("delete_count", stats.delete_count); |
| TRACE_COUNTER_INCREMENT("reinsert_count", stats.reinsert_count); |
| TRACE_COUNTER_INCREMENT("update_count", stats.update_count); |
| VLOG(1) << Substitute("Finished major delta compaction of columns $0. " |
| "Compacted $1 delta files. Overall stats: $2", |
| ColumnNamesToString(), |
| included_stores_.size(), |
| stats.ToString()); |
| return Status::OK(); |
| } |
| |
// Populates 'update' with the metadata changes resulting from this
// compaction: the compacted REDO delta blocks are replaced by the new REDO
// block (if any), the new UNDO block is registered (if any), and the base
// data column blocks are swapped for the rewritten ones. Must only be called
// after Compact() succeeded (state_ == kFinished); does not mutate any
// in-memory rowset state itself.
void MajorDeltaCompaction::CreateMetadataUpdate(
    RowSetMetadataUpdate* update) {
  CHECK(update);
  CHECK_EQ(state_, kFinished);

  // All input stores were file-backed REDO deltas; collect their block ids so
  // they can be dropped from the metadata.
  vector<BlockId> compacted_delta_blocks;
  for (const shared_ptr<DeltaStore>& store : included_stores_) {
    DeltaFileReader* dfr = down_cast<DeltaFileReader*>(store.get());
    compacted_delta_blocks.push_back(dfr->block_id());
  }

  // Empty if every REDO delta was compacted away.
  vector<BlockId> new_delta_blocks;
  if (redo_delta_mutations_written_ > 0) {
    new_delta_blocks.push_back(new_redo_delta_block_);
  }

  update->ReplaceRedoDeltaBlocks(compacted_delta_blocks,
                                 new_delta_blocks);

  if (undo_delta_mutations_written_ > 0) {
    update->SetNewUndoBlock(new_undo_delta_block_);
  }

  // Replace old column blocks with new ones
  std::map<ColumnId, BlockId> new_column_blocks;
  base_data_writer_->GetFlushedBlocksByColumnId(&new_column_blocks);

  // NOTE: in the case that one of the columns being compacted is deleted,
  // we may have fewer elements in new_column_blocks compared to 'column_ids'.
  // For those deleted columns, we just remove the old column data.
  CHECK_LE(new_column_blocks.size(), column_ids_.size());

  for (ColumnId col_id : column_ids_) {
    BlockId new_block;
    if (FindCopy(new_column_blocks, col_id, &new_block)) {
      update->ReplaceColumnId(col_id, new_block);
    } else {
      // The column has been deleted.
      // If the base data has a block for this column, we need to remove it.
      // NOTE: It's possible that the base data has no data for this column in the
      // case that the column was added and removed in succession after the base
      // data was flushed.
      CHECK_EQ(base_schema_.find_column_by_id(col_id), Schema::kColumnNotFound)
          << "major compaction removing column " << col_id << " but still present in Schema!";
      if (base_data_->has_data_for_column_id(col_id)) {
        update->RemoveColumnId(col_id);
      }
    }
  }
}
| |
| // We're called under diskrowset's component_lock_ and delta_tracker's compact_flush_lock_ |
| // so both AtomicUpdateStores calls can be done separately and still be seen as one atomic |
| // operation. |
| Status MajorDeltaCompaction::UpdateDeltaTracker(DeltaTracker* tracker, |
| const IOContext* io_context) { |
| CHECK_EQ(state_, kFinished); |
| |
| // 1. Get all the necessary I/O out of the way. It's OK to fail here |
| // because we haven't updated any in-memory state. |
| // |
| // TODO(awong): pull the OpenDeltaReaders() calls out of the critical path of |
| // diskrowset's component_lock_. They touch disk and may block other |
| // diskrowset operations. |
| |
| // Create blocks for the new redo deltas. |
| vector<DeltaBlockIdAndStats> new_redo_blocks; |
| if (redo_delta_mutations_written_ > 0) { |
| new_redo_blocks.emplace_back(std::make_pair(new_redo_delta_block_, |
| new_redo_delta_writer_->release_delta_stats())); |
| } |
| SharedDeltaStoreVector new_redo_stores; |
| RETURN_NOT_OK(tracker->OpenDeltaReaders(std::move(new_redo_blocks), io_context, |
| &new_redo_stores, REDO)); |
| |
| // Create blocks for the new undo deltas. |
| SharedDeltaStoreVector new_undo_stores; |
| if (undo_delta_mutations_written_ > 0) { |
| vector<DeltaBlockIdAndStats> new_undo_blocks; |
| new_undo_blocks.emplace_back(std::make_pair(new_undo_delta_block_, |
| new_undo_delta_writer_->release_delta_stats())); |
| RETURN_NOT_OK(tracker->OpenDeltaReaders(std::move(new_undo_blocks), io_context, |
| &new_undo_stores, UNDO)); |
| } |
| |
| // 2. Only now that we cannot fail do we update the in-memory state. |
| |
| // Even if we didn't create any new redo blocks, we still need to update the |
| // tracker so it removes the included_stores_. |
| tracker->AtomicUpdateStores(included_stores_, new_redo_stores, io_context, REDO); |
| |
| // We only call AtomicUpdateStores() for UNDOs if we wrote UNDOs. We're not |
| // removing stores so we don't need to call it otherwise. |
| if (!new_undo_stores.empty()) { |
| tracker->AtomicUpdateStores({}, new_undo_stores, io_context, UNDO); |
| } |
| return Status::OK(); |
| } |
| |
| } // namespace tablet |
| } // namespace kudu |