// blob: 7baf806c9fc94ee1352573e1eddcb6598ab248a1 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef KUDU_CFILE_RLE_BLOCK_H
#define KUDU_CFILE_RLE_BLOCK_H
#include <algorithm>
#include <string>
#include <vector>
#include "kudu/gutil/port.h"
#include "kudu/cfile/block_encodings.h"
#include "kudu/cfile/block_handle.h"
#include "kudu/cfile/cfile_util.h"
#include "kudu/common/columnblock.h"
#include "kudu/util/coding.h"
#include "kudu/util/coding-inl.h"
#include "kudu/util/hexdump.h"
#include "kudu/util/bit-stream-utils.inline.h"
#include "kudu/util/bitmap.h"
#include "kudu/util/rle-encoding.h"
namespace kudu {
namespace cfile {
struct WriterOptions;
// Size in bytes of the header placed in front of the RLE payload of every
// block in this file: two fixed32 fields, written by the builders' Finish()
// at offsets 0 and 4 and read back by the decoders' ParseHeader().
enum {
  kRleBitmapBlockHeaderSize = 4 /* element count */ + 4 /* first ordinal position */
};
//
// Block builder for the BOOL datatype: every cell becomes one bit of an
// RLE-encoded bitmap that is stored after a fixed-size block header.
//
class RleBitMapBlockBuilder final : public BlockBuilder {
 public:
  // 'options' is only read (for the target block size); it is not owned.
  explicit RleBitMapBlockBuilder(const WriterOptions* options)
      : encoder_(&buf_, 1),
        options_(options) {
    Reset();
  }

  // Appends 'count' cells, one input byte per cell, and returns the number
  // of cells consumed, which is always 'count'.
  virtual int Add(const uint8_t* vals, size_t count) OVERRIDE {
    // TODO (perf) : pushing a single bit per call is probably inefficient.
    for (size_t i = 0; i < count; ++i) {
      encoder_.Put(vals[i], 1);
    }
    count_ += count;
    return count;
  }

  // True once the encoder's length has grown past the configured block size.
  virtual bool IsBlockFull() const override {
    const auto& attrs = options_->storage_attributes;
    return encoder_.len() > attrs.cfile_block_size;
  }

  // Writes the header (cell count, then 'ordinal_pos') into the first eight
  // bytes of the buffer, flushes the encoder and hands the buffer back as
  // the only slice in 'slices'.
  virtual void Finish(rowid_t ordinal_pos, std::vector<Slice>* slices) OVERRIDE {
    InlineEncodeFixed32(&buf_[0], count_);
    InlineEncodeFixed32(&buf_[4], ordinal_pos);
    encoder_.Flush();
    *slices = { buf_ };
  }

  // Returns the builder to its empty state.
  virtual void Reset() OVERRIDE {
    encoder_.Clear();
    // Presumably keeps room at the front of the buffer for the header that
    // Finish() fills in -- TODO confirm against RleEncoder::Reserve().
    encoder_.Reserve(kRleBitmapBlockHeaderSize, 0);
    count_ = 0;
  }

  // Number of cells added since the last Reset().
  virtual size_t Count() const OVERRIDE {
    return count_;
  }

  // TODO Implement this method
  virtual Status GetFirstKey(void* key) const OVERRIDE {
    return Status::NotSupported("BOOL keys not supported");
  }

  // TODO Implement this method
  virtual Status GetLastKey(void* key) const OVERRIDE {
    return Status::NotSupported("BOOL keys not supported");
  }

