blob: 7b98cc0c2b3d53745d1ede29a08eb2d41cddf8d5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef ENCODING_INT64RLE_DECODER_H
#define ENCODING_INT64RLE_DECODER_H
#include <vector>
#include "common/allocator/alloc_base.h"
#include "decoder.h"
#include "encoder.h"
#include "encoding/encode_utils.h"
#include "encoding/int64_packer.h"
namespace storage {
class Int64RleDecoder : public Decoder {
private:
uint32_t length_;
uint32_t bit_width_;
int bitpacking_num_;
bool is_length_and_bitwidth_readed_;
int current_count_;
common::ByteStream byte_cache_;
int64_t *current_buffer_;
Int64Packer *packer_;
uint8_t *tmp_buf_;
public:
Int64RleDecoder()
: length_(0),
bit_width_(0),
bitpacking_num_(0),
is_length_and_bitwidth_readed_(false),
current_count_(0),
byte_cache_(1024, common::MOD_DECODER_OBJ),
current_buffer_(nullptr),
packer_(nullptr),
tmp_buf_(nullptr) {}
~Int64RleDecoder() override { destroy(); }
bool has_remaining(const common::ByteStream &buffer) override {
return buffer.has_remaining() || has_next_package();
}
int read_boolean(bool &ret_value, common::ByteStream &in) override {
return common::E_TYPE_NOT_MATCH;
}
int read_int32(int32_t &ret_value, common::ByteStream &in) override {
return common::E_TYPE_NOT_MATCH;
}
int read_int64(int64_t &ret_value, common::ByteStream &in) override {
ret_value = read_int(in);
return common::E_OK;
}
int read_float(float &ret_value, common::ByteStream &in) override {
return common::E_TYPE_NOT_MATCH;
}
int read_double(double &ret_value, common::ByteStream &in) override {
return common::E_TYPE_NOT_MATCH;
}
int read_String(common::String &ret_value, common::PageArena &pa,
common::ByteStream &in) override {
return common::E_TYPE_NOT_MATCH;
}
void init() {
packer_ = nullptr;
is_length_and_bitwidth_readed_ = false;
length_ = 0;
bit_width_ = 0;
bitpacking_num_ = 0;
current_count_ = 0;
}
bool has_next(common::ByteStream &buffer) {
if (current_count_ > 0 || buffer.remaining_size() > 0 ||
has_next_package()) {
return true;
}
return false;
}
bool has_next_package() {
return current_count_ > 0 || byte_cache_.remaining_size() > 0;
}
int64_t read_int(common::ByteStream &buffer) {
if (!is_length_and_bitwidth_readed_) {
// start to reader a new rle+bit-packing pattern
read_length_and_bitwidth(buffer);
}
if (current_count_ == 0) {
uint8_t header;
int ret = common::E_OK;
if (RET_FAIL(
common::SerializationUtil::read_ui8(header, byte_cache_))) {
return ret;
}
call_read_bit_packing_buffer(header);
}
--current_count_;
int64_t result = current_buffer_[bitpacking_num_ - current_count_ - 1];
if (!has_next_package()) {
is_length_and_bitwidth_readed_ = false;
}
return result;
}
int call_read_bit_packing_buffer(uint8_t header) {
int bit_packed_group_count = (int)(header >> 1);
// in last bit-packing group, there may be some useless value,
// lastBitPackedNum indicates how many values is useful
uint8_t last_bit_packed_num;
int ret = common::E_OK;
if (RET_FAIL(common::SerializationUtil::read_ui8(last_bit_packed_num,
byte_cache_))) {
return ret;
}
if (bit_packed_group_count > 0) {
current_count_ =
(bit_packed_group_count - 1) * 8 + last_bit_packed_num;
bitpacking_num_ = current_count_;
} else {
printf(
"tsfile-encoding IntRleDecoder: bit_packed_group_count %d, "
"smaller "
"than 1",
bit_packed_group_count);
}
read_bit_packing_buffer(bit_packed_group_count, last_bit_packed_num);
return ret;
}
void read_bit_packing_buffer(int bit_packed_group_count,
int last_bit_packed_num) {
if (current_buffer_ != nullptr) {
delete[] current_buffer_;
}
current_buffer_ = new int64_t[bit_packed_group_count * 8];
int bytes_to_read = bit_packed_group_count * bit_width_;
if (bytes_to_read > (int)byte_cache_.remaining_size()) {
bytes_to_read = byte_cache_.remaining_size();
}
std::vector<unsigned char> bytes(bytes_to_read);
for (int i = 0; i < bytes_to_read; i++) {
common::SerializationUtil::read_ui8(bytes[i], byte_cache_);
}
// save all int values in currentBuffer
packer_->unpack_all_values(
bytes.data(), bytes_to_read,
current_buffer_); // decode from bytes, save in currentBuffer
}
int read_length_and_bitwidth(common::ByteStream &buffer) {
int ret = common::E_OK;
if (RET_FAIL(
common::SerializationUtil::read_var_uint(length_, buffer))) {
return common::E_PARTIAL_READ;
} else {
tmp_buf_ =
(uint8_t *)common::mem_alloc(length_, common::MOD_DECODER_OBJ);
if (tmp_buf_ == nullptr) {
return common::E_OOM;
}
uint32_t ret_read_len = 0;
if (RET_FAIL(buffer.read_buf((uint8_t *)tmp_buf_, length_,
ret_read_len))) {
return ret;
} else if (length_ != ret_read_len) {
ret = common::E_PARTIAL_READ;
}
byte_cache_.wrap_from((char *)tmp_buf_, length_);
is_length_and_bitwidth_readed_ = true;
uint8_t tmp_bit_width;
common::SerializationUtil::read_ui8(tmp_bit_width, byte_cache_);
bit_width_ = tmp_bit_width;
init_packer();
}
return ret;
}
void init_packer() { packer_ = new Int64Packer(bit_width_); }
void destroy() { /* do nothing for BitpackEncoder */
if (packer_) {
delete (packer_);
packer_ = nullptr;
}
if (current_buffer_) {
delete[] current_buffer_;
current_buffer_ = nullptr;
}
if (tmp_buf_) {
common::mem_free(tmp_buf_);
tmp_buf_ = nullptr;
}
}
void reset() override {
length_ = 0;
bit_width_ = 0;
bitpacking_num_ = 0;
is_length_and_bitwidth_readed_ = false;
current_count_ = 0;
if (current_buffer_) {
delete[] current_buffer_;
current_buffer_ = nullptr;
}
if (packer_) {
delete (packer_);
packer_ = nullptr;
}
if (tmp_buf_) {
common::mem_free(tmp_buf_);
tmp_buf_ = nullptr;
}
}
};
} // end namespace storage
#endif // ENCODING_BITPACK_ENCODER_H