// blob: ed7c64111070502f5d95fd8f5e72922bd83c7313 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/cfile/binary_plain_block.h"
#include <algorithm>
#include <cstdint>
#include <functional>
#include <ostream>
#include <vector>
#include <glog/logging.h>
#include "kudu/cfile/block_handle.h"
#include "kudu/cfile/cfile_util.h"
#include "kudu/common/column_materialization_context.h"
#include "kudu/common/column_predicate.h"
#include "kudu/common/columnblock.h"
#include "kudu/common/common.pb.h"
#include "kudu/common/rowblock.h"
#include "kudu/common/rowblock_memory.h"
#include "kudu/common/schema.h"
#include "kudu/common/types.h"
#include "kudu/gutil/stringprintf.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/util/coding.h"
#include "kudu/util/coding-inl.h"
#include "kudu/util/group_varint-inl.h"
#include "kudu/util/hexdump.h"
using std::vector;
namespace kudu {
namespace cfile {
BinaryPlainBlockBuilder::BinaryPlainBlockBuilder(const WriterOptions *options)
: end_of_data_offset_(0),
size_estimate_(0),
options_(options) {
Reset();
}
// Defaulted destructor, defined out of line in this translation unit.
BinaryPlainBlockBuilder::~BinaryPlainBlockBuilder() = default;
// Returns the builder to its empty state so a new block can be started.
// Afterwards the buffer contains only the (not yet filled in) header bytes,
// no offsets are recorded, and both the size estimate and the end-of-data
// offset equal the header size.
void BinaryPlainBlockBuilder::Reset() {
  finished_ = false;

  // Drop any previous contents, pre-size for a full block, then leave room
  // for the header that Finish() writes in place.
  buffer_.clear();
  buffer_.reserve(options_->storage_attributes.cfile_block_size);
  buffer_.resize(kHeaderSize);

  offsets_.clear();

  end_of_data_offset_ = kHeaderSize;
  size_estimate_ = kHeaderSize;
}
// Reports whether the running size estimate has gone strictly past the
// configured cfile block size. A block whose estimate exactly equals the
// configured size is not considered full.
bool BinaryPlainBlockBuilder::IsBlockFull() const {
  const auto block_size_limit = options_->storage_attributes.cfile_block_size;
  return size_estimate_ > block_size_limit;
}
// Completes the block and exposes its bytes.
//
// Fills in the header at the front of the buffer:
//   bytes [0, 4)  : 'ordinal_pos'
//   bytes [4, 8)  : number of recorded offsets
//   bytes [8, 12) : buffer position at which the encoded offsets begin
// and then appends the group-varint-encoded offsets after the string data.
//
// On return '*slices' holds exactly one Slice covering the whole buffer.
// That Slice refers to the builder's internal buffer rather than a copy.
void BinaryPlainBlockBuilder::Finish(rowid_t ordinal_pos, vector<Slice>* slices) {
  finished_ = true;

  // The offsets are appended below, so they start at the current end of the
  // buffer. Capture that position before anything is appended.
  const size_t offsets_start = buffer_.size();
  const size_t num_offsets = offsets_.size();

  InlineEncodeFixed32(&buffer_[0], ordinal_pos);
  InlineEncodeFixed32(&buffer_[4], num_offsets);
  InlineEncodeFixed32(&buffer_[8], offsets_start);

  // Nothing to encode for an empty block; also avoids indexing an empty
  // offsets container.
  if (num_offsets > 0) {
    coding::AppendGroupVarInt32Sequence(&buffer_, 0, &offsets_[0], num_offsets);
  }

  slices->clear();
  slices->push_back(Slice(buffer_));
}
int BinaryPlainBlockBuilder::Add(const uint8_t *vals, size_t count) {
DCHECK(!finished_);
DCHECK_GT(count, 0);
size_t i = 0;
// If the block is full, should stop adding more items.
while (!IsBlockFull() && i < count) {
// Every fourth entry needs a gvint selector byte
// TODO(todd): does it cost a lot to account these things specifically?
// maybe cheaper to just over-estimate - allocation is cheaper than math?
if (offsets_.size() % 4 == 0) {
size_estimate_++;
}
const Slice *src = reinterpret_cast<const Slice *>(vals);
size_t offset = buffer_.size();
offsets_.push_back(offset);
size_estimate_ += coding::CalcRequiredBytes32(offset);
buffer_.append(src->data(), src->size());
size_estimate_ += src->size();
i++;
vals += sizeof(Slice);
}
end_of_data_offset_ = buffer_.size();
return i;
}
// Returns how many values have been added since the last Reset(); one offset
// is recorded per added value.
size_t BinaryPlainBlockBuilder::Count() const {
  const size_t num_values = offsets_.size();
  return num_values;
}
// Looks up the value stored at position 'idx' and writes a Slice describing
// it into '*key_void', which must point to a Slice. The resulting Slice
// refers to the builder's internal buffer; no bytes are copied.
//
// Returns:
//   NotFound        if no values have been added to the block.
//   InvalidArgument if 'idx' is negative or not less than the value count.
//   OK              otherwise, with the Slice filled in.
//
// The emptiness check deliberately comes first: with zero values every index
// is out of range, so testing the range first would make the NotFound result
// unreachable.
Status BinaryPlainBlockBuilder::GetKeyAtIdx(void *key_void, int idx) const {
  Slice *slice = reinterpret_cast<Slice *>(key_void);

  if (offsets_.empty()) {
    return Status::NotFound("no keys in data block");
  }

  // Reject negative indexes explicitly instead of relying on the implicit
  // signed-to-unsigned conversion in the comparison.
  if (idx < 0 || static_cast<size_t>(idx) >= offsets_.size()) {
    return Status::InvalidArgument("index too large");
  }

  if (PREDICT_FALSE(offsets_.size() == 1)) {
    // A lone value spans everything between the header and the end of data.
    *slice = Slice(&buffer_[kHeaderSize],
                   end_of_data_offset_ - kHeaderSize);
  } else if (static_cast<size_t>(idx) + 1 == offsets_.size()) {
    // The last value has no following offset; it ends where the data ends.
    *slice = Slice(&buffer_[offsets_[idx]],
                   end_of_data_offset_ - offsets_[idx]);
  } else {
    // Any other value ends where the next one begins.
    *slice = Slice(&buffer_[offsets_[idx]],
                   offsets_[idx + 1] - offsets_[idx]);
  }
  return Status::OK();
}
// Writes a Slice for the first value in the block into '*key_void' (which
// must point to a Slice). Must only be called after Finish(); this is
// enforced with a CHECK. Returns whatever GetKeyAtIdx() returns for index 0.
Status BinaryPlainBlockBuilder::GetFirstKey(void *key_void) const {
  CHECK(finished_);
  const int first_idx = 0;
  return GetKeyAtIdx(key_void, first_idx);
}
// Writes a Slice for the last value in the block into '*key_void' (which
// must point to a Slice). Must only be called after Finish(); this is
// enforced with a CHECK. Returns whatever GetKeyAtIdx() returns for the
// index of the final recorded offset.
Status BinaryPlainBlockBuilder::GetLastKey(void *key_void) const {
  CHECK(finished_);
  const int last_idx = offsets_.size() - 1;
  return GetKeyAtIdx(key_void, last_idx);
}
////////////////////////////////////////////////////////////
// Decoding
////////////////////////////////////////////////////////////
BinaryPlainBlockDecoder::BinaryPlainBlockDecoder(scoped_refptr<BlockHandle> block)
: block_(std::move(block)),
data_(block_->data()),
parsed_(false),
num_elems_(0),
ordinal_pos_base_(0),
cur_idx_(0) {
}
// Defaulted destructor, defined out of line in this translation unit.
BinaryPlainBlockDecoder::~BinaryPlainBlockDecoder() = default;
// Parses the block header and decodes the offset array.
//
// Header layout, as read below:
//   bytes [0, 4)  : ordinal position of the first value in the block
//   bytes [4, 8)  : number of values in the block
//   bytes [8, 12) : position within the block where the encoded offsets begin
//
// The offsets are group-varint encoded starting at that position. They are
// decoded into 'offsets_buf_' as 32-bit integers, followed by one extra entry
// equal to the offsets position so that the end of value i can always be
// found at entry i + 1.
//
// Must be called at most once per decoder (enforced with a CHECK).
//
// Returns Corruption if the block is too short for a header, if the offsets
// position lies beyond the block, if the value count cannot possibly fit in
// the bytes available for offsets, or if decoding runs past the end of the
// block. Returns OK and marks the decoder as parsed otherwise.
Status BinaryPlainBlockDecoder::ParseHeader() {
  CHECK(!parsed_);

  if (data_.size() < kMinHeaderSize) {
    return Status::Corruption(
        strings::Substitute("not enough bytes for header: string block header "
                            "size ($0) less than minimum possible header length ($1)",
                            data_.size(), kMinHeaderSize));
  }

  // Decode header.
  ordinal_pos_base_ = DecodeFixed32(&data_[0]);
  num_elems_ = DecodeFixed32(&data_[4]);
  size_t offsets_pos = DecodeFixed32(&data_[8]);

  // Sanity check.
  if (offsets_pos > data_.size()) {
    return Status::Corruption(
        strings::Substitute("offsets_pos $0 > block size $1 in plain string block",
                            offsets_pos, data_.size()));
  }

  // The element count comes straight from the block, so validate it before
  // using it to size a buffer. Group-varint spends at least one byte on every
  // encoded integer (plus selector bytes), so a count larger than the number
  // of bytes following 'offsets_pos' cannot be genuine. Rejecting it here
  // prevents both an enormous allocation and an undersized buffer should
  // 'count + 1' wrap around.
  const size_t num_elems = static_cast<size_t>(num_elems_);
  const size_t offsets_bytes = data_.size() - offsets_pos;
  if (num_elems > offsets_bytes) {
    return Status::Corruption(
        strings::Substitute("num_elems $0 > $1 bytes available for offsets "
                            "in plain string block",
                            num_elems, offsets_bytes));
  }

  // Decode the string offsets themselves
  const uint8_t *p = data_.data() + offsets_pos;
  const uint8_t *limit = data_.data() + data_.size();

  // Reserve one extra element, which we'll fill in at the end
  // with an offset past the last element.
  offsets_buf_.resize(sizeof(uint32_t) * (num_elems + 1));
  uint32_t* dst_ptr = reinterpret_cast<uint32_t*>(offsets_buf_.data());

  size_t rem = num_elems;
  while (rem >= 4) {
    if (PREDICT_TRUE(p + 16 < limit)) {
#ifndef __aarch64__
      p = coding::DecodeGroupVarInt32_SSE(
          p, &dst_ptr[0], &dst_ptr[1], &dst_ptr[2], &dst_ptr[3]);
#else
      p = coding::DecodeGroupVarInt32_SlowButSafe(
          p, &dst_ptr[0], &dst_ptr[1], &dst_ptr[2], &dst_ptr[3]);
#endif //__aarch64__

      // The above function should add at most 17 (4 32-bit ints plus a selector byte) to
      // 'p'. Thus, since we checked that (p + 16 < limit) above, we are guaranteed that
      // (p <= limit) now.
      DCHECK_LE(p, limit);
    } else {
      p = coding::DecodeGroupVarInt32_SlowButSafe(
          p, &dst_ptr[0], &dst_ptr[1], &dst_ptr[2], &dst_ptr[3]);
      if (PREDICT_FALSE(p > limit)) {
        // Only need to check 'p' overrun in the slow path, because 'p' may have
        // been within 16 bytes of 'limit'.
        LOG(WARNING) << "bad block: " << HexDump(data_);
        return Status::Corruption("unable to decode offsets in block");
      }
    }
    dst_ptr += 4;
    rem -= 4;
  }

  if (rem > 0) {
    // Fewer than four offsets remain. A full group is still decoded, into
    // scratch space, and only the first 'rem' values are kept.
    uint32_t ints[4];
    p = coding::DecodeGroupVarInt32_SlowButSafe(p, &ints[0], &ints[1], &ints[2], &ints[3]);
    if (PREDICT_FALSE(p > limit)) {
      LOG(WARNING) << "bad block: " << HexDump(data_);
      return Status::Corruption("unable to decode offsets in block");
    }
    for (size_t i = 0; i < rem; i++) {
      *dst_ptr++ = ints[i];
    }
  }

  // Add one extra entry pointing after the last item to make the indexing easier.
  *dst_ptr++ = offsets_pos;

  parsed_ = true;
  return Status::OK();
}
// Moves the read cursor to value index 'pos' within this block.
// 'pos' may equal the number of values, which positions the cursor just past
// the last one. For an empty block the cursor is left untouched and 'pos' is
// expected to be 0 (debug-checked).
void BinaryPlainBlockDecoder::SeekToPositionInBlock(uint pos) {
  if (PREDICT_TRUE(num_elems_ != 0)) {
    DCHECK_LE(pos, num_elems_);
    cur_idx_ = pos;
    return;
  }
  // Empty block: the only meaningful position is the start.
  DCHECK_EQ(0, pos);
}
// Positions the cursor using a binary search for the value pointed to by
// 'value_void', which must point to a Slice. The search relies on the
// block's values being in ascending order as defined by Slice::compare().
//
// If a probed value compares equal to the target, the cursor is placed on it,
// '*exact' is set to true and OK is returned. Otherwise '*exact' is set to
// false and the cursor is placed where the search converged: on the first
// value greater than the target, or one past the end if there is none, in
// which case NotFound is returned instead of OK.
Status BinaryPlainBlockDecoder::SeekAtOrAfterValue(const void *value_void, bool *exact) {
  DCHECK(value_void != nullptr);
  const Slice& needle = *reinterpret_cast<const Slice*>(value_void);

  // Search the half-open index range [lo, hi).
  uint32_t lo = 0;
  uint32_t hi = num_elems_;
  while (lo != hi) {
    const uint32_t probe = (lo + hi) / 2;
    Slice probe_key(string_at_index(probe));
    const int cmp = probe_key.compare(needle);
    if (cmp == 0) {
      cur_idx_ = probe;
      *exact = true;
      return Status::OK();
    }
    if (cmp < 0) {
      // Probed value sorts before the target: discard the lower half.
      lo = probe + 1;
    } else {
      // Probed value sorts after the target: it remains a candidate.
      hi = probe;
    }
  }

  cur_idx_ = lo;
  *exact = false;
  if (cur_idx_ == num_elems_) {
    return Status::NotFound("after last key in block");
  }
  return Status::OK();
}
// Shared driver for the CopyNext* methods.
//
// Visits up to '*n' values starting at the cursor and, for each one, invokes
// 'c(i, elem, out)' where 'i' is the zero-based position within this batch,
// 'elem' is the decoded value and 'out' is the i-th Slice cell of 'dst'.
// The cursor advances by one per visited value.
//
// On return '*n' holds the number of values visited. That is 0 if '*n' was 0
// on entry or the cursor was already at or past the end of the block.
// Always returns OK.
template <typename CellHandler>
Status BinaryPlainBlockDecoder::HandleBatch(size_t* n, ColumnDataView* dst, CellHandler c) {
  DCHECK(parsed_);
  CHECK_EQ(dst->type_info()->physical_type(), BINARY);
  DCHECK_LE(*n, dst->nrows());
  DCHECK_EQ(dst->stride(), sizeof(Slice));

  if (PREDICT_FALSE(*n == 0 || cur_idx_ >= num_elems_)) {
    *n = 0;
    return Status::OK();
  }

  // Never read past the last value in the block.
  const size_t remaining = static_cast<size_t>(num_elems_ - cur_idx_);
  const size_t fetch_count = std::min(*n, remaining);

  Slice* const cells = reinterpret_cast<Slice*>(dst->data());
  for (size_t i = 0; i < fetch_count; ++i) {
    Slice elem(string_at_index(cur_idx_));
    c(i, elem, &cells[i]);
    ++cur_idx_;
  }

  *n = fetch_count;
  return Status::OK();
}
// Writes the next up-to-'*n' values into 'dst' as Slices and advances the
// cursor; '*n' is updated to the number actually written.
//
// Only Slice objects are stored in 'dst', not copies of the string bytes.
// The block handle is therefore registered with the destination's memory
// first, presumably to keep the underlying block data alive for as long as
// the destination is in use.
Status BinaryPlainBlockDecoder::CopyNextValues(size_t* n, ColumnDataView* dst) {
  dst->memory()->RetainReference(block_);

  auto store_cell = [&](size_t /*row*/, Slice value, Slice* cell) {
    *cell = value;
  };
  return HandleBatch(n, dst, store_cell);
}
// Reads the next up-to-'*n' values, evaluating the predicate from 'ctx' on
// each row that is still selected in 'sel'.
//
// For every row in the batch:
//   - if its bit in 'sel' is already clear, the row is skipped and its
//     destination cell is left untouched;
//   - if the predicate accepts the value, the value's Slice is stored in the
//     corresponding cell of 'dst';
//   - otherwise the row's bit in 'sel' is cleared and its cell is left
//     untouched.
//
// The block handle is registered with the destination's memory only when at
// least one value was stored and the batch succeeded. '*n' is updated to the
// number of rows processed, and the cursor advances by that amount.
Status BinaryPlainBlockDecoder::CopyNextAndEval(size_t* n,
                                                ColumnMaterializationContext* ctx,
                                                SelectionVectorView* sel,
                                                ColumnDataView* dst) {
  // Tell the context that this decoder evaluates the predicate itself.
  ctx->SetDecoderEvalSupported();

  bool any_stored = false;
  auto eval_cell = [&](size_t row, Slice value, Slice* cell) {
    if (sel->TestBit(row)) {
      if (!ctx->pred()->EvaluateCell<BINARY>(static_cast<const void*>(&value))) {
        sel->ClearBit(row);
      } else {
        *cell = value;
        any_stored = true;
      }
    }
  };

  Status s = HandleBatch(n, dst, eval_cell);
  if (PREDICT_TRUE(s.ok() && any_stored)) {
    dst->memory()->RetainReference(block_);
  }
  return s;
}
} // namespace cfile
} // namespace kudu