 private:
  // Declaration order matters: 'encoder_' is constructed with a pointer to
  // 'buf_', so 'buf_' has to come first.
  faststring buf_;
  RleEncoder<bool> encoder_;
  const WriterOptions* const options_;
  size_t count_;
};
//
// RLE decoder for bool datatype
//
class RleBitMapBlockDecoder final : public BlockDecoder {
public:
explicit RleBitMapBlockDecoder(scoped_refptr<BlockHandle> block)
: block_(std::move(block)),
data_(block_->data()),
parsed_(false),
num_elems_(0),
ordinal_pos_base_(0),
cur_idx_(0) {
}
virtual Status ParseHeader() OVERRIDE {
CHECK(!parsed_);
if (data_.size() < kRleBitmapBlockHeaderSize) {
return Status::Corruption(
"not enough bytes for header in RleBitMapBlockDecoder");
}
num_elems_ = DecodeFixed32(&data_[0]);
ordinal_pos_base_ = DecodeFixed32(&data_[4]);
parsed_ = true;
rle_decoder_ = RleDecoder<bool>(data_.data() + kRleBitmapBlockHeaderSize,
data_.size() - kRleBitmapBlockHeaderSize, 1);
SeekToPositionInBlock(0);
return Status::OK();
}
virtual void SeekToPositionInBlock(uint pos) OVERRIDE {
CHECK(parsed_) << "Must call ParseHeader()";
DCHECK_LE(pos, num_elems_)
<< "Tried to seek to " << pos << " which is > number of elements ("
<< num_elems_ << ") in the block!";
// If the block is empty (e.g. the column is filled with nulls), there is no data to seek.
if (PREDICT_FALSE(num_elems_ == 0)) {
return;
}
if (cur_idx_ == pos) {
// No need to seek.
return;
} else if (cur_idx_ < pos) {
uint nskip = pos - cur_idx_;
rle_decoder_.Skip(nskip);
} else {
// This approach is also used by CFileReader to
// seek backwards in an RLE encoded block
rle_decoder_ = RleDecoder<bool>(data_.data() + kRleBitmapBlockHeaderSize,
data_.size() - kRleBitmapBlockHeaderSize, 1);
rle_decoder_.Skip(pos);
}
cur_idx_ = pos;
}
virtual Status CopyNextValues(size_t *n, ColumnDataView* dst) OVERRIDE {
DCHECK(parsed_);
DCHECK_LE(*n, dst->nrows());
DCHECK_EQ(dst->stride(), sizeof(bool));
if (PREDICT_FALSE(*n == 0 || cur_idx_ >= num_elems_)) {
*n = 0;
return Status::OK();
}
size_t bits_to_fetch = std::min(*n, static_cast<size_t>(num_elems_ - cur_idx_));
size_t remaining = bits_to_fetch;
uint8_t* data_ptr = dst->data();
// TODO : do this a word/byte at a time as opposed bit at a time
while (remaining > 0) {
bool result = rle_decoder_.Get(reinterpret_cast<bool*>(data_ptr));
DCHECK(result);
remaining--;
data_ptr++;
}
cur_idx_ += bits_to_fetch;
*n = bits_to_fetch;
return Status::OK();
}
virtual Status SeekAtOrAfterValue(const void *value,
bool *exact_match) OVERRIDE {
return Status::NotSupported("BOOL keys are not supported!");
}
virtual bool HasNext() const OVERRIDE { return cur_idx_ < num_elems_; }
virtual size_t Count() const OVERRIDE { return num_elems_; }
virtual size_t GetCurrentIndex() const OVERRIDE { return cur_idx_; }
virtual rowid_t GetFirstRowId() const OVERRIDE { return ordinal_pos_base_; }
private:
scoped_refptr<BlockHandle> block_;
Slice data_;
bool parsed_;
uint32_t num_elems_;
rowid_t ordinal_pos_base_;
uint32_t cur_idx_;
RleDecoder<bool> rle_decoder_;
};
//
// RLE builder for generic integer types. What is missing is some way
// to enforce that this can only be instantiated for INT types.
// TODO : consider if this can also be used for BOOL with only minor
// alterations
template <DataType IntType>
class RleIntBlockBuilder final : public BlockBuilder {
 public:
  // 'options' is only read (for the target block size); it is not owned.
  explicit RleIntBlockBuilder(const WriterOptions* options)
      : rle_encoder_(&buf_, kCppTypeSize * 8),
        options_(options) {
    Reset();
  }

  // True once the encoder's length has grown past the configured block size.
  virtual bool IsBlockFull() const OVERRIDE {
    return rle_encoder_.len() > options_->storage_attributes.cfile_block_size;
  }

