blob: c79ed07db80777c6223926d1b96a140919bf08cc [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/cfile/binary_prefix_block.h"
#include <algorithm>
#include <cstdint>
#include <cstring>
#include <ostream>
#include <string>
#include <vector>
#include <glog/logging.h>
#include "kudu/cfile/block_handle.h"
#include "kudu/cfile/cfile_util.h"
#include "kudu/common/columnblock.h"
#include "kudu/common/common.pb.h"
#include "kudu/common/schema.h"
#include "kudu/common/types.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/stringprintf.h"
#include "kudu/gutil/strings/fastmem.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/util/coding.h"
#include "kudu/util/coding-inl.h"
#include "kudu/util/group_varint-inl.h"
#include "kudu/util/hexdump.h"
#include "kudu/util/logging.h"
#include "kudu/util/memory/arena.h"
#include "kudu/util/slice.h"
namespace kudu {
namespace cfile {
using kudu::coding::AppendGroupVarInt32;
using std::string;
using std::vector;
using strings::Substitute;
////////////////////////////////////////////////////////////
// Utility code used by both encoding and decoding
////////////////////////////////////////////////////////////
static const uint8_t *DecodeEntryLengths(
const uint8_t *ptr, const uint8_t *limit,
uint32_t *shared, uint32_t *non_shared) {
if ((ptr = GetVarint32Ptr(ptr, limit, shared)) == nullptr) return nullptr;
if ((ptr = GetVarint32Ptr(ptr, limit, non_shared)) == nullptr) return nullptr;
if (limit - ptr < *non_shared) {
return nullptr;
}
return ptr;
}
////////////////////////////////////////////////////////////
// StringPrefixBlockBuilder encoding
////////////////////////////////////////////////////////////
BinaryPrefixBlockBuilder::BinaryPrefixBlockBuilder(const WriterOptions *options)
: val_count_(0),
vals_since_restart_(0),
finished_(false),
options_(options) {
Reset();
}
void BinaryPrefixBlockBuilder::Reset() {
finished_ = false;
val_count_ = 0;
vals_since_restart_ = 0;
buffer_.clear();
buffer_.reserve(options_->storage_attributes.cfile_block_size);
restarts_.clear();
last_val_.clear();
}
bool BinaryPrefixBlockBuilder::IsBlockFull() const {
// TODO(todd): take restarts size into account
return buffer_.size() > options_->storage_attributes.cfile_block_size;
}
void BinaryPrefixBlockBuilder::Finish(rowid_t ordinal_pos, vector<Slice>* slices) {
CHECK(!finished_) << "already finished";
header_buf_.clear();
AppendGroupVarInt32(&header_buf_, val_count_, ordinal_pos,
options_->block_restart_interval, 0);
// Serialize the restart points.
// Note that the values stored in restarts_ are relative to the
// start of the *buffer*, which is not the same as the start of
// the block. So, we must subtract the header offset from each.
buffer_.reserve(buffer_.size()
+ restarts_.size() * sizeof(uint32_t) // the data
+ sizeof(uint32_t)); // the restart count);
for (uint32_t restart : restarts_) {
// The encoded offsets are relative to the start of the block,
// inclusive of the header.
InlinePutFixed32(&buffer_, restart + header_buf_.size());
}
InlinePutFixed32(&buffer_, restarts_.size());
finished_ = true;
*slices = { Slice(header_buf_), Slice(buffer_) };
}
int BinaryPrefixBlockBuilder::Add(const uint8_t *vals, size_t count) {
DCHECK_GT(count, 0);
DCHECK(!finished_);
DCHECK_LE(vals_since_restart_, options_->block_restart_interval);
int added = 0;
const Slice* slices = reinterpret_cast<const Slice*>(vals);
Slice prev_val(last_val_);
while (!IsBlockFull() && added < count) {
const Slice val = slices[added];
int old_size = buffer_.size();
buffer_.resize(old_size + 5 * 2 + val.size());
uint8_t* dst_p = &buffer_[old_size];
size_t shared = 0;
if (vals_since_restart_ < options_->block_restart_interval) {
// See how much sharing to do with previous string
shared = CommonPrefixLength(prev_val, val);
} else {
// Restart compression
restarts_.push_back(old_size);
vals_since_restart_ = 0;
}
const size_t non_shared = val.size() - shared;
// Add "<shared><non_shared>" to buffer_
dst_p = InlineEncodeVarint32(dst_p, shared);
dst_p = InlineEncodeVarint32(dst_p, non_shared);
// Add string delta to buffer_
memcpy(dst_p, val.data() + shared, non_shared);
dst_p += non_shared;
// Chop off the extra size on the end of the buffer.
buffer_.resize(dst_p - &buffer_[0]);
// Update state
prev_val = val;
added++;
vals_since_restart_++;
}
last_val_.assign_copy(prev_val.data(), prev_val.size());
val_count_ += added;
return added;
}
size_t BinaryPrefixBlockBuilder::Count() const {
return val_count_;
}
Status BinaryPrefixBlockBuilder::GetFirstKey(void *key) const {
if (val_count_ == 0) {
return Status::NotFound("no keys in data block");
}
const uint8_t *p = &buffer_[0];
uint32_t shared;
uint32_t non_shared;
p = DecodeEntryLengths(p, &buffer_[buffer_.size()], &shared, &non_shared);
if (p == nullptr) {
return Status::Corruption("Could not decode first entry in string block");
}
CHECK(shared == 0) << "first entry in string block had a non-zero 'shared': "
<< shared;
*reinterpret_cast<Slice *>(key) = Slice(p, non_shared);
return Status::OK();
}
Status BinaryPrefixBlockBuilder::GetLastKey(void *key) const {
if (val_count_ == 0) {
return Status::NotFound("no keys in data block");
}
*reinterpret_cast<Slice *>(key) = Slice(last_val_);
return Status::OK();
}
////////////////////////////////////////////////////////////
// StringPrefixBlockDecoder
////////////////////////////////////////////////////////////
BinaryPrefixBlockDecoder::BinaryPrefixBlockDecoder(scoped_refptr<BlockHandle> block)
: block_(std::move(block)),
data_(block_->data()),
parsed_(false),
num_elems_(0),
ordinal_pos_base_(0),
num_restarts_(0),
restarts_(nullptr),
data_start_(nullptr),
cur_idx_(0),
next_ptr_(nullptr) {
}
Status BinaryPrefixBlockDecoder::ParseHeader() {
// First parse the actual header.
uint32_t unused;
// Make sure the Slice we are referring to is at least the size of the
// minimum possible header
if (PREDICT_FALSE(data_.size() < kMinHeaderSize)) {
return Status::Corruption(
strings::Substitute("not enough bytes for header: string block header "
"size ($0) less than minimum possible header length ($1)",
data_.size(), kMinHeaderSize));
// TODO include hexdump
}
// Make sure the actual size of the group varints in the Slice we are
// referring to is as big as it claims to be
size_t header_size = coding::DecodeGroupVarInt32_GetGroupSize(data_.data());
if (PREDICT_FALSE(data_.size() < header_size)) {
return Status::Corruption(
strings::Substitute("string block header size ($0) less than length "
"from in header ($1)", data_.size(), header_size));
// TODO include hexdump
}
// We should have enough space in the Slice to decode the group varints
// safely now
data_start_ =
coding::DecodeGroupVarInt32_SlowButSafe(
data_.data(),
&num_elems_, &ordinal_pos_base_,
&restart_interval_, &unused);
// Then the footer, which points us to the restarts array
num_restarts_ = DecodeFixed32(
data_.data() + data_.size() - sizeof(uint32_t));
// sanity check the restarts size
uint32_t restarts_size = num_restarts_ * sizeof(uint32_t);
if (restarts_size > data_.size()) {
return Status::Corruption(
StringPrintf("restart count %d too big to fit in block size %d",
num_restarts_, static_cast<int>(data_.size())));
}
// TODO: check relationship between num_elems, num_restarts_,
// and restart_interval_
restarts_ = reinterpret_cast<const uint32_t *>(
data_.data() + data_.size()
- sizeof(uint32_t) // rewind before the restart length
- restarts_size);
SeekToStart();
parsed_ = true;
return Status::OK();
}
void BinaryPrefixBlockDecoder::SeekToStart() {
SeekToRestartPoint(0);
}
void BinaryPrefixBlockDecoder::SeekToPositionInBlock(uint pos) {
if (PREDICT_FALSE(num_elems_ == 0)) {
DCHECK_EQ(0, pos);
return;
}
DCHECK_LE(pos, num_elems_);
// Seeking past the last element is valid -- it just sets us to the
// same state as if we have read all of the elements.
if (pos == num_elems_) {
cur_idx_ = pos;
return;
}
int target_restart = pos/restart_interval_;
SeekToRestartPoint(target_restart);
// Seek forward to the right index
// TODO: Seek calls should return a Status
CHECK_OK(SkipForward(pos - cur_idx_));
DCHECK_EQ(cur_idx_, pos);
}
// Get the pointer to the entry corresponding to the given restart
// point. Note that the restart points in the file do not include
// the '0' restart point, since that is simply the beginning of
// the data and hence a waste of space. So, 'idx' may range from
// 0 (first record) through num_restarts_ (exclusive).
const uint8_t * BinaryPrefixBlockDecoder::GetRestartPoint(uint32_t idx) const {
DCHECK_LE(idx, num_restarts_);
if (PREDICT_TRUE(idx > 0)) {
return data_.data() + restarts_[idx - 1];
} else {
return data_start_;
}
}
// Note: see GetRestartPoint() for 'idx' semantics
void BinaryPrefixBlockDecoder::SeekToRestartPoint(uint32_t idx) {
if (PREDICT_FALSE(num_elems_ == 0)) {
DCHECK_EQ(0, idx);
return;
}
next_ptr_ = GetRestartPoint(idx);
cur_idx_ = idx * restart_interval_;
CHECK_OK(ParseNextValue()); // TODO: handle corrupted blocks
}
Status BinaryPrefixBlockDecoder::SeekAtOrAfterValue(const void *value_void,
bool *exact_match) {
DCHECK(value_void != nullptr);
const Slice &target = *reinterpret_cast<const Slice *>(value_void);
// Binary search in restart array to find the first restart point
// with a key >= target
int32_t left = 0;
int32_t right = num_restarts_;
while (left < right) {
uint32_t mid = (left + right + 1) / 2;
const uint8_t *entry = GetRestartPoint(mid);
uint32_t shared, non_shared;
const uint8_t *key_ptr = DecodeEntryLengths(entry, &shared, &non_shared);
if (key_ptr == nullptr || (shared != 0)) {
string err =
StringPrintf("bad entry restart=%d shared=%d\n", mid, shared) +
HexDump(Slice(entry, 16));
return Status::Corruption(err);
}
const Slice mid_key(key_ptr, non_shared);
if (mid_key < target) {
// Key at "mid" is smaller than "target". Therefore all
// blocks before "mid" are uninteresting.
left = mid;
} else {
// Key at "mid" is >= "target". Therefore all blocks at or
// after "mid" are uninteresting.
right = mid - 1;
}
}
// Linear search (within restart block) for first key >= target
SeekToRestartPoint(left);
while (true) {
#ifndef NDEBUG
VLOG(3) << "loop iter:\n"
<< "cur_idx = " << cur_idx_ << "\n"
<< "target =" << KUDU_REDACT(target.ToDebugString()) << "\n"
<< "cur_val_=" << KUDU_REDACT(Slice(cur_val_).ToDebugString());
#endif
int cmp = Slice(cur_val_).compare(target);
if (cmp >= 0) {
*exact_match = (cmp == 0);
return Status::OK();
}
RETURN_NOT_OK(ParseNextValue());
cur_idx_++;
}
}
Status BinaryPrefixBlockDecoder::CopyNextValues(size_t *n, ColumnDataView *dst) {
DCHECK(parsed_);
CHECK_EQ(dst->type_info()->physical_type(), BINARY);
DCHECK_EQ(dst->stride(), sizeof(Slice));
DCHECK_LE(*n, dst->nrows());
Arena *out_arena = dst->arena();
Slice *out = reinterpret_cast<Slice *>(dst->data());
if (PREDICT_FALSE(*n == 0 || cur_idx_ >= num_elems_)) {
*n = 0;
return Status::OK();
}
size_t i = 0;
size_t max_fetch = std::min(*n, static_cast<size_t>(num_elems_ - cur_idx_));
// Grab the first row, which we've cached from the last call or seek.
const uint8_t *out_data = out_arena->AddSlice(cur_val_);
if (PREDICT_FALSE(out_data == nullptr)) {
return Status::IOError(
"Out of memory",
StringPrintf("Failed to allocate %d bytes in output arena",
static_cast<int>(cur_val_.size())));
}
// Put a slice to it in the output array
Slice prev_val(out_data, cur_val_.size());
*out++ = prev_val;
i++;
cur_idx_++;
#ifndef NDEBUG
cur_val_.assign_copy("INVALID");
#endif
// Now iterate pulling more rows from the block, decoding relative
// to the previous value.
for (; i < max_fetch; i++) {
Slice copied;
RETURN_NOT_OK(ParseNextIntoArena(prev_val, dst->arena(), &copied));
*out++ = copied;
prev_val = copied;
cur_idx_++;
}
// Fetch the next value to be returned, using the last value we fetched
// for the delta.
cur_val_.assign_copy(prev_val.data(), prev_val.size());
if (cur_idx_ < num_elems_) {
RETURN_NOT_OK(ParseNextValue());
} else {
next_ptr_ = nullptr;
}
*n = i;
return Status::OK();
}
// Decode the lengths pointed to by 'ptr', doing bounds checking.
//
// Returns a pointer to where the value itself starts.
// Returns NULL if the varints themselves, or the value that
// they prefix extend past the end of the block data.
const uint8_t *BinaryPrefixBlockDecoder::DecodeEntryLengths(
const uint8_t *ptr, uint32_t *shared, uint32_t *non_shared) const {
// data ends where the restart info begins
const uint8_t *limit = reinterpret_cast<const uint8_t *>(restarts_);
return kudu::cfile::DecodeEntryLengths(ptr, limit, shared, non_shared);
}
Status BinaryPrefixBlockDecoder::SkipForward(int n) {
DCHECK_LE(cur_idx_ + n, num_elems_) <<
"skip(" << n << ") curidx=" << cur_idx_
<< " num_elems=" << num_elems_;
// If we're seeking exactly to the end of the data, we don't
// need to actually prepare the next value (in fact, it would
// crash). So, short-circuit here.
if (PREDICT_FALSE(cur_idx_ + n == num_elems_)) {
cur_idx_ += n;
return Status::OK();
}
// Probably a faster way to implement this using restarts,
for (int i = 0; i < n; i++) {
RETURN_NOT_OK(ParseNextValue());
cur_idx_++;
}
return Status::OK();
}
Status BinaryPrefixBlockDecoder::CheckNextPtr() {
DCHECK(next_ptr_ != nullptr);
if (PREDICT_FALSE(next_ptr_ == reinterpret_cast<const uint8_t *>(restarts_))) {
DCHECK_EQ(cur_idx_, num_elems_ - 1);
return Status::NotFound("Trying to parse past end of array");
}
return Status::OK();
}
inline Status BinaryPrefixBlockDecoder::ParseNextIntoArena(Slice prev_val,
Arena *dst,
Slice *copied) {
RETURN_NOT_OK(CheckNextPtr());
uint32_t shared, non_shared;
const uint8_t *val_delta = DecodeEntryLengths(next_ptr_, &shared, &non_shared);
if (val_delta == nullptr) {
return Status::Corruption(
StringPrintf("Could not decode value length data at idx %d",
cur_idx_));
}
DCHECK_LE(shared, prev_val.size())
<< "Spcified longer shared amount than previous key length";
uint8_t *buf = reinterpret_cast<uint8_t *>(dst->AllocateBytes(non_shared + shared));
strings::memcpy_inlined(buf, prev_val.data(), shared);
strings::memcpy_inlined(buf + shared, val_delta, non_shared);
*copied = Slice(buf, non_shared + shared);
next_ptr_ = val_delta + non_shared;
return Status::OK();
}
// Parses the data pointed to by next_ptr_ and stores it in cur_val_
// Advances next_ptr_ to point to the following values.
// Does not modify cur_idx_
inline Status BinaryPrefixBlockDecoder::ParseNextValue() {
RETURN_NOT_OK(CheckNextPtr());
uint32_t shared, non_shared;
const uint8_t *val_delta = DecodeEntryLengths(next_ptr_, &shared, &non_shared);
if (val_delta == nullptr) {
return Status::Corruption(
StringPrintf("Could not decode value length data at idx %d",
cur_idx_));
}
// Chop the current key to the length that is shared with the next
// key, then append the delta portion.
DCHECK_LE(shared, cur_val_.size())
<< "Specified longer shared amount than previous key length";
cur_val_.resize(shared);
cur_val_.append(val_delta, non_shared);
DCHECK_EQ(cur_val_.size(), shared + non_shared);
next_ptr_ = val_delta + non_shared;
return Status::OK();
}
} // namespace cfile
} // namespace kudu