blob: 033ca5cbdd1d848567d6b68eb9a1c897a705ca3c [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "level_decoder.h"
#include <gen_cpp/parquet_types.h>
#include <algorithm>
#include "util/bit_stream_utils.inline.h"
#include "util/bit_util.h"
#include "util/coding.h"
#include "vec/exec/format/parquet/parquet_common.h"
// Number of leading bytes that LevelDecoder::init() skips in front of an RLE level payload.
// init() reads those bytes with decode_fixed32_le() and treats the result as the payload's
// byte count.
static constexpr size_t V1_LEVEL_SIZE = 4;
#include "common/cast_set.h"
#include "common/compile_check_begin.h"
/// Prepares the decoder to read levels stored at the front of `*slice` and moves `*slice`
/// past the bytes that belong to the level section.
///
/// @param slice      In/out view of the data. On success `data` is advanced and `size` is
///                   reduced by the number of bytes consumed. On every error return the slice
///                   itself is left unmodified (the decoder's own fields are already updated).
/// @param encoding   Encoding of the levels; only RLE and BIT_PACKED are handled.
/// @param max_level  Largest level value; the per-level bit width is derived from it.
/// @param num_levels Number of levels the caller expects to read afterwards.
/// @return OK on success; Corruption when the slice is shorter than the bytes the levels
///         require; IOError for any other encoding.
doris::Status doris::vectorized::LevelDecoder::init(doris::Slice* slice,
                                                    tparquet::Encoding::type encoding,
                                                    doris::vectorized::level_t max_level,
                                                    uint32_t num_levels) {
    _encoding = encoding;
    _bit_width = cast_set<level_t>(BitUtil::log2(max_level + 1));
    _max_level = max_level;
    _num_levels = num_levels;
    switch (encoding) {
    case tparquet::Encoding::RLE: {
        // The payload is preceded by a V1_LEVEL_SIZE-byte length field.
        if (slice->size < V1_LEVEL_SIZE) {
            return Status::Corruption("Wrong parquet level format");
        }
        uint8_t* data = (uint8_t*)slice->data;
        uint32_t num_bytes = decode_fixed32_le(data);
        // Safe subtraction: slice->size >= V1_LEVEL_SIZE was established above.
        if (num_bytes > slice->size - V1_LEVEL_SIZE) {
            return Status::Corruption("Wrong parquet level format");
        }
        _rle_decoder = RleDecoder<level_t>(data + V1_LEVEL_SIZE, num_bytes, _bit_width);
        slice->data += V1_LEVEL_SIZE + num_bytes;
        slice->size -= V1_LEVEL_SIZE + num_bytes;
        break;
    }
    case tparquet::Encoding::BIT_PACKED: {
        // Compute the required size in 64 bits. A 32-bit product of num_levels and the bit
        // width can wrap around for a large (e.g. corrupted) num_levels, which would make the
        // bounds check below accept a slice that is far too small.
        const uint64_t num_bits =
                static_cast<uint64_t>(num_levels) * static_cast<uint64_t>(_bit_width);
        const uint64_t num_bytes = (num_bits + 7) / 8; // round up to whole bytes
        if (num_bytes > slice->size) {
            return Status::Corruption("Wrong parquet level format");
        }
        // Narrow only after validation, using the same cast_set<int> helper that init_v2()
        // applies to its buffer length (presumably range-checked -- confirm in cast_set.h).
        _bit_packed_decoder = BitReader((uint8_t*)slice->data, cast_set<int>(num_bytes));
        slice->data += num_bytes;
        slice->size -= num_bytes;
        break;
    }
    default:
        return Status::IOError("Unsupported encoding for parquet level");
    }
    return Status::OK();
}
/// Prepares the decoder to read RLE levels from `levels`.
///
/// Unlike init(), no length field is read: the whole of `levels` (`levels.size` bytes starting
/// at `levels.data`) is handed to the RLE decoder, and the encoding is fixed to RLE.
///
/// @param levels     Bytes holding the encoded levels.
/// @param max_level  Largest level value; the per-level bit width is derived from it.
/// @param num_levels Number of levels the caller expects to read afterwards.
/// @return Status::OK() on every path that returns.
doris::Status doris::vectorized::LevelDecoder::init_v2(const doris::Slice& levels,
                                                       doris::vectorized::level_t max_level,
                                                       uint32_t num_levels) {
    _encoding = tparquet::Encoding::RLE;
    _bit_width = cast_set<level_t>(BitUtil::log2(max_level + 1));
    _num_levels = num_levels;
    _max_level = max_level;

    // The RLE decoder is given the byte length as an int, so narrow it through cast_set.
    const size_t encoded_len = levels.size;
    const int decoder_len = cast_set<int>(encoded_len);
    _rle_decoder = RleDecoder<level_t>((uint8_t*)levels.data, decoder_len, _bit_width);

    return Status::OK();
}
/// Decodes up to `n` levels into `levels`.
///
/// The request is capped at the count of levels still outstanding (`_num_levels`), and that
/// count is reduced by however many levels were produced.
///
/// @param levels Destination buffer; must have room for at least `n` entries.
/// @param n      Maximum number of levels to decode.
/// @return Number of levels produced; 0 when the encoding is neither RLE nor BIT_PACKED.
/// @throws doris::Exception (INTERNAL_ERROR) when a BIT_PACKED value cannot be read.
size_t doris::vectorized::LevelDecoder::get_levels(doris::vectorized::level_t* levels, size_t n) {
    // TODO: template.
    switch (_encoding) {
    case tparquet::Encoding::RLE: {
        size_t wanted = std::min(n, (size_t)_num_levels);
        auto num_decoded = _rle_decoder.get_values(levels, wanted);
        _num_levels -= num_decoded;
        return num_decoded;
    }
    case tparquet::Encoding::BIT_PACKED: {
        size_t wanted = std::min(n, (size_t)_num_levels);
        size_t idx = 0;
        while (idx < wanted) {
            // Each level occupies _bit_width bits in the packed stream.
            const bool ok = _bit_packed_decoder.GetValue(_bit_width, levels + idx);
            if (!ok) {
                throw doris::Exception(ErrorCode::INTERNAL_ERROR,
                                       "Failed to decode BIT_PACKED levels");
            }
            ++idx;
        }
        _num_levels -= wanted;
        return wanted;
    }
    default:
        return 0;
    }
}
#include "common/compile_check_end.h"