  // Appends 'count' values of CppType read from 'vals_void' and returns the
  // number of values consumed (always 'count'). The first value added to an
  // empty block and the last value of every call are remembered so that
  // GetFirstKey()/GetLastKey() can return them.
  virtual int Add(const uint8_t* vals_void, size_t count) OVERRIDE {
    // Nothing to append: 'vals_void' need not point at a readable value, so
    // it must not be dereferenced to seed first_key_.
    if (PREDICT_FALSE(count == 0)) {
      return 0;
    }

    DCHECK_EQ(reinterpret_cast<uintptr_t>(vals_void) & (alignof(CppType) - 1), 0)
        << "Pointer passed to Add() must be naturally-aligned";
    const CppType* vals = reinterpret_cast<const CppType*>(vals_void);

    if (PREDICT_FALSE(count_ == 0)) {
      first_key_ = vals[0];
    }
    for (size_t i = 0; i < count; ++i) {
      rle_encoder_.Put(vals[i], 1);
    }
    count_ += count;
    last_key_ = vals[count - 1];
    return count;
  }

  // Writes the header (value count, then 'ordinal_pos') into the first eight
  // bytes of the buffer, flushes the encoder and hands the buffer back as
  // the only slice in 'slices'.
  virtual void Finish(rowid_t ordinal_pos, std::vector<Slice>* slices) OVERRIDE {
    InlineEncodeFixed32(&buf_[0], count_);
    InlineEncodeFixed32(&buf_[4], ordinal_pos);
    rle_encoder_.Flush();
    *slices = { Slice(buf_) };
  }

  // Returns the builder to its empty state. first_key_/last_key_ are left
  // untouched; they are only reported while count_ is non-zero.
  virtual void Reset() OVERRIDE {
    count_ = 0;
    rle_encoder_.Clear();
    // Presumably keeps room at the front of the buffer for the header that
    // Finish() fills in -- TODO confirm against RleEncoder::Reserve().
    rle_encoder_.Reserve(kRleBitmapBlockHeaderSize, 0);
  }

  // Number of values added since the last Reset().
  virtual size_t Count() const OVERRIDE {
    return count_;
  }

  // Stores the first value added to this block into 'key' (which may be
  // unaligned). Returns NotFound when the block is empty.
  virtual Status GetFirstKey(void* key) const OVERRIDE {
    if (PREDICT_FALSE(count_ == 0)) {
      return Status::NotFound("No keys in the block");
    }
    UnalignedStore<CppType>(key, first_key_);
    return Status::OK();
  }

  // Stores the most recently added value into 'key' (which may be
  // unaligned). Returns NotFound when the block is empty.
  virtual Status GetLastKey(void* key) const OVERRIDE {
    if (PREDICT_FALSE(count_ == 0)) {
      return Status::NotFound("No keys in the block");
    }
    UnalignedStore<CppType>(key, last_key_);
    return Status::OK();
  }

 private:
  typedef typename TypeTraits<IntType>::cpp_type CppType;

  enum {
    kCppTypeSize = TypeTraits<IntType>::size
  };

  CppType first_key_;
  CppType last_key_;
  // Declaration order matters: 'rle_encoder_' is constructed with a pointer
  // to 'buf_', so 'buf_' has to come first.
  faststring buf_;
  size_t count_;
  RleEncoder<CppType> rle_encoder_;
  const WriterOptions* const options_;
};
//
// RLE decoder for generic integer types.
//
// TODO : as with the matching BlockBuilder above (see comments for
// that class), it may be possible to re-use most of the
// code here for the BOOL type.
//
template <DataType IntType>
class RleIntBlockDecoder final : public BlockDecoder {
 public:
  // Takes a reference on 'block' and decodes directly from its data.
  explicit RleIntBlockDecoder(scoped_refptr<BlockHandle> block)
      : block_(std::move(block)),
        data_(block_->data()),
        parsed_(false),
        num_elems_(0),
        ordinal_pos_base_(0),
        cur_idx_(0) {
  }

