blob: 194915a02c918a2675660b79d4f869b919d409ce [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/cfile/binary_dict_block.h"
#include <functional>
#include <limits>
#include <ostream>
#include <utility>
#include <vector>
#include <glog/logging.h>
#include "kudu/cfile/block_handle.h"
#include "kudu/cfile/block_pointer.h"
#include "kudu/cfile/bshuf_block.h"
#include "kudu/cfile/cfile.pb.h"
#include "kudu/cfile/cfile_reader.h"
#include "kudu/cfile/cfile_writer.h"
#include "kudu/common/column_materialization_context.h"
#include "kudu/common/column_predicate.h"
#include "kudu/common/columnblock.h"
#include "kudu/common/common.pb.h"
#include "kudu/common/rowblock.h"
#include "kudu/common/rowblock_memory.h"
#include "kudu/common/types.h"
#include "kudu/gutil/casts.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/strings/stringpiece.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/util/bitmap.h"
#include "kudu/util/coding.h"
#include "kudu/util/coding-inl.h"
#include "kudu/util/memory/arena.h"
using std::vector;
namespace kudu {
namespace cfile {
// Constructs a builder that starts out in codeword mode: the data block is a
// BShufBlockBuilder<UINT32> holding codewords, and 'dict_block_' accumulates
// the distinct strings.
BinaryDictBlockBuilder::BinaryDictBlockBuilder(const WriterOptions* options)
    : options_(options),
      dict_block_(options_),
      dictionary_strings_arena_(1024),
      mode_(kCodeWordMode) {
  // The "empty key" of the dictionary map is a deliberately invalid
  // StringPiece (null data pointer, maximal length). Building such a piece and
  // using it in equality comparisons is safe.
  const StringPiece empty_key_sentinel(static_cast<const char*>(nullptr),
                                       std::numeric_limits<int>::max());
  dictionary_.set_empty_key(empty_key_sentinel);

  data_builder_.reset(new BShufBlockBuilder<UINT32>(options_));
  Reset();
}
// Prepares the builder for a new data block.
//
// When the builder is in codeword mode and the dictionary block reports that
// it is full, the builder switches to plain binary mode and replaces the data
// builder with a BinaryPlainBlockBuilder. In every other case the existing
// data builder is simply reset.
void BinaryDictBlockBuilder::Reset() {
  const bool dict_exhausted = (mode_ == kCodeWordMode) && dict_block_.IsBlockFull();
  if (!dict_exhausted) {
    data_builder_->Reset();
  } else {
    mode_ = kPlainBinaryMode;
    data_builder_.reset(new BinaryPlainBlockBuilder(options_));
  }
  finished_ = false;
}
// Finishes the current data block and stores its slices in '*slices'.
//
// The first output slice is a 4-byte header holding the encoding mode; it is
// followed by the slices produced by the underlying data builder.
void BinaryDictBlockBuilder::Finish(rowid_t ordinal_pos, vector<Slice>* slices) {
  finished_ = true;

  // Encode the mode into the header buffer.
  header_buffer_.resize(sizeof(int32_t));
  InlineEncodeFixed32(&header_buffer_[0], mode_);

  // Let the underlying builder produce the payload.
  vector<Slice> payload;
  data_builder_->Finish(ordinal_pos, &payload);

  // Assemble header + payload, then hand the result to the caller.
  vector<Slice> assembled;
  assembled.reserve(payload.size() + 1);
  assembled.push_back(Slice(header_buffer_));
  assembled.insert(assembled.end(), payload.begin(), payload.end());
  *slices = std::move(assembled);
}
// The current block is considered full when either the data block reports
// that it is full, or the dictionary block reports that it is full while this
// builder is still in codeword mode.
//
// In the latter case, all subsequent data blocks switch to plain binary
// encoding automatically.
bool BinaryDictBlockBuilder::IsBlockFull() const {
  return data_builder_->IsBlockFull() ||
         (dict_block_.IsBlockFull() && mode_ == kCodeWordMode);
}
int BinaryDictBlockBuilder::AddCodeWords(const uint8_t* vals, size_t count) {
DCHECK(!finished_);
DCHECK_GT(count, 0);
size_t i;
const Slice* src = reinterpret_cast<const Slice*>(vals);
if (data_builder_->Count() == 0) {
first_key_.assign_copy(src->data(), src->size());
}
for (i = 0; i < count; i++, src++) {
const char* c_str = reinterpret_cast<const char*>(src->data());
StringPiece current_item(c_str, src->size());
uint32_t codeword;
if (PREDICT_FALSE(!FindCopy(dictionary_, current_item, &codeword))) {
// Not already in dictionary, try to add it if there is space.
if (PREDICT_FALSE(!AddToDict(*src, &codeword))) {
break;
}
}
if (PREDICT_FALSE(data_builder_->Add(reinterpret_cast<const uint8_t*>(&codeword), 1) == 0)) {
// The data block is full
break;
}
}
return i;
}
// Tries to insert 'val' into the dictionary.
//
// On success, stores the newly assigned codeword (the index of the value in
// the dictionary block) in '*codeword' and returns true. Returns false,
// leaving '*codeword' untouched, when the dictionary block does not accept
// the value.
bool BinaryDictBlockBuilder::AddToDict(Slice val, uint32_t* codeword) {
  const auto num_accepted = dict_block_.Add(reinterpret_cast<const uint8_t*>(&val), 1);
  if (PREDICT_FALSE(num_accepted == 0)) {
    // The dictionary block is full
    return false;
  }

  // Key the lookup map by the pointer returned from the arena rather than by
  // the caller's 'val'.
  const uint8_t* arena_bytes = CHECK_NOTNULL(dictionary_strings_arena_.AddSlice(val));
  const StringPiece dict_key(reinterpret_cast<const char*>(arena_bytes), val.size());

  // The value just appended is the last entry in the dictionary block.
  *codeword = dict_block_.Count() - 1;
  InsertOrDie(&dictionary_, dict_key, *codeword);
  return true;
}
// Adds up to 'count' values from 'vals', dispatching on the current mode:
// plain binary mode forwards directly to the data builder, codeword mode goes
// through the dictionary. Returns the value reported by whichever path ran.
int BinaryDictBlockBuilder::Add(const uint8_t* vals, size_t count) {
  if (mode_ != kCodeWordMode) {
    DCHECK_EQ(mode_, kPlainBinaryMode);
    return data_builder_->Add(vals, count);
  }
  return AddCodeWords(vals, count);
}
// Finishes the dictionary block, appends it through 'c_writer', and records
// the resulting block pointer in 'footer'.
//
// Returns the writer's error status (after logging a warning) if the append
// fails; in that case 'footer' is not modified.
Status BinaryDictBlockBuilder::AppendExtraInfo(CFileWriter* c_writer, CFileFooterPB* footer) {
  vector<Slice> dict_slices;
  dict_block_.Finish(0, &dict_slices);

  BlockPointer dict_ptr;
  Status append_status =
      c_writer->AppendDictBlock(dict_slices, &dict_ptr, "Append dictionary block");
  if (append_status.ok()) {
    dict_ptr.CopyToPB(footer->mutable_dict_block_ptr());
    return Status::OK();
  }

  LOG(WARNING) << "Unable to append block to file: " << append_status.ToString();
  return append_status;
}
// Returns the count reported by the underlying data builder for the current
// block.
size_t BinaryDictBlockBuilder::Count() const {
  const size_t num_in_data_block = data_builder_->Count();
  return num_in_data_block;
}
// Writes the first key of the current block into '*key_void', which is
// treated as a Slice* in codeword mode.
//
// In codeword mode the block must already be finished; the key comes from the
// copy saved in 'first_key_'. In plain binary mode the request is forwarded
// to the data builder.
Status BinaryDictBlockBuilder::GetFirstKey(void* key_void) const {
  if (mode_ != kCodeWordMode) {
    DCHECK_EQ(mode_, kPlainBinaryMode);
    return data_builder_->GetFirstKey(key_void);
  }

  CHECK(finished_);
  *reinterpret_cast<Slice*>(key_void) = Slice(first_key_);
  return Status::OK();
}
// Writes the last key of the current block into '*key_void'.
//
// In codeword mode the block must already be finished: the last codeword is
// fetched from the data builder and then resolved to its string through the
// dictionary block. In plain binary mode the request is forwarded to the data
// builder.
Status BinaryDictBlockBuilder::GetLastKey(void* key_void) const {
  if (mode_ != kCodeWordMode) {
    DCHECK_EQ(mode_, kPlainBinaryMode);
    return data_builder_->GetLastKey(key_void);
  }

  CHECK(finished_);
  uint32_t final_codeword;
  RETURN_NOT_OK(data_builder_->GetLastKey(reinterpret_cast<void*>(&final_codeword)));
  return dict_block_.GetKeyAtIdx(key_void, final_codeword);
}
////////////////////////////////////////////////////////////
// Decoding
////////////////////////////////////////////////////////////
// Constructs a decoder over 'block'. The dictionary decoder is obtained from
// 'iter', which is also retained as the parent CFile iterator. The header is
// not parsed here; ParseHeader() must be called before decoding.
BinaryDictBlockDecoder::BinaryDictBlockDecoder(scoped_refptr<BlockHandle> block,
CFileIterator* iter)
: block_(std::move(block)),
// Reads from the member 'block_', not from the moved-from parameter 'block'.
// NOTE(review): relies on 'block_' being declared before 'data_' in the
// class definition -- confirm in the header.
data_(block_->data()),
parsed_(false),
dict_decoder_(iter->GetDictDecoder()),
parent_cfile_iter_(iter) {
}
// Parses the block header and instantiates the matching data decoder.
//
// The header is a fixed 32-bit mode value. Depending on the mode, the rest of
// the block is handed to either a BShufBlockDecoder<UINT32> (codeword mode)
// or a BinaryPlainBlockDecoder (plain binary mode), whose own header is then
// parsed. Returns Corruption if the block is too short or the mode is not
// recognized. Must be called at most once.
Status BinaryDictBlockDecoder::ParseHeader() {
  CHECK(!parsed_);

  if (data_.size() < kMinHeaderSize) {
    return Status::Corruption(strings::Substitute(
        "not enough bytes for header: dictionary block header "
        "size ($0) less than minimum possible header length ($1)",
        data_.size(), kMinHeaderSize));
  }

  const bool mode_recognized =
      tight_enum_test_cast<DictEncodingMode>(DecodeFixed32(&data_[0]), &mode_);
  if (PREDICT_FALSE(!mode_recognized)) {
    return Status::Corruption("header Mode information corrupted");
  }

  // Everything after the 4-byte mode field belongs to the underlying decoder.
  auto payload = block_->SubrangeBlock(4, data_.size() - 4);
  if (mode_ == kCodeWordMode) {
    data_decoder_.reset(new BShufBlockDecoder<UINT32>(std::move(payload)));
  } else if (mode_ == kPlainBinaryMode) {
    data_decoder_.reset(new BinaryPlainBlockDecoder(std::move(payload)));
  } else {
    return Status::Corruption("Unrecognized Dictionary encoded data block header");
  }

  RETURN_NOT_OK(data_decoder_->ParseHeader());
  parsed_ = true;
  return Status::OK();
}
// Positions the decoder at 'pos' by forwarding the request to the underlying
// data decoder. The dictionary decoder is not touched.
void BinaryDictBlockDecoder::SeekToPositionInBlock(uint pos) {
data_decoder_->SeekToPositionInBlock(pos);
}
// Seeks to the first position whose value is at or after '*value_void'.
//
// In plain binary mode the request is forwarded unchanged to the data decoder.
//
// In codeword mode the value is first looked up in the dictionary decoder,
// which also sets '*exact'. The dictionary position it lands on is then used
// as the codeword to seek for in the data decoder. If the dictionary lookup
// fails, the data decoder is positioned on the last entry of the block and
// the lookup's error status is returned.
Status BinaryDictBlockDecoder::SeekAtOrAfterValue(const void* value_void, bool* exact) {
  if (mode_ == kCodeWordMode) {
    DCHECK(value_void != nullptr);
    Status s = dict_decoder_->SeekAtOrAfterValue(value_void, exact);
    if (!s.ok()) {
      // This case means the value_void is larger than the largest key
      // in the dictionary block. Therefore, it is impossible to be in
      // the current data block, and we adjust the index to be the end
      // of the block
      data_decoder_->SeekToPositionInBlock(data_decoder_->Count() - 1);
      return s;
    }

    // In codeword mode the data decoder is a BShufBlockDecoder<UINT32>, so it
    // must be handed a pointer to a 32-bit codeword. Passing a pointer to a
    // wider size_t would only yield the right value on little-endian hosts.
    const uint32_t codeword = static_cast<uint32_t>(dict_decoder_->GetCurrentIndex());
    // Whether the codeword itself is present in this data block is not
    // reported to the caller; '*exact' reflects the dictionary lookup.
    bool ignored_exact = false;
    return data_decoder_->SeekAtOrAfterValue(&codeword, &ignored_exact);
  } else {
    DCHECK_EQ(mode_, kPlainBinaryMode);
    return data_decoder_->SeekAtOrAfterValue(value_void, exact);
  }
}
// TODO: implement CopyNextAndEval for more blocks. E.g. other blocks can
// store their min/max values. CopyNextAndEval in these blocks could
// short-circuit if the query does not search for values within the
// min/max range, or copy all and evaluate otherwise.
//
// Copies the next values (at most '*n') into 'dst' while evaluating the
// predicate held by 'ctx', clearing bits in 'sel' for rows that do not match.
//
// - In plain binary mode, the work is delegated to the data decoder.
// - In codeword mode, the set of codewords matching the predicate is obtained
//   from the parent CFile iterator. Rows whose codeword is in that set get
//   their destination cell pointed at the dictionary string; all other rows
//   have their selection bit cleared.
//
// Returns a non-OK status if the codewords cannot be read from the data block.
Status BinaryDictBlockDecoder::CopyNextAndEval(size_t* n,
                                               ColumnMaterializationContext* ctx,
                                               SelectionVectorView* sel,
                                               ColumnDataView* dst) {
  ctx->SetDecoderEvalSupported();
  if (mode_ == kPlainBinaryMode) {
    // Copy all strings and evaluate them Slice-by-Slice.
    return data_decoder_->CopyNextAndEval(n, ctx, sel, dst);
  }

  // Predicates that have no matching words should return no data.
  SelectionVector* codewords_matching_pred = parent_cfile_iter_->GetCodeWordsMatchingPredicate();
  CHECK(codewords_matching_pred != nullptr);
  if (!codewords_matching_pred->AnySelected()) {
    // If nothing is selected, move the data_decoder_ pointer forward and clear
    // the corresponding bits in the selection vector.
    int skip = static_cast<int>(*n);
    data_decoder_->SeekForward(&skip);
    *n = static_cast<size_t>(skip);
    sel->ClearBits(*n);
    return Status::OK();
  }

  // IsNotNull predicates should return all data.
  if (ctx->pred()->predicate_type() == PredicateType::IsNotNull) {
    return CopyNextDecodeStrings(n, dst);
  }

  bool retain_dict = false;

  // Load the rows' codeword values into a buffer for scanning. A failure here
  // must be propagated: continuing would interpret whatever bytes are in the
  // buffer as codewords.
  BShufBlockDecoder<UINT32>* d_bptr = down_cast<BShufBlockDecoder<UINT32>*>(data_decoder_.get());
  codeword_buf_.resize(*n * sizeof(uint32_t));
  RETURN_NOT_OK(d_bptr->CopyNextValuesToArray(n, codeword_buf_.data()));

  Slice* out = reinterpret_cast<Slice*>(dst->data());
  for (size_t i = 0; i < *n; i++, out++) {
    // Check with the SelectionVectorView to see whether the data has already
    // been cleared, in which case we can skip evaluation.
    if (!sel->TestBit(i)) {
      continue;
    }
    uint32_t codeword = *reinterpret_cast<uint32_t*>(&codeword_buf_[i*sizeof(uint32_t)]);
    if (BitmapTest(codewords_matching_pred->bitmap(), codeword)) {
      // Row is included in predicate: point the cell in the block
      // to the entry in the dictionary.
      *out = dict_decoder_->string_at_index(codeword);
      retain_dict = true;
    } else {
      // Mark that the row will not be returned.
      sel->ClearBit(i);
    }
  }

  // Cells now point into the dictionary block, so have the destination's
  // memory retain a reference to it -- but only if at least one cell was set.
  if (retain_dict) {
    dst->memory()->RetainReference(dict_decoder_->block_handle());
  }
  return Status::OK();
}
// Copies the next values (at most '*n') into 'dst' in codeword mode.
//
// The codewords are first read from the data block into 'codeword_buf_', and
// each destination cell is then pointed at the corresponding string inside
// the dictionary block (no string bytes are copied here). The destination's
// memory retains a reference to the dictionary block handle.
//
// Requires that the header has been parsed, that 'dst' holds BINARY cells
// with a stride of sizeof(Slice), and that '*n' does not exceed dst->nrows().
// Returns a non-OK status if the codewords cannot be read.
Status BinaryDictBlockDecoder::CopyNextDecodeStrings(size_t* n, ColumnDataView* dst) {
  DCHECK(parsed_);
  CHECK_EQ(dst->type_info()->physical_type(), BINARY);
  DCHECK_LE(*n, dst->nrows());
  DCHECK_EQ(dst->stride(), sizeof(Slice));

  Slice* out = reinterpret_cast<Slice*>(dst->data());
  codeword_buf_.resize((*n)*sizeof(uint32_t));

  // Copy the codewords into a temporary buffer first.
  BShufBlockDecoder<UINT32>* d_bptr = down_cast<BShufBlockDecoder<UINT32>*>(data_decoder_.get());
  RETURN_NOT_OK(d_bptr->CopyNextValuesToArray(n, codeword_buf_.data()));

  // Now point the cells in the destination block to the string data in the dictionary
  // block. The index is a size_t to match the type of '*n' and avoid a
  // signed/unsigned comparison.
  for (size_t i = 0; i < *n; i++) {
    uint32_t codeword = *reinterpret_cast<uint32_t*>(&codeword_buf_[i*sizeof(uint32_t)]);
    *out++ = dict_decoder_->string_at_index(codeword);
  }
  dst->memory()->RetainReference(dict_decoder_->block_handle());
  return Status::OK();
}
// Copies the next values (at most '*n') into 'dst', dispatching on the block's
// encoding mode: plain binary blocks are copied by the data decoder, codeword
// blocks are resolved through the dictionary.
Status BinaryDictBlockDecoder::CopyNextValues(size_t* n, ColumnDataView* dst) {
  if (mode_ != kCodeWordMode) {
    DCHECK_EQ(mode_, kPlainBinaryMode);
    return data_decoder_->CopyNextValues(n, dst);
  }
  return CopyNextDecodeStrings(n, dst);
}
} // namespace cfile
} // namespace kudu