blob: 5429c9c825c5bfb84a024ddf42b0eb87840a6b62 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <algorithm>
#include <string>
#include <vector>
#include "kudu/common/columnblock.h"
#include "kudu/common/row.h"
#include "kudu/common/rowblock.h"
#include "kudu/common/row_changelist.h"
#include "kudu/common/schema.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/util/coding.h"
#include "kudu/util/coding-inl.h"
#include "kudu/util/faststring.h"
using strings::Substitute;
using strings::SubstituteAndAppend;
namespace kudu {
// Renders this changelist as a human-readable string, interpreting column ids
// and cell values against 'schema'.
//
// The result is one of:
//   "DELETE"                        for a delete changelist
//   "REINSERT <row>"                for a reinsert changelist
//   "SET <col>=<val>, <col>=<val>"  for an update changelist
//   "[invalid: ...]"                if the header or the REINSERT payload is bad
//   "[invalid update: ...]"         if an individual column update is bad
//
// Updates that reference a column id not present in 'schema' are rendered as
// "[unknown column id N]=<raw bytes or NULL>".
string RowChangeList::ToString(const Schema &schema) const {
  DCHECK_GT(encoded_data_.size(), 0);
  RowChangeListDecoder decoder(*this);

  Status s = decoder.Init();
  if (!s.ok()) {
    return "[invalid: " + s.ToString() + "]";
  }

  if (decoder.is_delete()) {
    return string("DELETE");
  } else if (decoder.is_reinsert()) {
    // Check the payload length against the row size of 'schema' before viewing
    // it as a contiguous row: a truncated or mismatched payload would otherwise
    // be read past its end while formatting.
    Slice row_slice;
    s = decoder.GetReinsertedRowSlice(schema, &row_slice);
    if (!s.ok()) {
      return "[invalid: " + s.ToString() + "]";
    }
    ConstContiguousRow row(&schema, row_slice);
    return string("REINSERT ") + schema.DebugRow(row);
  } else {
    CHECK(decoder.is_update()) << "Unknown changelist type!";
  }

  string ret = "SET ";

  bool first = true;
  while (decoder.HasNext()) {
    if (!first) {
      ret.append(", ");
    }
    first = false;

    RowChangeListDecoder::DecodedUpdate dec;
    int col_idx;
    const void* value;
    s = decoder.DecodeNext(&dec);
    if (s.ok()) {
      s = dec.Validate(schema, &col_idx, &value);
    }

    if (!s.ok()) {
      // Include whatever was successfully rendered so far to aid debugging.
      return "[invalid update: " + s.ToString() + ", before corruption: " + ret + "]";
    }

    if (col_idx == Schema::kColumnNotFound) {
      // Unknown column: no type information, so fall back to the raw bytes.
      SubstituteAndAppend(&ret, "[unknown column id $0]=", dec.col_id);
      if (dec.null) {
        ret.append("NULL");
      } else {
        ret.append(dec.raw_value.ToDebugString());
      }
    } else {
      // Known column: format the value using the column's own type.
      const ColumnSchema& col_schema = schema.column(col_idx);
      ret.append(col_schema.name());
      ret.append("=");
      if (value == nullptr) {
        ret.append("NULL");
      } else {
        ret.append(col_schema.Stringify(value));
      }
    }
  }

  return ret;
}
// Appends one column update to the destination buffer.
//
// Encoding: the column id as a varint32, followed by either a single zero byte
// (NULL), or a varint32 holding (value size + 1) and then the value bytes.
//
// The first call on an uninitialized encoder marks the changelist as an
// update; later calls expect it to already be one.
void RowChangeListEncoder::AddRawColumnUpdate(
    int col_id, bool is_null, Slice new_val) {
  if (type_ != RowChangeList::kUninitialized) {
    DCHECK_EQ(RowChangeList::kUpdate, type_);
  } else {
    SetType(RowChangeList::kUpdate);
  }

  InlinePutVarint32(dst_, col_id);

  if (!is_null) {
    // The stored length is offset by one so that zero can stand for NULL.
    InlinePutVarint32(dst_, new_val.size() + 1);
    dst_->append(new_val.data(), new_val.size());
    return;
  }
  dst_->push_back(0);
}
// Appends an update for column 'col_id', taking the new value from 'cell_ptr'.
//
// A null 'cell_ptr' encodes a NULL value (the column is expected to be
// nullable). For BINARY columns the cell holds a Slice, which is copied out;
// for all other types the cell's raw bytes, sized by the column's type info,
// are used directly.
void RowChangeListEncoder::AddColumnUpdate(const ColumnSchema& col_schema,
                                           int col_id,
                                           const void* cell_ptr) {
  const bool is_null = (cell_ptr == nullptr);

  Slice val_slice;
  if (is_null) {
    // NULL value.
    DCHECK(col_schema.is_nullable());
  } else if (col_schema.type_info()->physical_type() == BINARY) {
    memcpy(&val_slice, cell_ptr, sizeof(val_slice));
  } else {
    val_slice = Slice(reinterpret_cast<const uint8_t*>(cell_ptr),
                      col_schema.type_info()->size());
  }

  AddRawColumnUpdate(col_id, is_null, val_slice);
}
// Reads and validates the leading type byte of the changelist.
//
// Returns Corruption if the changelist is empty, if the type byte is not a
// usable ChangeType value, or if a DELETE changelist carries anything beyond
// the type byte. On success the type byte is consumed from 'remaining_'.
Status RowChangeListDecoder::Init() {
  if (PREDICT_FALSE(remaining_.empty())) {
    return Status::Corruption("empty changelist - expected type");
  }

  const bool type_ok =
      tight_enum_test_cast<RowChangeList::ChangeType>(remaining_[0], &type_) &&
      type_ != RowChangeList::kUninitialized;
  if (PREDICT_FALSE(!type_ok)) {
    const int raw_type = static_cast<int>(remaining_[0]);
    return Status::Corruption(Substitute("bad type enum value: $0 in $1",
                                         raw_type,
                                         remaining_.ToDebugString()));
  }

  // A DELETE consists of the type byte and nothing else.
  const bool delete_has_payload = is_delete() && remaining_.size() != 1;
  if (PREDICT_FALSE(delete_has_payload)) {
    return Status::Corruption("DELETE changelist too long",
                              remaining_.ToDebugString());
  }

  remaining_.remove_prefix(1);
  return Status::OK();
}
// Stores in '*s' the payload of a REINSERT changelist, after checking that its
// length equals the contiguous row size of 'schema'.
//
// Must only be called on a reinsert changelist. Returns Corruption, with the
// expected length and a debug dump of the payload, when the length does not
// match; '*s' is left untouched in that case.
Status RowChangeListDecoder::GetReinsertedRowSlice(const Schema& schema, Slice* s) const {
  DCHECK(is_reinsert());

  const size_t expected_size = ContiguousRowHelper::row_size(schema);
  if (remaining_.size() != expected_size) {
    // The payload dump goes in the status' second message: the format string
    // has a single placeholder, so passing it to Substitute would drop it.
    return Status::Corruption(
        Substitute("REINSERT changelist wrong length (expected $0)", expected_size),
        remaining_.ToDebugString());
  }

  *s = remaining_;
  return Status::OK();
}
// Re-encodes the changelist 'src' into 'buf' according to 'projector'.
//
// 'buf' is cleared first. Then, depending on the type of 'src':
//  - DELETE:   a delete is written.
//  - REINSERT: the reinserted row is projected from the projector's delta
//              schema onto its projection and written as a reinsert.
//  - UPDATE:   each column update is validated against the projection; updates
//              for column ids absent from the projection are dropped, the
//              rest are copied through unchanged.
//
// Returns a non-OK status if 'src' cannot be decoded or validated, or if the
// row projection fails.
Status RowChangeListDecoder::ProjectUpdate(const DeltaProjector& projector,
                                           const RowChangeList& src,
                                           faststring *buf) {
  RowChangeListDecoder decoder(src);
  RETURN_NOT_OK(decoder.Init());

  buf->clear();
  RowChangeListEncoder encoder(buf);
  if (decoder.is_delete()) {
    encoder.SetToDelete();
  } else if (decoder.is_reinsert()) {
    Slice reinsert;
    RETURN_NOT_OK(decoder.GetReinsertedRowSlice(*projector.delta_schema(), &reinsert));

    // ReInsert = MemStore Insert -> Delete -> (Re)Insert
    ConstContiguousRow src_row = ConstContiguousRow(projector.delta_schema(),
                                                    reinsert);
    RowProjector row_projector(projector.delta_schema(), projector.projection());

    // Scratch space for the projected row. A faststring is used rather than a
    // variable-length array, which is a non-standard extension and would put
    // an unbounded allocation on the stack.
    const size_t row_size = ContiguousRowHelper::row_size(*projector.projection());
    faststring row_buf;
    row_buf.resize(row_size);
    ContiguousRow row(projector.projection(), row_buf.data());

    RETURN_NOT_OK(row_projector.Init());
    RETURN_NOT_OK(row_projector.ProjectRowForRead(src_row, &row, static_cast<Arena*>(nullptr)));
    encoder.SetToReinsert(Slice(row_buf.data(), row_size));
  } else if (decoder.is_update()) {
    while (decoder.HasNext()) {
      DecodedUpdate dec;
      RETURN_NOT_OK(decoder.DecodeNext(&dec));

      int col_idx;
      const void* new_val;
      RETURN_NOT_OK(dec.Validate(*projector.projection(), &col_idx, &new_val));

      // If the new schema doesn't have this column, throw away the update.
      if (col_idx == Schema::kColumnNotFound) {
        continue;
      }

      encoder.AddRawColumnUpdate(dec.col_id, dec.null, dec.raw_value);
    }
  }
  return Status::OK();
}
// Applies every remaining column update in this changelist to 'dst_row'.
//
// For each update whose column id is present in the destination row's schema,
// the cell's current contents are first recorded on 'undo_encoder' and the
// new value is then copied into the row via CopyCell using 'arena'. Updates
// for column ids not in that schema are skipped.
//
// Returns the first decode, validation or copy error encountered.
Status RowChangeListDecoder::ApplyRowUpdate(RowBlockRow *dst_row,
                                            Arena *arena, RowChangeListEncoder* undo_encoder) {
  const Schema* const row_schema = dst_row->schema();

  while (HasNext()) {
    DecodedUpdate update;
    RETURN_NOT_OK(DecodeNext(&update));

    int idx;
    const void* new_value;
    RETURN_NOT_OK(update.Validate(*row_schema, &idx, &new_value));

    // Only touch columns that are part of the projection being scanned;
    // deltas for any other column id are ignored.
    if (idx != Schema::kColumnNotFound) {
      const ColumnSchema& column = row_schema->column(idx);
      SimpleConstCell new_cell(&column, new_value);

      // Remember the existing value so that the change can be undone.
      RowBlockRow::Cell target = dst_row->cell(idx);
      undo_encoder->AddColumnUpdate(column, update.col_id, target.ptr());

      // Overwrite the cell with the updated value.
      RETURN_NOT_OK(CopyCell(new_cell, &target, arena));
    }
  }
  return Status::OK();
}
// Applies to row 'row_idx' of 'dst_col' those updates in this changelist that
// target the column at index 'col_idx' of 'dst_schema'.
//
// The changelist is expected to be an update. Every remaining entry is
// decoded; entries for other column ids are ignored, and each matching entry
// is validated against 'dst_schema' and copied into the destination cell via
// CopyCell using 'arena'.
//
// Returns the first decode, validation or copy error encountered.
Status RowChangeListDecoder::ApplyToOneColumn(size_t row_idx, ColumnBlock* dst_col,
                                              const Schema& dst_schema,
                                              int col_idx, Arena *arena) {
  DCHECK_EQ(RowChangeList::kUpdate, type_);

  const ColumnSchema& target_schema = dst_schema.column(col_idx);
  ColumnId target_id = dst_schema.column_id(col_idx);

  while (HasNext()) {
    DecodedUpdate update;
    RETURN_NOT_OK(DecodeNext(&update));
    if (update.col_id != target_id) {
      // Not the column we were asked about.
      continue;
    }

    // Validation resolves the column index again; it must agree with the one
    // the caller passed in.
    int resolved_idx;
    const void* updated_value;
    RETURN_NOT_OK(update.Validate(dst_schema, &resolved_idx, &updated_value));
    DCHECK_EQ(resolved_idx, col_idx);

    SimpleConstCell from(&target_schema, updated_value);
    ColumnBlock::Cell to = dst_col->cell(row_idx);
    RETURN_NOT_OK(CopyCell(from, &to, arena));
    // TODO: could potentially break; here if we're guaranteed to only have one update
    // per column in a RowChangeList (which would make sense!)
  }
  return Status::OK();
}
// Copies the changelist 'src' into 'out', omitting any column updates whose
// column id appears in 'col_ids'.
//
// DELETE changelists are written as a delete, and REINSERT changelists are
// written with their payload unchanged; 'col_ids' only affects updates.
//
// REQUIRES: 'col_ids' is sorted in ascending order. Membership is tested with
// a binary search, which silently gives wrong answers on unsorted input; this
// is checked in debug builds.
//
// Returns a non-OK status if 'src' cannot be decoded.
Status RowChangeListDecoder::RemoveColumnIdsFromChangeList(const RowChangeList& src,
                                                           const std::vector<ColumnId>& col_ids,
                                                           RowChangeListEncoder* out) {
  DCHECK(std::is_sorted(col_ids.begin(), col_ids.end()))
      << "col_ids must be sorted";

  RowChangeListDecoder decoder(src);
  RETURN_NOT_OK(decoder.Init());

  if (decoder.is_delete()) {
    out->SetToDelete();
  } else if (decoder.is_reinsert()) {
    out->SetToReinsert(decoder.remaining_);
  } else if (decoder.is_update()) {
    while (decoder.HasNext()) {
      DecodedUpdate dec;
      RETURN_NOT_OK(decoder.DecodeNext(&dec));
      // Keep only the updates that are not being removed.
      if (!std::binary_search(col_ids.begin(), col_ids.end(), dec.col_id)) {
        out->AddRawColumnUpdate(dec.col_id, dec.null, dec.raw_value);
      }
    }
  }

  return Status::OK();
}
// Decodes the next column update from 'remaining_' into '*dec'.
//
// An entry is a varint32 column id followed by a varint32 length field. A
// length field of zero means the value is NULL; otherwise the value occupies
// (length field - 1) bytes immediately after it.
//
// Returns Corruption if either varint cannot be read or if fewer value bytes
// remain than the length field announces. Init() must have been called first.
Status RowChangeListDecoder::DecodeNext(DecodedUpdate* dec) {
  DCHECK_NE(type_, RowChangeList::kUninitialized) << "Must call Init()";

  // Column id.
  uint32_t id;
  if (PREDICT_FALSE(!GetVarint32(&remaining_, &id))) {
    return Status::Corruption("Invalid column ID varint in delta");
  }
  dec->col_id = id;

  // Length field: zero for NULL, otherwise value length plus one.
  uint32_t encoded_len;
  if (PREDICT_FALSE(!GetVarint32(&remaining_, &encoded_len))) {
    return Status::Corruption("Invalid size varint in delta");
  }

  dec->null = (encoded_len == 0);
  if (dec->null) {
    return Status::OK();
  }

  const uint32_t value_len = encoded_len - 1;
  if (PREDICT_FALSE(remaining_.size() < value_len)) {
    return Status::Corruption(
        Substitute("truncated value for column id $0, expected $1 bytes, only $2 remaining",
                   id, value_len, remaining_.size()));
  }

  // The value is referenced in place rather than copied.
  dec->raw_value = Slice(remaining_.data(), value_len);
  remaining_.remove_prefix(value_len);
  return Status::OK();
}
// Checks this decoded update against 'schema' and resolves it to a column
// index and a typed value pointer.
//
// '*col_idx' receives the index of the column with this update's id, or
// Schema::kColumnNotFound if 'schema' has no such column; in the latter case
// OK is returned and '*value' is not written.
//
// For a known column, '*value' is set to nullptr for a NULL update, to a
// pointer to this update's raw_value Slice for BINARY columns, or to the raw
// value bytes for any other type.
//
// Returns Corruption for a NULL update on a non-nullable column, or when the
// raw value's size differs from the size of a non-BINARY column's type.
Status RowChangeListDecoder::DecodedUpdate::Validate(const Schema& schema,
                                                     int* col_idx,
                                                     const void** value) const {
  const int idx = schema.find_column_by_id(this->col_id);
  *col_idx = idx;
  if (idx == Schema::kColumnNotFound) {
    return Status::OK();
  }

  // The column exists, so the update has to be consistent with it.
  const ColumnSchema& col = schema.column(idx);

  if (null) {
    if (col.is_nullable()) {
      *value = nullptr;
      return Status::OK();
    }
    return Status::Corruption("decoded set-to-NULL for non-nullable column",
                              col.ToString());
  }

  const bool is_binary = (col.type_info()->physical_type() == BINARY);
  if (is_binary) {
    // BINARY cells are represented by the Slice itself.
    *value = &this->raw_value;
    return Status::OK();
  }

  const bool size_matches = (col.type_info()->size() == this->raw_value.size());
  if (PREDICT_FALSE(!size_matches)) {
    return Status::Corruption(Substitute(
                                  "invalid value $0 for column $1",
                                  this->raw_value.ToDebugString(), col.ToString()));
  }

  *value = reinterpret_cast<const void*>(this->raw_value.data());
  return Status::OK();
}
} // namespace kudu