  // Reads the element count and the first ordinal position from the header
  // and points the RLE decoder at the payload. Must be called exactly once,
  // before any other method. Returns Corruption if the block is shorter
  // than the header.
  virtual Status ParseHeader() OVERRIDE {
    CHECK(!parsed_);

    if (data_.size() < kRleBitmapBlockHeaderSize) {
      return Status::Corruption(
          "not enough bytes for header in RleIntBlockDecoder");
    }

    num_elems_ = DecodeFixed32(&data_[0]);
    ordinal_pos_base_ = DecodeFixed32(&data_[4]);

    parsed_ = true;

    rle_decoder_ = RleDecoder<CppType>(data_.data() + kRleBitmapBlockHeaderSize,
                                       data_.size() - kRleBitmapBlockHeaderSize,
                                       kCppTypeSize * 8);

    SeekToPositionInBlock(0);

    return Status::OK();
  }

  // Moves the read position to element 'pos' (0-based within the block).
  virtual void SeekToPositionInBlock(uint pos) OVERRIDE {
    CHECK(parsed_) << "Must call ParseHeader()";
    DCHECK_LE(pos, num_elems_)
        << "Tried to seek to " << pos << " which is > number of elements ("
        << num_elems_ << ") in the block!";

    // If the block is empty (e.g. the column is filled with nulls), there is no data to seek.
    if (PREDICT_FALSE(num_elems_ == 0)) {
      return;
    }

    if (cur_idx_ == pos) {
      // No need to seek.
      return;
    } else if (cur_idx_ < pos) {
      // Forward seek: skip only the elements in between.
      uint nskip = pos - cur_idx_;
      rle_decoder_.Skip(nskip);
    } else {
      // Backward seek: restart from the beginning of the payload and skip
      // forward to 'pos'.
      rle_decoder_ = RleDecoder<CppType>(data_.data() + kRleBitmapBlockHeaderSize,
                                         data_.size() - kRleBitmapBlockHeaderSize,
                                         kCppTypeSize * 8);
      rle_decoder_.Skip(pos);
    }
    cur_idx_ = pos;
  }

  // Scans from the start of the block for the first element that is equal
  // to or greater than the (possibly unaligned) value at 'value_void'.
  // On success the read position is left on that element and '*exact_match'
  // tells whether it is equal to the target. Returns NotFound, with the
  // position left where the scan stopped, when no such element exists.
  virtual Status SeekAtOrAfterValue(const void *value_void, bool *exact_match) OVERRIDE {
    // Currently using linear search as we do not check whether a
    // mid-point of a buffer will fall on a literal or not.
    //
    // TODO (perf): make this faster by moving forward a 'run at a time'
    //              by perhaps pushing this loop down into RleDecoder itself
    // TODO (perf): investigate placing pointers somewhere in either the
    // header or the tail to speed up search.

    SeekToPositionInBlock(0);
    CppType target = UnalignedLoad<CppType>(value_void);

    while (cur_idx_ < num_elems_) {
      CppType cur_elem = 0;
      if (!rle_decoder_.Get(&cur_elem)) {
        break;
      }
      if (cur_elem == target) {
        // Presumably un-reads the element just fetched so that it is the
        // next one returned -- TODO confirm against RleDecoder::RewindOne().
        rle_decoder_.RewindOne();
        *exact_match = true;
        return Status::OK();
      }
      if (cur_elem > target) {
        rle_decoder_.RewindOne();
        *exact_match = false;
        return Status::OK();
      }
      cur_idx_++;
    }

    return Status::NotFound("not in block");
  }

  // Decodes up to '*n' values into 'dst', kCppTypeSize bytes apart, and
  // stores the number actually decoded back into '*n'.
  virtual Status CopyNextValues(size_t *n, ColumnDataView *dst) OVERRIDE {
    DCHECK(parsed_);

    DCHECK_LE(*n, dst->nrows());
    DCHECK_EQ(dst->stride(), sizeof(CppType));

    if (PREDICT_FALSE(*n == 0 || cur_idx_ >= num_elems_)) {
      *n = 0;
      return Status::OK();
    }

    size_t to_fetch = std::min(*n, static_cast<size_t>(num_elems_ - cur_idx_));
    size_t remaining = to_fetch;
    uint8_t* data_ptr = dst->data();
    while (remaining > 0) {
      bool result = rle_decoder_.Get(reinterpret_cast<CppType*>(data_ptr));
      DCHECK(result);
      remaining--;
      data_ptr += kCppTypeSize;
    }

    cur_idx_ += to_fetch;
    *n = to_fetch;
    return Status::OK();
  }

