| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "kudu/tablet/rowset_metadata.h" |
| |
| #include <algorithm> |
| #include <mutex> |
| #include <ostream> |
| #include <string> |
| #include <unordered_set> |
| #include <vector> |
| |
| #include <glog/stl_logging.h> |
| |
| #include "kudu/gutil/strings/substitute.h" |
| #include "kudu/gutil/map-util.h" |
| #include "kudu/tablet/metadata.pb.h" |
| |
| using std::unique_ptr; |
| using std::vector; |
| using strings::Substitute; |
| |
| namespace kudu { |
| namespace tablet { |
| |
| // ============================================================================ |
| // RowSet Metadata |
| // ============================================================================ |
| Status RowSetMetadata::Load(TabletMetadata* tablet_metadata, |
| const RowSetDataPB& pb, |
| unique_ptr<RowSetMetadata>* metadata) { |
| unique_ptr<RowSetMetadata> ret(new RowSetMetadata(tablet_metadata)); |
| RETURN_NOT_OK(ret->InitFromPB(pb)); |
| *metadata = std::move(ret); |
| return Status::OK(); |
| } |
| |
| Status RowSetMetadata::CreateNew(TabletMetadata* tablet_metadata, |
| int64_t id, |
| unique_ptr<RowSetMetadata>* metadata) { |
| metadata->reset(new RowSetMetadata(tablet_metadata, id)); |
| return Status::OK(); |
| } |
| |
| void RowSetMetadata::AddOrphanedBlocks(const BlockIdContainer& blocks) { |
| tablet_metadata_->AddOrphanedBlocks(blocks); |
| } |
| |
| Status RowSetMetadata::Flush() { |
| return tablet_metadata_->Flush(); |
| } |
| |
| Status RowSetMetadata::InitFromPB(const RowSetDataPB& pb) { |
| CHECK(!initted_); |
| |
| LoadFromPB(pb); |
| |
| initted_ = true; |
| return Status::OK(); |
| } |
| |
| void RowSetMetadata::LoadFromPB(const RowSetDataPB& pb) { |
| std::lock_guard<LockType> l(lock_); |
| id_ = pb.id(); |
| |
| // Load the min/max keys. |
| min_encoded_key_.reset(); |
| max_encoded_key_.reset(); |
| DCHECK_EQ(pb.has_min_encoded_key(), pb.has_max_encoded_key()); |
| if (pb.has_min_encoded_key() && pb.has_max_encoded_key()) { |
| min_encoded_key_ = pb.min_encoded_key(); |
| max_encoded_key_ = pb.max_encoded_key(); |
| } |
| |
| // Load Bloom File. |
| bloom_block_ = BlockId(); |
| if (pb.has_bloom_block()) { |
| bloom_block_ = BlockId::FromPB(pb.bloom_block()); |
| } |
| |
| // Load AdHoc Index File. |
| adhoc_index_block_ = BlockId(); |
| if (pb.has_adhoc_index_block()) { |
| adhoc_index_block_ = BlockId::FromPB(pb.adhoc_index_block()); |
| } |
| |
| // Load Column Files. |
| blocks_by_col_id_.clear(); |
| for (const ColumnDataPB& col_pb : pb.columns()) { |
| ColumnId col_id = ColumnId(col_pb.column_id()); |
| blocks_by_col_id_[col_id] = BlockId::FromPB(col_pb.block()); |
| } |
| |
| // Load redo delta files. |
| redo_delta_blocks_.clear(); |
| for (const DeltaDataPB& redo_delta_pb : pb.redo_deltas()) { |
| redo_delta_blocks_.push_back(BlockId::FromPB(redo_delta_pb.block())); |
| } |
| |
| last_durable_redo_dms_id_ = pb.last_durable_dms_id(); |
| |
| // Load undo delta files. |
| undo_delta_blocks_.clear(); |
| for (const DeltaDataPB& undo_delta_pb : pb.undo_deltas()) { |
| undo_delta_blocks_.push_back(BlockId::FromPB(undo_delta_pb.block())); |
| } |
| |
| // Load live row count. |
| if (tablet_metadata_->supports_live_row_count()) { |
| live_row_count_ = pb.live_row_count(); |
| } |
| } |
| |
| void RowSetMetadata::ToProtobuf(RowSetDataPB *pb) { |
| pb->set_id(id_); |
| |
| std::lock_guard<LockType> l(lock_); |
| |
| // Write Column Files |
| for (const ColumnIdToBlockIdMap::value_type& e : blocks_by_col_id_) { |
| ColumnId col_id = e.first; |
| const BlockId& block_id = e.second; |
| |
| ColumnDataPB *col_data = pb->add_columns(); |
| block_id.CopyToPB(col_data->mutable_block()); |
| col_data->set_column_id(col_id); |
| } |
| |
| // Write Delta Files |
| pb->set_last_durable_dms_id(last_durable_redo_dms_id_); |
| |
| for (const BlockId& redo_delta_block : redo_delta_blocks_) { |
| DeltaDataPB *redo_delta_pb = pb->add_redo_deltas(); |
| redo_delta_block.CopyToPB(redo_delta_pb->mutable_block()); |
| } |
| |
| for (const BlockId& undo_delta_block : undo_delta_blocks_) { |
| DeltaDataPB *undo_delta_pb = pb->add_undo_deltas(); |
| undo_delta_block.CopyToPB(undo_delta_pb->mutable_block()); |
| } |
| |
| // Write Bloom File |
| if (!bloom_block_.IsNull()) { |
| bloom_block_.CopyToPB(pb->mutable_bloom_block()); |
| } |
| |
| // Write AdHoc Index |
| if (!adhoc_index_block_.IsNull()) { |
| adhoc_index_block_.CopyToPB(pb->mutable_adhoc_index_block()); |
| } |
| |
| // Write the min/max keys. |
| if (has_encoded_keys_unlocked()) { |
| pb->set_min_encoded_key(*min_encoded_key_); |
| pb->set_max_encoded_key(*max_encoded_key_); |
| } |
| |
| // Write the live row count. |
| if (tablet_metadata_->supports_live_row_count()) { |
| pb->set_live_row_count(live_row_count_); |
| } |
| } |
| |
| const std::string RowSetMetadata::ToString() const { |
| return Substitute("RowSet($0)", id_); |
| } |
| |
| void RowSetMetadata::SetColumnDataBlocks(const std::map<ColumnId, BlockId>& blocks_by_col_id) { |
| ColumnIdToBlockIdMap new_map(blocks_by_col_id.begin(), blocks_by_col_id.end()); |
| new_map.shrink_to_fit(); |
| std::lock_guard<LockType> l(lock_); |
| blocks_by_col_id_ = std::move(new_map); |
| } |
| |
| Status RowSetMetadata::CommitRedoDeltaDataBlock(int64_t dms_id, |
| int64_t num_deleted_rows, |
| const BlockId& block_id) { |
| std::lock_guard<LockType> l(lock_); |
| last_durable_redo_dms_id_ = dms_id; |
| redo_delta_blocks_.push_back(block_id); |
| IncrementLiveRowsUnlocked(-num_deleted_rows); |
| return Status::OK(); |
| } |
| |
| Status RowSetMetadata::CommitUndoDeltaDataBlock(const BlockId& block_id) { |
| std::lock_guard<LockType> l(lock_); |
| undo_delta_blocks_.push_back(block_id); |
| return Status::OK(); |
| } |
| |
| void RowSetMetadata::CommitUpdate(const RowSetMetadataUpdate& update, |
| BlockIdContainer* removed) { |
| removed->clear(); |
| { |
| std::lock_guard<LockType> l(lock_); |
| |
| // Find the exact sequence of blocks to remove. |
| for (const auto& rep : update.replace_redo_blocks_) { |
| CHECK(!rep.to_remove.empty()); |
| |
| auto start_it = std::find(redo_delta_blocks_.begin(), |
| redo_delta_blocks_.end(), rep.to_remove[0]); |
| |
| auto end_it = start_it; |
| for (const BlockId& b : rep.to_remove) { |
| CHECK(end_it != redo_delta_blocks_.end() && *end_it == b) << |
| Substitute("Cannot find subsequence <$0> in <$1>", |
| BlockId::JoinStrings(rep.to_remove), |
| BlockId::JoinStrings(redo_delta_blocks_)); |
| ++end_it; |
| } |
| |
| removed->insert(removed->end(), start_it, end_it); |
| redo_delta_blocks_.erase(start_it, end_it); |
| redo_delta_blocks_.insert(start_it, rep.to_add.begin(), rep.to_add.end()); |
| } |
| |
| // Add new redo blocks |
| for (const BlockId& b : update.new_redo_blocks_) { |
| redo_delta_blocks_.push_back(b); |
| } |
| |
| // Remove undo blocks. |
| BlockIdSet undos_to_remove(update.remove_undo_blocks_.begin(), |
| update.remove_undo_blocks_.end()); |
| auto iter = undo_delta_blocks_.begin(); |
| while (iter != undo_delta_blocks_.end()) { |
| if (ContainsKey(undos_to_remove, *iter)) { |
| removed->push_back(*iter); |
| undos_to_remove.erase(*iter); |
| iter = undo_delta_blocks_.erase(iter); |
| } else { |
| ++iter; |
| } |
| } |
| CHECK(undos_to_remove.empty()) |
| << "Tablet " << tablet_metadata_->tablet_id() << " RowSet " << id_ << ": " |
| << "Attempted to remove an undo delta block from the RowSetMetadata that is not present. " |
| << "Removed: { " << *removed << " }; " |
| << "Failed to remove: { " |
| << vector<BlockId>(undos_to_remove.begin(), undos_to_remove.end()) |
| << " }"; |
| |
| if (!update.new_undo_block_.IsNull()) { |
| // Front-loading to keep the UNDO files in their natural order. |
| undo_delta_blocks_.insert(undo_delta_blocks_.begin(), update.new_undo_block_); |
| } |
| |
| for (const ColumnIdToBlockIdMap::value_type& e : update.cols_to_replace_) { |
| // If we are major-compacting deltas into a column which previously had no |
| // base-data (e.g. because it was newly added), then there will be no original |
| // block there to replace. |
| BlockId old_block_id; |
| if (UpdateReturnCopy(&blocks_by_col_id_, e.first, e.second, &old_block_id)) { |
| removed->push_back(old_block_id); |
| } |
| } |
| |
| for (const ColumnId& col_id : update.col_ids_to_remove_) { |
| BlockId old = FindOrDie(blocks_by_col_id_, col_id); |
| CHECK_EQ(1, blocks_by_col_id_.erase(col_id)); |
| removed->push_back(old); |
| } |
| } |
| |
| blocks_by_col_id_.shrink_to_fit(); |
| } |
| |
| void RowSetMetadata::IncrementLiveRowsUnlocked(int64_t row_count) { |
| if (tablet_metadata_->supports_live_row_count() && row_count != 0) { |
| live_row_count_ += row_count; |
| DCHECK_GE(live_row_count_, 0); |
| } |
| } |
| |
| void RowSetMetadata::IncrementLiveRows(int64_t row_count) { |
| std::lock_guard<LockType> l(lock_); |
| IncrementLiveRowsUnlocked(row_count); |
| } |
| |
| int64_t RowSetMetadata::live_row_count() const { |
| std::lock_guard<LockType> l(lock_); |
| DCHECK_GE(live_row_count_, 0); |
| return live_row_count_; |
| } |
| |
| BlockIdContainer RowSetMetadata::GetAllBlocks() const { |
| BlockIdContainer blocks; |
| blocks.reserve(blocks_by_col_id_.size() + |
| undo_delta_blocks_.size() + |
| redo_delta_blocks_.size() + |
| 2); // '2' is reserved for 'adhoc_index_block_' and 'bloom_block_' |
| std::lock_guard<LockType> l(lock_); |
| if (!adhoc_index_block_.IsNull()) { |
| blocks.push_back(adhoc_index_block_); |
| } |
| if (!bloom_block_.IsNull()) { |
| blocks.push_back(bloom_block_); |
| } |
| AppendValuesFromMap(blocks_by_col_id_, &blocks); |
| |
| blocks.insert(blocks.end(), |
| undo_delta_blocks_.begin(), undo_delta_blocks_.end()); |
| blocks.insert(blocks.end(), |
| redo_delta_blocks_.begin(), redo_delta_blocks_.end()); |
| return blocks; |
| } |
| |
| BlockId RowSetMetadata::GetMaxLiveBlockId() const { |
| BlockId max_block_id; |
| std::lock_guard<LockType> l(lock_); |
| if (!adhoc_index_block_.IsNull()) { |
| max_block_id = std::max(max_block_id, adhoc_index_block_); |
| } |
| if (!bloom_block_.IsNull()) { |
| max_block_id = std::max(max_block_id, bloom_block_); |
| } |
| for (const auto& e : blocks_by_col_id_) { |
| max_block_id = std::max(max_block_id, e.second); |
| } |
| for (const auto& block_id : undo_delta_blocks_) { |
| max_block_id = std::max(max_block_id, block_id); |
| } |
| for (const auto& block_id : redo_delta_blocks_) { |
| max_block_id = std::max(max_block_id, block_id); |
| } |
| return max_block_id; |
| } |
| |
| RowSetMetadataUpdate::RowSetMetadataUpdate() { |
| } |
| |
| RowSetMetadataUpdate::~RowSetMetadataUpdate() { |
| } |
| |
| RowSetMetadataUpdate& RowSetMetadataUpdate::ReplaceColumnId(ColumnId col_id, |
| const BlockId& block_id) { |
| InsertOrDie(&cols_to_replace_, col_id, block_id); |
| return *this; |
| } |
| |
| RowSetMetadataUpdate& RowSetMetadataUpdate::RemoveColumnId(ColumnId col_id) { |
| col_ids_to_remove_.push_back(col_id); |
| return *this; |
| } |
| |
| RowSetMetadataUpdate& RowSetMetadataUpdate::ReplaceRedoDeltaBlocks( |
| const std::vector<BlockId>& to_remove, |
| const std::vector<BlockId>& to_add) { |
| |
| ReplaceDeltaBlocks rdb = { to_remove, to_add }; |
| replace_redo_blocks_.push_back(rdb); |
| return *this; |
| } |
| |
| RowSetMetadataUpdate& RowSetMetadataUpdate::RemoveUndoDeltaBlocks( |
| const std::vector<BlockId>& to_remove) { |
| remove_undo_blocks_.insert(remove_undo_blocks_.end(), to_remove.begin(), to_remove.end()); |
| return *this; |
| } |
| |
| RowSetMetadataUpdate& RowSetMetadataUpdate::SetNewUndoBlock(const BlockId& undo_block) { |
| new_undo_block_ = undo_block; |
| return *this; |
| } |
| |
| } // namespace tablet |
| } // namespace kudu |