blob: 731e16db08ac61edfe3ee1635fc6bad9c8477c6f [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/cfile/binary_dict_block.h"
#include <glog/logging.h>
#include <algorithm>
#include "kudu/cfile/cfile_reader.h"
#include "kudu/cfile/cfile_util.h"
#include "kudu/cfile/cfile_writer.h"
#include "kudu/cfile/bshuf_block.h"
#include "kudu/common/columnblock.h"
#include "kudu/gutil/casts.h"
#include "kudu/gutil/stringprintf.h"
#include "kudu/util/coding.h"
#include "kudu/util/coding-inl.h"
#include "kudu/util/group_varint-inl.h"
#include "kudu/util/hexdump.h"
#include "kudu/util/memory/arena.h"
namespace kudu {
namespace cfile {
BinaryDictBlockBuilder::BinaryDictBlockBuilder(const WriterOptions* options)
: options_(options),
dict_block_(options_),
dictionary_strings_arena_(1024, 32*1024*1024),
mode_(kCodeWordMode) {
data_builder_.reset(new BShufBlockBuilder<UINT32>(options_));
Reset();
}
void BinaryDictBlockBuilder::Reset() {
buffer_.clear();
buffer_.resize(kMaxHeaderSize);
buffer_.reserve(options_->storage_attributes.cfile_block_size);
if (mode_ == kCodeWordMode &&
dict_block_.IsBlockFull(options_->storage_attributes.cfile_block_size)) {
mode_ = kPlainBinaryMode;
data_builder_.reset(new BinaryPlainBlockBuilder(options_));
} else {
data_builder_->Reset();
}
finished_ = false;
}
Slice BinaryDictBlockBuilder::Finish(rowid_t ordinal_pos) {
finished_ = true;
InlineEncodeFixed32(&buffer_[0], mode_);
// TODO: if we could modify the the Finish() API a little bit, we can
// avoid an extra memory copy (buffer_.append(..))
Slice data_slice = data_builder_->Finish(ordinal_pos);
buffer_.append(data_slice.data(), data_slice.size());
return Slice(buffer_);
}
// The current block is considered full when the the size of data block
// exceeds limit or when the size of dictionary block exceeds the
// CFile block size.
//
// If it is the latter case, all the subsequent data blocks will switch to
// StringPlainBlock automatically.
bool BinaryDictBlockBuilder::IsBlockFull(size_t limit) const {
int block_size = options_->storage_attributes.cfile_block_size;
if (data_builder_->IsBlockFull(block_size)) return true;
if (dict_block_.IsBlockFull(block_size) && (mode_ == kCodeWordMode)) return true;
return false;
}
int BinaryDictBlockBuilder::AddCodeWords(const uint8_t* vals, size_t count) {
DCHECK(!finished_);
DCHECK_GT(count, 0);
size_t i;
if (data_builder_->Count() == 0) {
const Slice* first = reinterpret_cast<const Slice*>(vals);
first_key_.assign_copy(first->data(), first->size());
}
for (i = 0; i < count; i++) {
const Slice* src = reinterpret_cast<const Slice*>(vals);
const char* c_str = reinterpret_cast<const char*>(src->data());
StringPiece current_item(c_str, src->size());
uint32_t codeword;
if (!FindCopy(dictionary_, current_item, &codeword)) {
// The dictionary block is full
if (dict_block_.Add(vals, 1) == 0) {
break;
}
const uint8_t* s_ptr = dictionary_strings_arena_.AddSlice(*src);
if (s_ptr == nullptr) {
// Arena does not have enough space for string content
// Ideally, it should not happen.
LOG(ERROR) << "Arena of Dictionary Encoder does not have enough memory for strings";
break;
}
const char* s_content = reinterpret_cast<const char*>(s_ptr);
codeword = dict_block_.Count() - 1;
InsertOrDie(&dictionary_, StringPiece(s_content, src->size()), codeword);
}
// The data block is full
if (data_builder_->Add(reinterpret_cast<const uint8_t*>(&codeword), 1) == 0) {
break;
}
vals += sizeof(Slice);
}
return i;
}
int BinaryDictBlockBuilder::Add(const uint8_t* vals, size_t count) {
if (mode_ == kCodeWordMode) {
return AddCodeWords(vals, count);
} else {
DCHECK_EQ(mode_, kPlainBinaryMode);
return data_builder_->Add(vals, count);
}
}
Status BinaryDictBlockBuilder::AppendExtraInfo(CFileWriter* c_writer, CFileFooterPB* footer) {
Slice dict_slice = dict_block_.Finish(0);
std::vector<Slice> dict_v;
dict_v.push_back(dict_slice);
BlockPointer ptr;
Status s = c_writer->AppendDictBlock(dict_v, &ptr, "Append dictionary block");
if (!s.ok()) {
LOG(WARNING) << "Unable to append block to file: " << s.ToString();
return s;
}
ptr.CopyToPB(footer->mutable_dict_block_ptr());
return Status::OK();
}
size_t BinaryDictBlockBuilder::Count() const {
return data_builder_->Count();
}
Status BinaryDictBlockBuilder::GetFirstKey(void* key_void) const {
if (mode_ == kCodeWordMode) {
CHECK(finished_);
Slice* slice = reinterpret_cast<Slice*>(key_void);
*slice = Slice(first_key_);
return Status::OK();
} else {
DCHECK_EQ(mode_, kPlainBinaryMode);
return data_builder_->GetFirstKey(key_void);
}
}
////////////////////////////////////////////////////////////
// Decoding
////////////////////////////////////////////////////////////
BinaryDictBlockDecoder::BinaryDictBlockDecoder(Slice slice, CFileIterator* iter)
: data_(std::move(slice)),
parsed_(false) {
dict_decoder_ = iter->GetDictDecoder();
}
Status BinaryDictBlockDecoder::ParseHeader() {
CHECK(!parsed_);
if (data_.size() < kMinHeaderSize) {
return Status::Corruption(
strings::Substitute("not enough bytes for header: dictionary block header "
"size ($0) less than minimum possible header length ($1)",
data_.size(), kMinHeaderSize));
}
bool valid = tight_enum_test_cast<DictEncodingMode>(DecodeFixed32(&data_[0]), &mode_);
if (PREDICT_FALSE(!valid)) {
return Status::Corruption("header Mode information corrupted");
}
Slice content(data_.data() + 4, data_.size() - 4);
if (mode_ == kCodeWordMode) {
data_decoder_.reset(new BShufBlockDecoder<UINT32>(content));
} else {
if (mode_ != kPlainBinaryMode) {
return Status::Corruption("Unrecognized Dictionary encoded data block header");
}
data_decoder_.reset(new BinaryPlainBlockDecoder(content));
}
RETURN_NOT_OK(data_decoder_->ParseHeader());
parsed_ = true;
return Status::OK();
}
void BinaryDictBlockDecoder::SeekToPositionInBlock(uint pos) {
data_decoder_->SeekToPositionInBlock(pos);
}
Status BinaryDictBlockDecoder::SeekAtOrAfterValue(const void* value_void, bool* exact) {
if (mode_ == kCodeWordMode) {
DCHECK(value_void != nullptr);
Status s = dict_decoder_->SeekAtOrAfterValue(value_void, exact);
if (!s.ok()) {
// This case means the value_void is larger that the largest key
// in the dictionary block. Therefore, it is impossible to be in
// the current data block, and we adjust the index to be the end
// of the block
data_decoder_->SeekToPositionInBlock(data_decoder_->Count() - 1);
return s;
}
size_t index = dict_decoder_->GetCurrentIndex();
bool tmp;
return data_decoder_->SeekAtOrAfterValue(&index, &tmp);
} else {
DCHECK_EQ(mode_, kPlainBinaryMode);
return data_decoder_->SeekAtOrAfterValue(value_void, exact);
}
}
Status BinaryDictBlockDecoder::CopyNextDecodeStrings(size_t* n, ColumnDataView* dst) {
DCHECK(parsed_);
CHECK_EQ(dst->type_info()->physical_type(), BINARY);
DCHECK_LE(*n, dst->nrows());
DCHECK_EQ(dst->stride(), sizeof(Slice));
Arena* out_arena = dst->arena();
Slice* out = reinterpret_cast<Slice*>(dst->data());
codeword_buf_.resize((*n)*sizeof(uint32_t));
// Copy the codewords into a temporary buffer first.
// And then Copy the strings corresponding to the codewords to the destination buffer.
BShufBlockDecoder<UINT32>* d_bptr = down_cast<BShufBlockDecoder<UINT32>*>(data_decoder_.get());
RETURN_NOT_OK(d_bptr->CopyNextValuesToArray(n, codeword_buf_.data()));
for (int i = 0; i < *n; i++) {
uint32_t codeword = *reinterpret_cast<uint32_t*>(&codeword_buf_[i*sizeof(uint32_t)]);
Slice elem = dict_decoder_->string_at_index(codeword);
CHECK(out_arena->RelocateSlice(elem, out));
out++;
}
return Status::OK();
}
Status BinaryDictBlockDecoder::CopyNextValues(size_t* n, ColumnDataView* dst) {
if (mode_ == kCodeWordMode) {
return CopyNextDecodeStrings(n, dst);
} else {
DCHECK_EQ(mode_, kPlainBinaryMode);
return data_decoder_->CopyNextValues(n, dst);
}
}
} // namespace cfile
} // namespace kudu