  // Decodes up to '*n' values one run at a time, evaluating the predicate of
  // 'ctx' once per run. Rows of a run that fails the predicate are cleared
  // in 'sel'; rows of a matching run that are still selected get the value
  // written into 'dst'. '*n' receives the number of rows processed.
  //
  // Returns Corruption if the RLE payload runs out before the number of
  // values promised by the block header has been decoded; in that case '*n'
  // and the current index reflect the rows handled before the failure.
  Status CopyNextAndEval(size_t* n,
                         ColumnMaterializationContext* ctx,
                         SelectionVectorView* sel,
                         ColumnDataView* dst) override {
    DCHECK(parsed_);

    DCHECK_LE(*n, dst->nrows());
    DCHECK_EQ(dst->stride(), sizeof(CppType));

    ctx->SetDecoderEvalSupported();
    if (PREDICT_FALSE(*n == 0 || cur_idx_ >= num_elems_)) {
      *n = 0;
      return Status::OK();
    }

    const size_t to_fetch = std::min(*n, static_cast<size_t>(num_elems_ - cur_idx_));
    size_t remaining = to_fetch;
    uint8_t* data_ptr = dst->data();
    size_t row_offset = 0;
    while (remaining > 0) {
      CppType val = 0;
      const size_t num_read = rle_decoder_.GetNextRun(&val, remaining);
      if (PREDICT_FALSE(num_read == 0)) {
        // A zero-length run makes no progress: neither 'remaining' nor
        // 'row_offset' would change, so in builds where DCHECK is compiled
        // out the loop would never terminate. Report the truncated payload
        // instead of spinning.
        cur_idx_ += row_offset;
        *n = row_offset;
        return Status::Corruption(
            "RLE data ended before the expected number of values in RleIntBlockDecoder");
      }
      DCHECK_LE(num_read, remaining);
      if (ctx->pred()->EvaluateCell<IntType>(static_cast<const void*>(&val))) {
        // Copy data for matching predicate
        for (size_t row_idx = row_offset; row_idx < row_offset + num_read;
             ++row_idx, data_ptr += kCppTypeSize) {
          // Skip copying if the row has already been cleared.
          if (!sel->TestBit(row_idx)) {
            continue;
          }
          *(reinterpret_cast<CppType *>(data_ptr)) = val;
        }
      } else {
        // Mark that the rows will not be returned.
        sel->ClearBits(num_read, row_offset);
        data_ptr += num_read * kCppTypeSize;
      }
      remaining -= num_read;
      row_offset += num_read;
    }

    cur_idx_ += to_fetch;
    *n = to_fetch;
    return Status::OK();
  }

  // True while there are elements left after the current position.
  virtual bool HasNext() const OVERRIDE {
    return cur_idx_ < num_elems_;
  }

  // Number of elements in the block, as read from the header.
  virtual size_t Count() const OVERRIDE {
    return num_elems_;
  }

  // 0-based index within the block of the next element to be returned.
  virtual size_t GetCurrentIndex() const OVERRIDE {
    return cur_idx_;
  }

  // Ordinal position of the block's first element, as read from the header.
  virtual rowid_t GetFirstRowId() const OVERRIDE {
    return ordinal_pos_base_;
  }

 private:
  typedef typename TypeTraits<IntType>::cpp_type CppType;

  enum {
    kCppTypeSize = TypeTraits<IntType>::size
  };

  scoped_refptr<BlockHandle> block_;
  Slice data_;
  bool parsed_;
  uint32_t num_elems_;
  rowid_t ordinal_pos_base_;
  size_t cur_idx_;
  RleDecoder<CppType> rle_decoder_;
};
} // namespace cfile
} // namespace kudu
#endif