blob: 96803a7d72fd6f74ccc9fdb7aae27599ea65c3ba [file] [log] [blame]
/// Licensed to the Apache Software Foundation (ASF) under one
/// or more contributor license agreements. See the NOTICE file
/// distributed with this work for additional information
/// regarding copyright ownership. The ASF licenses this file
/// to you under the Apache License, Version 2.0 (the
/// "License"); you may not use this file except in compliance
/// with the License. You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing,
/// software distributed under the License is distributed on an
/// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
/// KIND, either express or implied. See the License for the
/// specific language governing permissions and limitations
/// under the License.
#include "exec/parquet/parquet-delta-decoder.h"
#include "util/bit-stream-utils.inline.h"
using strings::Substitute;
namespace impala {
/// Prepares the decoder for a new DELTA_BINARY_PACKED page.
///
/// Parses the page header:
///   - the block size in values,
///   - the number of miniblocks per block,
///   - the total value count,
///   - the first value, which is stored in the header itself.
/// The header is then validated. If the page holds more than one value, the header of
/// the first block is read too.
///
/// `reader_` is reset onto `input_buffer`. It presumably keeps pointing into that
/// buffer, so the buffer should outlive the reads from this page (TODO confirm against
/// the reader's contract).
///
/// Returns an error Status if any header field cannot be read or is invalid.
template<typename INT_T>
Status ParquetDeltaDecoder<INT_T>::NewPage(const uint8_t* input_buffer,
    int input_buffer_len) {
  reader_.Reset(input_buffer, input_buffer_len);
  /// Read the header.
  if (!reader_.GetUleb128<std::size_t>(&block_size_in_values_)) {
    return Status("Error reading block size.");
  }
  if (!reader_.GetUleb128<std::size_t>(&miniblocks_in_block_)) {
    return Status("Error reading the number of miniblocks in a block.");
  }
  if (!reader_.GetUleb128<std::size_t>(&total_value_count_)) {
    return Status("Error reading the number of total values.");
  }
  if (!reader_.GetZigZagInteger<INT_T>(&last_read_value_)) {
    return Status("Error reading the first value.");
  }
  /// The number of values in a block must be a non-zero multiple of 128.
  if (UNLIKELY(block_size_in_values_ % 128 != 0 || block_size_in_values_ == 0)) {
    return Status(Substitute("The number of values in a block must be a non-zero "
        "multiple of 128. Current value: $0.", block_size_in_values_));
  }
  /// The number of miniblocks in a block must be a divisor of the number of values in
  /// a block. The header comes from the file and cannot be trusted. The zero check must
  /// therefore come first so that the modulo (and the division below) never divides
  /// by zero.
  if (UNLIKELY(miniblocks_in_block_ == 0
      || block_size_in_values_ % miniblocks_in_block_ != 0)) {
    return Status(Substitute("The number of miniblocks in a block must be a divisor of "
        "the number of values in a block. Number of miniblocks in a block: $0, number "
        "of values in a block: $1.", miniblocks_in_block_, block_size_in_values_));
  }
  miniblock_size_in_values_ = block_size_in_values_ / miniblocks_in_block_;
  /// The number of values in a miniblock must be a multiple of 32.
  if (UNLIKELY(miniblock_size_in_values_ % 32 != 0)) {
    return Status(Substitute("The number of values in a miniblock must be a multiple "
        "of 32. Current value: $0.", miniblock_size_in_values_));
  }
  bitwidths_.resize(miniblocks_in_block_);
  input_values_remaining_ = total_value_count_;
  num_buffered_values_ = 0;
  next_buffered_value_index_ = 0;
  /// If there are elements other than the first one, we read the first block header.
  if (LIKELY(input_values_remaining_ > 1)) {
    bool block_header_read = ReadBlockHeader();
    if (UNLIKELY(!block_header_read)) {
      return Status("First block header could not be read.");
    }
  }
  initialized_ = true;
  return Status::OK();
}
/// Decodes a single value into `*value`.
///
/// This is a thin convenience wrapper around NextValues() with a count of one and a
/// stride equal to the size of INT_T. It returns whatever NextValues() returns: the
/// number of values written, or -1 on a decoding error.
template<typename INT_T>
WARN_UNUSED_RESULT
int ParquetDeltaDecoder<INT_T>::NextValue(INT_T* value) {
  constexpr int kSingleValue = 1;
  uint8_t* const out = reinterpret_cast<uint8_t*>(value);
  return NextValues(kSingleValue, out, sizeof(INT_T));
}
/// Decodes up to `num_values` values of the decoder's own type INT_T into `values`.
/// Consecutive values are placed `stride` bytes apart.
///
/// Returns the number of values written, or -1 on a decoding error.
template<typename INT_T>
WARN_UNUSED_RESULT
int ParquetDeltaDecoder<INT_T>::NextValues(int num_values, uint8_t* values,
    std::size_t stride) {
  constexpr bool kSkip = false;
  return GetOrSkipNextValues<kSkip, INT_T>(num_values, values, stride);
}
/// Decodes up to `num_values` values and stores each one converted to OutType.
/// Consecutive outputs are placed `stride` bytes apart in `values`.
///
/// OutType is restricted at compile time to the fixed-width integer types listed
/// below. Returns the number of values written, or -1 on a decoding error.
template<typename INT_T>
template <typename OutType>
int ParquetDeltaDecoder<INT_T>::NextValuesConverted(int num_values, uint8_t* values,
    std::size_t stride) {
  constexpr bool kIsSignedOut = std::is_same_v<int8_t, OutType>
      || std::is_same_v<int16_t, OutType>
      || std::is_same_v<int32_t, OutType>
      || std::is_same_v<int64_t, OutType>;
  constexpr bool kIsUnsignedOut = std::is_same_v<uint8_t, OutType>
      || std::is_same_v<uint16_t, OutType>
      || std::is_same_v<uint32_t, OutType>
      || std::is_same_v<uint64_t, OutType>;
  static_assert(kIsSignedOut || kIsUnsignedOut);
  constexpr bool kSkip = false;
  return GetOrSkipNextValues<kSkip, OutType>(num_values, values, stride);
}
/// Consumes up to `num_values` values without writing them anywhere.
///
/// Returns the number of values skipped, or -1 on a decoding error.
template<typename INT_T>
WARN_UNUSED_RESULT
int ParquetDeltaDecoder<INT_T>::SkipValues(int num_values) {
  constexpr bool kSkip = true;
  uint8_t* const no_output = nullptr;
  return GetOrSkipNextValues<kSkip, INT_T>(num_values, no_output, sizeof(INT_T));
}
/// Common implementation of NextValues(), NextValuesConverted() and SkipValues().
///
/// Produces up to `num_values` values, in page order, from three sources:
///   1. the first value, which is stored in the page header;
///   2. the internal buffer;
///   3. the encoded input.
/// If SKIP is false, each value is written to `values` as OutType, `stride` bytes
/// apart. If SKIP is true, `values` is not written and the values are only consumed.
///
/// Returns the number of values produced, or -1 on a decoding error. The count is
/// capped by the number of values left in the page.
template<typename INT_T>
template<bool SKIP, typename OutType>
int ParquetDeltaDecoder<INT_T>::GetOrSkipNextValues(const int num_values,
    uint8_t* values, std::size_t stride) {
  static_assert(!SKIP || std::is_same_v<INT_T, OutType>,
      "In case of skipping values, the result type is not important.");
  DCHECK(initialized_);
  DCHECK_GE(stride, sizeof(OutType));
  DCHECK_GE(num_values, 0);
  /// Return before touching any state. Otherwise a request for zero values would still
  /// consume the first value and pass a negative count to the helpers below.
  if (UNLIKELY(num_values <= 0)) return 0;
  int extracted_values = 0;
  /// Handle the very first value in the input, which is stored in the header. It is
  /// still pending while nothing has been consumed yet. An empty page has no first
  /// value: consuming one would emit a bogus value and underflow
  /// input_values_remaining_.
  if (UNLIKELY(input_values_remaining_ == total_value_count_
      && total_value_count_ > 0)) {
    values = GetOrSkipFirstValue<SKIP, OutType>(values, stride);
    extracted_values++;
  }
  /// Get the values from the internal buffer.
  std::pair<int, uint8_t*> buffer_result = GetOrSkipBufferedValues<SKIP, OutType>(
      num_values - extracted_values, values, stride);
  extracted_values += buffer_result.first;
  values = buffer_result.second;
  /// The internal buffer is now consumed, so every remaining value is in the input
  /// buffer. We avoid extracting more values than are available.
  const int yet_to_extract = std::min<std::size_t>(num_values - extracted_values,
      input_values_remaining_);
  if (UNLIKELY(yet_to_extract == 0)) return extracted_values;
  const int extracted_from_input = GetOrSkipFromInput<SKIP, OutType>(yet_to_extract,
      values, stride);
  if (UNLIKELY(extracted_from_input == -1)) return -1;
  extracted_values += extracted_from_input;
  return extracted_values;
}
/// Decodes the next batch from the input into the internal buffer `buffer_`. On
/// success that batch becomes the current buffered batch, with the read cursor at its
/// start.
///
/// Returns false if decoding failed or yielded no values. In that case
/// num_buffered_values_ and next_buffered_value_index_ are left unchanged.
template<typename INT_T>
bool ParquetDeltaDecoder<INT_T>::FillBuffer() {
  /// Check the precondition before decoding anything, as the other methods do.
  DCHECK(initialized_);
  const int values = ReadAndDecodeBatchFromInput<false, INT_T>(buffer_.data(),
      sizeof(INT_T));
  if (UNLIKELY(values <= 0)) return false;
  num_buffered_values_ = values;
  next_buffered_value_index_ = 0;
  return true;
}
/// Consumes the first value of the page, which the header stores as
/// `last_read_value_`.
///
/// If SKIP is false, the value is written to `values` as OutType and the returned
/// pointer is advanced by `stride`. If SKIP is true, `values` is returned unchanged.
/// In both cases one value is deducted from `input_values_remaining_`.
template<typename INT_T>
template<bool SKIP, typename OutType>
uint8_t* ParquetDeltaDecoder<INT_T>::GetOrSkipFirstValue(uint8_t* values, int stride) {
  DCHECK(initialized_);
  DCHECK_GE(stride, sizeof(OutType));
  DCHECK(input_values_remaining_ == total_value_count_);
  /// The first value counts as consumed whether it is written or skipped.
  input_values_remaining_--;
  if constexpr (SKIP) {
    return values;
  } else {
    OutType* const slot = reinterpret_cast<OutType*>(values);
    *slot = last_read_value_;
    return values + stride;
  }
}
/// Serves as many of the `num_values` requested values as the internal buffer still
/// holds.
///
/// Returns a pair of:
///   - the number of values taken from the buffer;
///   - the output pointer, advanced past them (nullptr when SKIP is true).
template<typename INT_T>
template<bool SKIP, typename OutType>
std::pair<int, uint8_t*> ParquetDeltaDecoder<INT_T>::GetOrSkipBufferedValues(
    int num_values, uint8_t* values, std::size_t stride) {
  DCHECK(initialized_);
  DCHECK_GE(stride, sizeof(OutType));
  DCHECK_GE(num_values, 0);
  const int available = num_buffered_values_ - next_buffered_value_index_;
  /// Never take more than the buffer currently holds.
  const int count = (available < num_values) ? available : num_values;
  uint8_t* const next = CopyOrSkipBufferedValues<SKIP, OutType>(count, values, stride);
  return {count, next};
}
/// Produces `num_values` values directly from the encoded input.
///
/// The internal buffer must already be consumed. Whole batches of BATCH_SIZE are
/// decoded straight into `values`, or skipped when SKIP is true. A trailing partial
/// batch is decoded into the internal buffer, and only the needed part of it is
/// copied out or skipped.
///
/// `num_values` must not exceed `input_values_remaining_`. Returns `num_values` on
/// success, or -1 if the input could not be decoded.
template<typename INT_T>
template<bool SKIP, typename OutType>
int ParquetDeltaDecoder<INT_T>::GetOrSkipFromInput(int num_values, uint8_t* values,
    int stride) {
  DCHECK(initialized_);
  DCHECK_GE(stride, sizeof(OutType));
  DCHECK_GE(num_values, 0);
  DCHECK_LE(num_values, input_values_remaining_);
  /// We extract fully loaded batches as many times as we can.
  const int full_buffers_to_extract = num_values / BATCH_SIZE;
  for (int i = 0; i < full_buffers_to_extract; i++) {
    const int batch_values = ReadAndDecodeBatchFromInput<SKIP, OutType>(
        reinterpret_cast<OutType*>(values), stride);
    if (UNLIKELY(batch_values != BATCH_SIZE)) return -1;
    /// When skipping, `values` is null. Offsetting a null pointer is undefined
    /// behaviour, so only advance the output pointer when values are written.
    if constexpr (!SKIP) values += batch_values * stride;
  }
  int extracted_values = full_buffers_to_extract * BATCH_SIZE;
  /// Extract the remaining values through the internal buffer.
  const int remainder = num_values - extracted_values;
  if (remainder == 0) return extracted_values;
  bool buffer_filled = FillBuffer();
  if (UNLIKELY(!buffer_filled)) return -1;
  CopyOrSkipBufferedValues<SKIP, OutType>(remainder, values, stride);
  extracted_values += remainder;
  return extracted_values;
}
/// Decodes the next batch of BATCH_SIZE values from the current miniblock. If the
/// current miniblock has been fully read, it first advances to the next miniblock, and
/// reads the next block header when the block is exhausted.
///
/// If SKIP is false, the values are written to `out`, `stride` bytes apart (presumably
/// converted from INT_T to OutType by the reader — TODO confirm).
/// If SKIP is true, `out` and `stride` are ignored. The values are still decoded,
/// because later values depend on them, and they go into `buffer_`. NOTE: this
/// overwrites the contents of `buffer_`.
///
/// Returns one of:
///   - the number of decoded values that belong to the page: BATCH_SIZE, or fewer for
///     the final padded batch, capped by `input_values_remaining_`, which is then
///     decreased by that count;
///   - -1 if a block header cannot be read or the reader does not deliver a full batch.
template <typename INT_T>
template <bool SKIP, typename OutType>
int ParquetDeltaDecoder<INT_T>::ReadAndDecodeBatchFromInput(OutType* __restrict__ out,
int stride) {
DCHECK(initialized_);
DCHECK_GE(stride, sizeof(OutType));
DCHECK_GE(input_values_remaining_, 0);
/// Check whether we need to initialise a new block or miniblock.
/// Before the very first read, `NewPage` has already read the first block header.
if (values_read_from_miniblock_ == miniblock_size_in_values_) {
values_read_from_miniblock_ = 0;
miniblocks_read_from_block_ += 1;
/// All miniblocks of the current block are consumed: the next block starts here.
if (miniblocks_read_from_block_ == miniblocks_in_block_) {
bool block_header_read = ReadBlockHeader();
if (UNLIKELY(!block_header_read)) return -1;
}
}
const int bit_width = bitwidths_[miniblocks_read_from_block_];
int values_read = -1;
if (SKIP) {
/// Even if we skip, we need to decode the values because they are needed to
/// calculate the following values.
/// TODO: We only need the sum of the deltas, not the intermediate values, so we could
/// optimize this.
values_read = reader_.UnpackAndDeltaDecodeBatch<INT_T, INT_T>(bit_width,
&last_read_value_, min_delta_in_block_, BATCH_SIZE,
buffer_.data(), sizeof(INT_T));
} else {
values_read = reader_.UnpackAndDeltaDecodeBatch<OutType, INT_T>( bit_width,
&last_read_value_, min_delta_in_block_, BATCH_SIZE, out, stride);
}
/// Miniblocks always contain a multiple of BATCH_SIZE values or are padded, so a
/// short read means corrupt or truncated input.
if (UNLIKELY(values_read != BATCH_SIZE)) return -1;
values_read_from_miniblock_ += values_read;
/// Padding values at the end of the page do not count as real values.
const int actual_values_read = std::min<std::size_t>(
values_read, input_values_remaining_);
input_values_remaining_ -= actual_values_read;
return actual_values_read;
}
/// Reads the header of the next block and resets the per-block read position.
///
/// The header consists of the zigzag-encoded minimum delta followed by one byte per
/// miniblock holding that miniblock's bit width.
///
/// The last block of a page may need fewer than `miniblocks_in_block_` miniblocks. The
/// Parquet specification says the bit-width bytes of the unneeded miniblocks are still
/// present, but readers must accept arbitrary values there. So only the bit widths of
/// miniblocks that will actually be decoded are validated. The others are set to 0;
/// they are never decoded, because reading stops once input_values_remaining_ hits 0.
///
/// Returns false if the header cannot be read, or if a needed bit width exceeds the
/// width of INT_T.
template<typename INT_T>
bool ParquetDeltaDecoder<INT_T>::ReadBlockHeader() {
  miniblocks_read_from_block_ = 0;
  values_read_from_miniblock_ = 0;
  bool min_delta_read = reader_.GetZigZagInteger<INT_T>(&min_delta_in_block_);
  if (UNLIKELY(!min_delta_read)) return false;
  /// Count the values still stored in blocks. When this is called from NewPage, the
  /// first value has not been consumed yet. That value lives in the page header, not in
  /// a block, so it must be excluded from the count.
  const bool first_value_pending = input_values_remaining_ == total_value_count_
      && input_values_remaining_ > 0;
  const std::size_t values_in_blocks =
      input_values_remaining_ - (first_value_pending ? 1 : 0);
  const std::size_t needed_miniblocks = std::min<std::size_t>(miniblocks_in_block_,
      (values_in_blocks + miniblock_size_in_values_ - 1) / miniblock_size_in_values_);
  for (std::size_t i = 0; i < miniblocks_in_block_; i++) {
    bool bitwidths_read = reader_.GetBytes(1, &bitwidths_[i]);
    if (UNLIKELY(!bitwidths_read)) return false;
    if (i >= needed_miniblocks) {
      /// Unneeded miniblock: its bit-width byte may hold any value and is ignored.
      bitwidths_[i] = 0;
    } else if (UNLIKELY(bitwidths_[i] > sizeof(INT_T) * 8)) {
      return false;
    }
  }
  return true;
}
/// Consumes `num_values` values from the internal buffer.
///
/// If SKIP is false, the values are copied to `dest` and the advanced destination
/// pointer is returned. If SKIP is true, the read cursor is only moved past them and
/// nullptr is returned.
template<typename INT_T>
template<bool SKIP, typename OutType>
uint8_t* ParquetDeltaDecoder<INT_T>::CopyOrSkipBufferedValues(int num_values,
    uint8_t* dest, int stride) {
  DCHECK_GE(num_values, 0);
  if constexpr (!SKIP) {
    return CopyBufferedValues<OutType>(num_values, dest, stride);
  } else {
    SkipBufferedValues(num_values);
    return nullptr;
  }
}
/// Copies the next `num_values` buffered values to `dest`, `stride` bytes apart,
/// converting each one to OutType.
///
/// Advances the buffer's read cursor accordingly and returns the destination pointer
/// just past the last value written.
template<typename INT_T>
template <typename OutType>
uint8_t* ParquetDeltaDecoder<INT_T>::CopyBufferedValues(int num_values, uint8_t* dest,
    int stride) {
  DCHECK_GE(stride, sizeof(OutType));
  DCHECK(num_buffered_values_ - next_buffered_value_index_ >= num_values);
  for (int copied = 0; copied < num_values; ++copied, dest += stride) {
    *reinterpret_cast<OutType*>(dest) = buffer_[next_buffered_value_index_++];
  }
  return dest;
}
/// Discards the next `num_values` buffered values by moving the read cursor past them.
/// The values are already decoded, so nothing else has to happen.
template<typename INT_T>
void ParquetDeltaDecoder<INT_T>::SkipBufferedValues(int num_values) {
  DCHECK(num_buffered_values_ - next_buffered_value_index_ >= num_values);
  next_buffered_value_index_ = next_buffered_value_index_ + num_values;
}
/// Explicit instantiations of the decoder for 32-bit and 64-bit integers.
template class ParquetDeltaDecoder<int32_t>;
template class ParquetDeltaDecoder<int64_t>;

/// Conversions provided by NextValuesConverted():
///   - the 32-bit decoder can write int8_t, int16_t, int32_t and int64_t outputs;
///   - for the 64-bit decoder only the int64_t output is instantiated.
template int ParquetDeltaDecoder<int32_t>::NextValuesConverted<int8_t>(
    int num_values, uint8_t* values, std::size_t stride);
template int ParquetDeltaDecoder<int32_t>::NextValuesConverted<int16_t>(
    int num_values, uint8_t* values, std::size_t stride);
template int ParquetDeltaDecoder<int32_t>::NextValuesConverted<int32_t>(
    int num_values, uint8_t* values, std::size_t stride);
template int ParquetDeltaDecoder<int32_t>::NextValuesConverted<int64_t>(
    int num_values, uint8_t* values, std::size_t stride);
template int ParquetDeltaDecoder<int64_t>::NextValuesConverted<int64_t>(
    int num_values, uint8_t* values, std::size_t stride);
} // namespace impala