blob: 099afc79009da6e0f464a44a40bb85b82a45a959 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef ENCODING_BITPACK_DECODER_H
#define ENCODING_BITPACK_DECODER_H
#include <vector>
#include "common/allocator/alloc_base.h"
#include "encoder.h"
#include "encoding/encode_utils.h"
#include "encoding/intpacker.h"
namespace storage {
/**
 * Decoder for length-prefixed bit-packed integer patterns.
 *
 * As read by this class, each pattern in the input stream is:
 *   - a var-uint payload length,
 *   - a payload of that many bytes, whose first byte is the bit width,
 *     followed by one or more groups. Each group starts with a header byte
 *     (group count in the upper 7 bits), a "last group value count" byte, and
 *     then group_count * bit_width bytes of packed values.
 *
 * The payload is copied into an internal buffer (tmp_buf_) that byte_cache_
 * wraps; values are unpacked group-by-group into current_buffer_.
 *
 * The class owns raw resources (packer_, tmp_buf_), so copying is disabled to
 * prevent a double free.
 */
class BitPackDecoder {
   private:
    uint32_t length_;      // payload length of the current pattern, in bytes
    uint32_t bit_width_;   // bits per packed value in the current pattern
    int bitpacking_num_;   // number of useful values in the current group
    bool is_length_and_bitwidth_readed_;  // true once a pattern header is loaded
    int current_count_;    // values of the current group not yet returned
    common::ByteStream byte_cache_;        // view over tmp_buf_ (the payload)
    std::vector<int64_t> current_buffer_;  // unpacked values of current group
    IntPacker *packer_;    // owned; rebuilt whenever bit_width_ is re-read
    uint8_t *tmp_buf_;     // owned; backing storage wrapped by byte_cache_

   public:
    BitPackDecoder()
        : length_(0),
          bit_width_(0),
          bitpacking_num_(0),
          is_length_and_bitwidth_readed_(false),
          current_count_(0),
          byte_cache_(1024, common::MOD_DECODER_OBJ),
          current_buffer_(),
          packer_(nullptr),
          tmp_buf_(nullptr) {}

    ~BitPackDecoder() { destroy(); }

    // Owning raw pointers: a member-wise copy would free them twice.
    BitPackDecoder(const BitPackDecoder &) = delete;
    BitPackDecoder &operator=(const BitPackDecoder &) = delete;

    /**
     * Puts the decoder back into its "no pattern loaded" state and releases
     * the current packer (if any).
     */
    void init() {
        delete packer_;  // deleting nullptr is a no-op
        packer_ = nullptr;
        is_length_and_bitwidth_readed_ = false;
        length_ = 0;
        bit_width_ = 0;
        bitpacking_num_ = 0;
        current_count_ = 0;
    }

    /**
     * @param buffer the input stream patterns are read from.
     * @return true if values are pending in the current group, in the cached
     *         payload, or if @p buffer still has unread bytes.
     */
    bool has_next(common::ByteStream &buffer) {
        return current_count_ > 0 || buffer.remaining_size() > 0 ||
               has_next_package();
    }

    /**
     * @return true if the currently loaded pattern still has data, either
     *         already unpacked or still in the cached payload.
     */
    bool has_next_package() {
        return current_count_ > 0 || byte_cache_.remaining_size() > 0;
    }

    /**
     * Returns the next decoded value, loading a new pattern from @p buffer
     * and/or unpacking the next group when required.
     *
     * NOTE: on failure the error code itself is returned as the value (this
     * mirrors the existing contract of this method); callers cannot
     * distinguish it from data and should gate calls with has_next().
     *
     * @param buffer the input stream patterns are read from.
     * @return the next value, or an error code if the input could not be read.
     */
    int64_t read_int(common::ByteStream &buffer) {
        int ret = common::E_OK;
        if (!is_length_and_bitwidth_readed_) {
            // start to read a new length-prefixed bit-packing pattern
            ret = read_length_and_bitwidth(buffer);
            if (!is_length_and_bitwidth_readed_) {
                // No pattern could be loaded; decoding would only touch
                // stale or missing state.
                return ret;
            }
        }
        if (current_count_ <= 0) {
            uint8_t header = 0;
            if (RET_FAIL(
                    common::SerializationUtil::read_ui8(header, byte_cache_))) {
                return ret;
            }
            if (RET_FAIL(call_read_bit_packing_buffer(header))) {
                return ret;
            }
            if (current_count_ <= 0) {
                // The group declared zero useful values; indexing the buffer
                // would run out of bounds.
                return common::E_PARTIAL_READ;
            }
        }
        --current_count_;
        // Values are handed out front to back.
        const int64_t result = current_buffer_[static_cast<size_t>(
            bitpacking_num_ - current_count_ - 1)];
        if (!has_next_package()) {
            // Pattern exhausted: the next call must read a new header.
            is_length_and_bitwidth_readed_ = false;
        }
        return result;
    }

    /**
     * Parses the remainder of a group header and unpacks the group.
     *
     * @param header group header byte; the group count is header >> 1.
     * @return common::E_OK on success; the read error if the second header
     *         byte is missing; common::E_PARTIAL_READ if the header is
     *         malformed (group count < 1 or more than 8 values declared for
     *         the last group). E_PARTIAL_READ is reused here because it is
     *         the only data-error code visible in this file.
     */
    int call_read_bit_packing_buffer(uint8_t header) {
        const int bit_packed_group_count = static_cast<int>(header >> 1);
        // in the last bit-packing group there may be some useless values;
        // last_bit_packed_num indicates how many values are useful
        uint8_t last_bit_packed_num = 0;
        int ret = common::E_OK;
        if (RET_FAIL(common::SerializationUtil::read_ui8(last_bit_packed_num,
                                                         byte_cache_))) {
            return ret;
        }
        if (bit_packed_group_count < 1) {
            printf(
                "tsfile-encoding BitPackDecoder: bit_packed_group_count %d, "
                "smaller "
                "than 1\n",
                bit_packed_group_count);
            return common::E_PARTIAL_READ;
        }
        if (last_bit_packed_num > 8) {
            // A group holds 8 values; a larger count would make read_int()
            // index past the end of the unpacked buffer.
            return common::E_PARTIAL_READ;
        }
        current_count_ =
            (bit_packed_group_count - 1) * 8 + last_bit_packed_num;
        bitpacking_num_ = current_count_;
        read_bit_packing_buffer(bit_packed_group_count, last_bit_packed_num);
        return ret;
    }

