blob: 716e08aef4b8dff8ef29c5840d5e875936b1661f [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "parquet/column/levels.h"
#include <cstdint>
#include "parquet/util/rle-encoding.h"
namespace parquet {
LevelEncoder::LevelEncoder() {}
LevelEncoder::~LevelEncoder() {}
void LevelEncoder::Init(Encoding::type encoding, int16_t max_level,
int num_buffered_values, uint8_t* data, int data_size) {
bit_width_ = BitUtil::Log2(max_level + 1);
encoding_ = encoding;
switch (encoding) {
case Encoding::RLE: {
rle_encoder_.reset(new RleEncoder(data, data_size, bit_width_));
break;
}
case Encoding::BIT_PACKED: {
int num_bytes = BitUtil::Ceil(num_buffered_values * bit_width_, 8);
bit_packed_encoder_.reset(new BitWriter(data, num_bytes));
break;
}
default:
throw ParquetException("Unknown encoding type for levels.");
}
}
int LevelEncoder::MaxBufferSize(
Encoding::type encoding, int16_t max_level, int num_buffered_values) {
int bit_width = BitUtil::Log2(max_level + 1);
int num_bytes = 0;
switch (encoding) {
case Encoding::RLE: {
// TODO: Due to the way we currently check if the buffer is full enough,
// we need to have MinBufferSize as head room.
num_bytes = RleEncoder::MaxBufferSize(bit_width, num_buffered_values) +
RleEncoder::MinBufferSize(bit_width);
break;
}
case Encoding::BIT_PACKED: {
num_bytes = BitUtil::Ceil(num_buffered_values * bit_width, 8);
break;
}
default:
throw ParquetException("Unknown encoding type for levels.");
}
return num_bytes;
}
int LevelEncoder::Encode(int batch_size, const int16_t* levels) {
int num_encoded = 0;
if (!rle_encoder_ && !bit_packed_encoder_) {
throw ParquetException("Level encoders are not initialized.");
}
if (encoding_ == Encoding::RLE) {
for (int i = 0; i < batch_size; ++i) {
if (!rle_encoder_->Put(*(levels + i))) { break; }
++num_encoded;
}
rle_encoder_->Flush();
rle_length_ = rle_encoder_->len();
} else {
for (int i = 0; i < batch_size; ++i) {
if (!bit_packed_encoder_->PutValue(*(levels + i), bit_width_)) { break; }
++num_encoded;
}
bit_packed_encoder_->Flush();
}
return num_encoded;
}
LevelDecoder::LevelDecoder() : num_values_remaining_(0) {}
LevelDecoder::~LevelDecoder() {}
int LevelDecoder::SetData(Encoding::type encoding, int16_t max_level,
int num_buffered_values, const uint8_t* data) {
int32_t num_bytes = 0;
encoding_ = encoding;
num_values_remaining_ = num_buffered_values;
bit_width_ = BitUtil::Log2(max_level + 1);
switch (encoding) {
case Encoding::RLE: {
num_bytes = *reinterpret_cast<const int32_t*>(data);
const uint8_t* decoder_data = data + sizeof(int32_t);
if (!rle_decoder_) {
rle_decoder_.reset(new RleDecoder(decoder_data, num_bytes, bit_width_));
} else {
rle_decoder_->Reset(decoder_data, num_bytes, bit_width_);
}
return sizeof(int32_t) + num_bytes;
}
case Encoding::BIT_PACKED: {
num_bytes = BitUtil::Ceil(num_buffered_values * bit_width_, 8);
if (!bit_packed_decoder_) {
bit_packed_decoder_.reset(new BitReader(data, num_bytes));
} else {
bit_packed_decoder_->Reset(data, num_bytes);
}
return num_bytes;
}
default:
throw ParquetException("Unknown encoding type for levels.");
}
return -1;
}
int LevelDecoder::Decode(int batch_size, int16_t* levels) {
int num_decoded = 0;
int num_values = std::min(num_values_remaining_, batch_size);
if (encoding_ == Encoding::RLE) {
num_decoded = rle_decoder_->GetBatch(levels, num_values);
} else {
num_decoded = bit_packed_decoder_->GetBatch(bit_width_, levels, num_values);
}
num_values_remaining_ -= num_decoded;
return num_decoded;
}
} // namespace parquet