blob: 718710e315e8adc74e723427ff32ee64cd16d877 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/tablet/rowset_metadata.h"
#include <algorithm>
#include <mutex>
#include <ostream>
#include <string>
#include <unordered_set>
#include <vector>
#include <glog/stl_logging.h>
#include "kudu/gutil/strings/substitute.h"
#include "kudu/gutil/map-util.h"
#include "kudu/tablet/metadata.pb.h"
using std::unique_ptr;
using std::vector;
using strings::Substitute;
namespace kudu {
namespace tablet {
// ============================================================================
// RowSet Metadata
// ============================================================================
// Creates a RowSetMetadata attached to 'tablet_metadata' and initializes it
// from the serialized form 'pb'.
//
// '*metadata' is assigned only after InitFromPB() has been run through
// RETURN_NOT_OK; on success Status::OK() is returned.
Status RowSetMetadata::Load(TabletMetadata* tablet_metadata,
                            const RowSetDataPB& pb,
                            unique_ptr<RowSetMetadata>* metadata) {
  unique_ptr<RowSetMetadata> loaded(new RowSetMetadata(tablet_metadata));
  RETURN_NOT_OK(loaded->InitFromPB(pb));

  // Hand ownership of the fully-initialized object to the caller.
  metadata->reset(loaded.release());
  return Status::OK();
}
// Creates a brand-new RowSetMetadata with identifier 'id', attached to
// 'tablet_metadata', and stores it in '*metadata'.
//
// Always returns Status::OK().
Status RowSetMetadata::CreateNew(TabletMetadata* tablet_metadata,
                                 int64_t id,
                                 unique_ptr<RowSetMetadata>* metadata) {
  unique_ptr<RowSetMetadata> created(new RowSetMetadata(tablet_metadata, id));
  *metadata = std::move(created);
  return Status::OK();
}
// Forwards 'blocks' to the owning tablet metadata's AddOrphanedBlocks().
// This object keeps no record of them itself.
void RowSetMetadata::AddOrphanedBlocks(const BlockIdContainer& blocks) {
  auto& owner = *tablet_metadata_;
  owner.AddOrphanedBlocks(blocks);
}
// Flushes by delegating to the owning tablet metadata's Flush() and returns
// whatever Status that call produced.
Status RowSetMetadata::Flush() {
  Status flush_status = tablet_metadata_->Flush();
  return flush_status;
}
// One-time initialization of this object from the serialized form 'pb'.
//
// CHECK-fails if the object has already been marked initialized. The fields
// are populated via LoadFromPB(), after which 'initted_' is set.
// Always returns Status::OK().
Status RowSetMetadata::InitFromPB(const RowSetDataPB& pb) {
// Guard against double initialization.
CHECK(!initted_);
LoadFromPB(pb);
// Only mark the object initialized once all fields have been loaded.
initted_ = true;
return Status::OK();
}
void RowSetMetadata::LoadFromPB(const RowSetDataPB& pb) {
std::lock_guard<LockType> l(lock_);
id_ = pb.id();
// Load the min/max keys.
min_encoded_key_.reset();
max_encoded_key_.reset();
DCHECK_EQ(pb.has_min_encoded_key(), pb.has_max_encoded_key());
if (pb.has_min_encoded_key() && pb.has_max_encoded_key()) {
min_encoded_key_ = pb.min_encoded_key();
max_encoded_key_ = pb.max_encoded_key();
}
// Load Bloom File.
bloom_block_ = BlockId();
if (pb.has_bloom_block()) {
bloom_block_ = BlockId::FromPB(pb.bloom_block());
}
// Load AdHoc Index File.
adhoc_index_block_ = BlockId();
if (pb.has_adhoc_index_block()) {
adhoc_index_block_ = BlockId::FromPB(pb.adhoc_index_block());
}
// Load Column Files.
blocks_by_col_id_.clear();
for (const ColumnDataPB& col_pb : pb.columns()) {
ColumnId col_id = ColumnId(col_pb.column_id());
blocks_by_col_id_[col_id] = BlockId::FromPB(col_pb.block());
}
// Load redo delta files.
redo_delta_blocks_.clear();
for (const DeltaDataPB& redo_delta_pb : pb.redo_deltas()) {
redo_delta_blocks_.push_back(BlockId::FromPB(redo_delta_pb.block()));
}
last_durable_redo_dms_id_ = pb.last_durable_dms_id();
// Load undo delta files.
undo_delta_blocks_.clear();
for (const DeltaDataPB& undo_delta_pb : pb.undo_deltas()) {
undo_delta_blocks_.push_back(BlockId::FromPB(undo_delta_pb.block()));
}
// Load live row count.
if (tablet_metadata_->supports_live_row_count()) {
live_row_count_ = pb.live_row_count();
}
}
// Serializes the in-memory state of this rowset into '*pb'.
//
// Values are set on / appended to '*pb'; this function does not clear it
// first. Every member is read while holding 'lock_'.
void RowSetMetadata::ToProtobuf(RowSetDataPB *pb) {
  // Acquire the lock before reading any member. 'id_' is assigned under
  // 'lock_' in LoadFromPB(), so reading it without the lock would be an
  // unsynchronized access.
  std::lock_guard<LockType> l(lock_);

  pb->set_id(id_);

  // Write Column Files
  for (const ColumnIdToBlockIdMap::value_type& e : blocks_by_col_id_) {
    ColumnId col_id = e.first;
    const BlockId& block_id = e.second;

    ColumnDataPB *col_data = pb->add_columns();
    block_id.CopyToPB(col_data->mutable_block());
    col_data->set_column_id(col_id);
  }

  // Write Delta Files
  pb->set_last_durable_dms_id(last_durable_redo_dms_id_);

  for (const BlockId& redo_delta_block : redo_delta_blocks_) {
    DeltaDataPB *redo_delta_pb = pb->add_redo_deltas();
    redo_delta_block.CopyToPB(redo_delta_pb->mutable_block());
  }

  for (const BlockId& undo_delta_block : undo_delta_blocks_) {
    DeltaDataPB *undo_delta_pb = pb->add_undo_deltas();
    undo_delta_block.CopyToPB(undo_delta_pb->mutable_block());
  }

  // Write Bloom File (omitted when no bloom block is set)
  if (!bloom_block_.IsNull()) {
    bloom_block_.CopyToPB(pb->mutable_bloom_block());
  }

  // Write AdHoc Index (omitted when no ad hoc index block is set)
  if (!adhoc_index_block_.IsNull()) {
    adhoc_index_block_.CopyToPB(pb->mutable_adhoc_index_block());
  }

  // Write the min/max keys.
  if (has_encoded_keys_unlocked()) {
    pb->set_min_encoded_key(*min_encoded_key_);
    pb->set_max_encoded_key(*max_encoded_key_);
  }

  // Write the live row count, only when the tablet metadata supports it.
  if (tablet_metadata_->supports_live_row_count()) {
    pb->set_live_row_count(live_row_count_);
  }
}
// Returns a short description of the form "RowSet(<id>)".
const std::string RowSetMetadata::ToString() const {
  std::string description = Substitute("RowSet($0)", id_);
  return description;
}
void RowSetMetadata::SetColumnDataBlocks(const std::map<ColumnId, BlockId>& blocks_by_col_id) {
ColumnIdToBlockIdMap new_map(blocks_by_col_id.begin(), blocks_by_col_id.end());
new_map.shrink_to_fit();
std::lock_guard<LockType> l(lock_);
blocks_by_col_id_ = std::move(new_map);
}
// Records a newly written REDO delta block.
//
// Under 'lock_': appends 'block_id' to the REDO block list, remembers
// 'dms_id' as the last durable REDO DMS id, and lowers the live row count by
// 'num_deleted_rows' (via IncrementLiveRowsUnlocked()).
// Always returns Status::OK().
Status RowSetMetadata::CommitRedoDeltaDataBlock(int64_t dms_id,
                                                int64_t num_deleted_rows,
                                                const BlockId& block_id) {
  std::lock_guard<LockType> guard(lock_);
  redo_delta_blocks_.emplace_back(block_id);
  last_durable_redo_dms_id_ = dms_id;
  // Deleted rows reduce the number of live rows.
  IncrementLiveRowsUnlocked(-num_deleted_rows);
  return Status::OK();
}
// Appends 'block_id' to the end of the UNDO delta block list while holding
// 'lock_'. Always returns Status::OK().
Status RowSetMetadata::CommitUndoDeltaDataBlock(const BlockId& block_id) {
  std::lock_guard<LockType> guard(lock_);
  undo_delta_blocks_.emplace_back(block_id);
  return Status::OK();
}
void RowSetMetadata::CommitUpdate(const RowSetMetadataUpdate& update,
BlockIdContainer* removed) {
removed->clear();
{
std::lock_guard<LockType> l(lock_);
// Find the exact sequence of blocks to remove.
for (const auto& rep : update.replace_redo_blocks_) {
CHECK(!rep.to_remove.empty());
auto start_it = std::find(redo_delta_blocks_.begin(),
redo_delta_blocks_.end(), rep.to_remove[0]);
auto end_it = start_it;
for (const BlockId& b : rep.to_remove) {
CHECK(end_it != redo_delta_blocks_.end() && *end_it == b) <<
Substitute("Cannot find subsequence <$0> in <$1>",
BlockId::JoinStrings(rep.to_remove),
BlockId::JoinStrings(redo_delta_blocks_));
++end_it;
}
removed->insert(removed->end(), start_it, end_it);
redo_delta_blocks_.erase(start_it, end_it);
redo_delta_blocks_.insert(start_it, rep.to_add.begin(), rep.to_add.end());
}
// Add new redo blocks
for (const BlockId& b : update.new_redo_blocks_) {
redo_delta_blocks_.push_back(b);
}
// Remove undo blocks.
BlockIdSet undos_to_remove(update.remove_undo_blocks_.begin(),
update.remove_undo_blocks_.end());
auto iter = undo_delta_blocks_.begin();
while (iter != undo_delta_blocks_.end()) {
if (ContainsKey(undos_to_remove, *iter)) {
removed->push_back(*iter);
undos_to_remove.erase(*iter);
iter = undo_delta_blocks_.erase(iter);
} else {
++iter;
}
}
CHECK(undos_to_remove.empty())
<< "Tablet " << tablet_metadata_->tablet_id() << " RowSet " << id_ << ": "
<< "Attempted to remove an undo delta block from the RowSetMetadata that is not present. "
<< "Removed: { " << *removed << " }; "
<< "Failed to remove: { "
<< vector<BlockId>(undos_to_remove.begin(), undos_to_remove.end())
<< " }";
if (!update.new_undo_block_.IsNull()) {
// Front-loading to keep the UNDO files in their natural order.
undo_delta_blocks_.insert(undo_delta_blocks_.begin(), update.new_undo_block_);
}
for (const ColumnIdToBlockIdMap::value_type& e : update.cols_to_replace_) {
// If we are major-compacting deltas into a column which previously had no
// base-data (e.g. because it was newly added), then there will be no original
// block there to replace.
BlockId old_block_id;
if (UpdateReturnCopy(&blocks_by_col_id_, e.first, e.second, &old_block_id)) {
removed->push_back(old_block_id);
}
}
for (const ColumnId& col_id : update.col_ids_to_remove_) {
BlockId old = FindOrDie(blocks_by_col_id_, col_id);
CHECK_EQ(1, blocks_by_col_id_.erase(col_id));
removed->push_back(old);
}
}
blocks_by_col_id_.shrink_to_fit();
}
// Adds 'row_count' (which may be negative) to the live row count.
//
// Does nothing when the tablet metadata does not support live row counting
// or when 'row_count' is zero. As the name indicates, this function does not
// acquire 'lock_' itself.
void RowSetMetadata::IncrementLiveRowsUnlocked(int64_t row_count) {
  if (!tablet_metadata_->supports_live_row_count() || row_count == 0) {
    return;
  }
  live_row_count_ += row_count;
  DCHECK_GE(live_row_count_, 0);
}
// Locking wrapper around IncrementLiveRowsUnlocked(): acquires 'lock_' and
// then applies 'row_count' to the live row count.
void RowSetMetadata::IncrementLiveRows(int64_t row_count) {
  std::lock_guard<LockType> guard(lock_);
  IncrementLiveRowsUnlocked(row_count);
}
// Returns the current live row count, read while holding 'lock_'.
// In debug builds, asserts that the count is non-negative.
int64_t RowSetMetadata::live_row_count() const {
  std::lock_guard<LockType> guard(lock_);
  DCHECK_GE(live_row_count_, 0);
  const int64_t current = live_row_count_;
  return current;
}
// Returns the ids of all blocks referenced by this rowset, in this order:
// the ad hoc index block (if set), the bloom block (if set), the column data
// blocks, the UNDO delta blocks, and the REDO delta blocks.
//
// The whole snapshot, including the capacity computation, is taken while
// holding 'lock_'.
BlockIdContainer RowSetMetadata::GetAllBlocks() const {
  // The lock must be held before the container sizes are read: other methods
  // mutate these containers under 'lock_'.
  std::lock_guard<LockType> l(lock_);

  BlockIdContainer blocks;
  blocks.reserve(blocks_by_col_id_.size() +
                 undo_delta_blocks_.size() +
                 redo_delta_blocks_.size() +
                 2);  // '2' is reserved for 'adhoc_index_block_' and 'bloom_block_'

  if (!adhoc_index_block_.IsNull()) {
    blocks.push_back(adhoc_index_block_);
  }
  if (!bloom_block_.IsNull()) {
    blocks.push_back(bloom_block_);
  }
  AppendValuesFromMap(blocks_by_col_id_, &blocks);

  blocks.insert(blocks.end(),
                undo_delta_blocks_.begin(), undo_delta_blocks_.end());
  blocks.insert(blocks.end(),
                redo_delta_blocks_.begin(), redo_delta_blocks_.end());
  return blocks;
}
// Returns the largest block id (per std::max on BlockId) among all blocks
// referenced by this rowset, computed while holding 'lock_'.
//
// The search starts from a default-constructed BlockId, which is what is
// returned when no referenced block compares greater than it.
BlockId RowSetMetadata::GetMaxLiveBlockId() const {
  BlockId highest;
  // Raises 'highest' to 'candidate' when the candidate compares greater.
  const auto consider = [&highest](const BlockId& candidate) {
    highest = std::max(highest, candidate);
  };

  std::lock_guard<LockType> guard(lock_);

  // The two optional blocks only take part when they are actually set.
  if (!adhoc_index_block_.IsNull()) {
    consider(adhoc_index_block_);
  }
  if (!bloom_block_.IsNull()) {
    consider(bloom_block_);
  }

  for (const auto& col_and_block : blocks_by_col_id_) {
    consider(col_and_block.second);
  }
  for (const auto& undo_block : undo_delta_blocks_) {
    consider(undo_block);
  }
  for (const auto& redo_block : redo_delta_blocks_) {
    consider(redo_block);
  }
  return highest;
}
// Constructs an empty update: every member is default-initialized.
RowSetMetadataUpdate::RowSetMetadataUpdate() = default;
// No custom teardown: members are destroyed by their own destructors.
RowSetMetadataUpdate::~RowSetMetadataUpdate() = default;
// Schedules the data block of column 'col_id' to be replaced by 'block_id'.
//
// The entry is added with InsertOrDie(); NOTE(review): per its name this is
// expected to abort if 'col_id' was already scheduled -- confirm in
// map-util.h. Returns *this so calls can be chained.
RowSetMetadataUpdate& RowSetMetadataUpdate::ReplaceColumnId(ColumnId col_id,
                                                            const BlockId& block_id) {
  InsertOrDie(&this->cols_to_replace_, col_id, block_id);
  return *this;
}
// Schedules column 'col_id' for removal by appending it to the list of
// column ids to remove. Returns *this so calls can be chained.
RowSetMetadataUpdate& RowSetMetadataUpdate::RemoveColumnId(ColumnId col_id) {
  col_ids_to_remove_.emplace_back(col_id);
  return *this;
}
// Schedules the REDO delta blocks in 'to_remove' to be replaced by the
// blocks in 'to_add'. Both lists are copied into the update.
//
// Multiple replacements may be queued; they are stored in call order.
// Returns *this so calls can be chained.
RowSetMetadataUpdate& RowSetMetadataUpdate::ReplaceRedoDeltaBlocks(
    const std::vector<BlockId>& to_remove,
    const std::vector<BlockId>& to_add) {
  // Push a temporary so the two vectors are copied once (into the temporary)
  // and then moved into the container, rather than copied a second time from
  // a named local.
  replace_redo_blocks_.push_back(ReplaceDeltaBlocks{ to_remove, to_add });
  return *this;
}
// Schedules the UNDO delta blocks in 'to_remove' for removal by appending
// them to the list accumulated so far. Returns *this so calls can be chained.
RowSetMetadataUpdate& RowSetMetadataUpdate::RemoveUndoDeltaBlocks(
    const std::vector<BlockId>& to_remove) {
  const auto append_at = remove_undo_blocks_.end();
  remove_undo_blocks_.insert(append_at, to_remove.cbegin(), to_remove.cend());
  return *this;
}
// Records 'undo_block' as the new UNDO block carried by this update,
// overwriting any previously set value. Returns *this so calls can be
// chained.
RowSetMetadataUpdate& RowSetMetadataUpdate::SetNewUndoBlock(const BlockId& undo_block) {
  this->new_undo_block_ = undo_block;
  return *this;
}
} // namespace tablet
} // namespace kudu