    /**
     * Reads up to bit_packed_group_count * bit_width_ bytes from the cached
     * payload and unpacks them into the internal value buffer, which is
     * resized to bit_packed_group_count * 8 entries (zero-filled first).
     *
     * @param bit_packed_group_count number of 8-value groups to unpack.
     * @param last_bit_packed_num unused here; kept for interface stability.
     */
    void read_bit_packing_buffer(int bit_packed_group_count,
                                 int last_bit_packed_num) {
        (void)last_bit_packed_num;
        if (bit_packed_group_count <= 0) {
            current_buffer_.clear();
            return;
        }
        current_buffer_.assign(
            static_cast<size_t>(bit_packed_group_count) * 8, 0);
        if (packer_ == nullptr) {
            // No pattern header has been read yet, so there is nothing to
            // unpack with.
            return;
        }
        // Heap storage instead of a variable-length array: VLAs are not
        // standard C++ and the size comes from untrusted input.
        const size_t packed_size =
            static_cast<size_t>(bit_packed_group_count) * bit_width_;
        std::vector<unsigned char> bytes(packed_size, 0);
        int bytes_to_read = static_cast<int>(packed_size);
        const int available = static_cast<int>(byte_cache_.remaining_size());
        if (bytes_to_read > available) {
            bytes_to_read = available;
        }
        for (int i = 0; i < bytes_to_read; i++) {
            common::SerializationUtil::read_ui8(bytes[i], byte_cache_);
        }
        if (bytes_to_read > 0) {
            // decode from bytes, save all values in current_buffer_
            packer_->unpack_all_values(bytes.data(), bytes_to_read,
                                       current_buffer_.data());
        }
    }

    /**
     * Reads the next pattern header from @p buffer: the var-uint payload
     * length, the payload itself (copied into an internal buffer), and the
     * bit width stored in the payload's first byte. Rebuilds the packer for
     * the new bit width.
     *
     * On failure before the payload is wrapped, the previously loaded
     * payload (if any) is left untouched.
     *
     * @param buffer the input stream to read from.
     * @return common::E_OK on success; common::E_PARTIAL_READ if the length
     *         cannot be read or fewer payload bytes than declared were
     *         available (in the latter case the pattern is still loaded);
     *         common::E_OOM if the payload buffer cannot be allocated; or the
     *         error returned by the underlying read.
     */
    int read_length_and_bitwidth(common::ByteStream &buffer) {
        int ret = common::E_OK;
        if (RET_FAIL(
                common::SerializationUtil::read_var_uint(length_, buffer))) {
            return common::E_PARTIAL_READ;
        }
        uint8_t *new_buf = static_cast<uint8_t *>(
            common::mem_alloc(length_, common::MOD_DECODER_OBJ));
        if (new_buf == nullptr) {
            return common::E_OOM;
        }
        uint32_t ret_read_len = 0;
        if (RET_FAIL(buffer.read_buf(new_buf, length_, ret_read_len))) {
            // byte_cache_ still refers to the old buffer, so only the new
            // one is released.
            common::mem_free(new_buf);
            return ret;
        } else if (length_ != ret_read_len) {
            ret = common::E_PARTIAL_READ;
            // Do not let the decoder consume uninitialized bytes.
            for (uint32_t i = ret_read_len; i < length_; ++i) {
                new_buf[i] = 0;
            }
        }
        byte_cache_.wrap_from(reinterpret_cast<char *>(new_buf), length_);
        // The cache no longer references the previous payload; release it.
        if (tmp_buf_ != nullptr) {
            common::mem_free(tmp_buf_);
        }
        tmp_buf_ = new_buf;

        uint8_t tmp_bit_width = 0;
        int bit_width_ret =
            common::SerializationUtil::read_ui8(tmp_bit_width, byte_cache_);
        if (bit_width_ret != common::E_OK) {
            // Empty payload: there is no pattern to decode.
            return bit_width_ret;
        }
        is_length_and_bitwidth_readed_ = true;
        bit_width_ = tmp_bit_width;
        init_packer();
        return ret;
    }

    /** Replaces the packer with one built for the current bit width. */
    void init_packer() {
        delete packer_;
        packer_ = new IntPacker(bit_width_);
    }

    /**
     * Releases every owned resource. Pointers are nulled so that calling
     * destroy() more than once (e.g. explicitly and then via the destructor)
     * is safe.
     */
    void destroy() {
        delete packer_;
        packer_ = nullptr;
        current_buffer_.clear();
        if (tmp_buf_ != nullptr) {
            common::mem_free(tmp_buf_);
            tmp_buf_ = nullptr;
        }
    }

    /**
     * Forgets the current group and pattern header so that the next
     * read_int() starts by reading a new pattern. Allocated buffers and the
     * cached payload are left as they are.
     */
    void reset() {
        current_count_ = 0;
        is_length_and_bitwidth_readed_ = false;
        bitpacking_num_ = 0;
    }
};
} // end namespace storage
#endif // ENCODING_BITPACK_DECODER_H