blob: 109f48ea63cd88cf560a7765353d328cbace4383 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "parquet/encoding.h"
#include <algorithm>
#include <cstdint>
#include <cstdlib>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "arrow/array.h"
#include "arrow/array/builder_dict.h"
#include "arrow/stl_allocator.h"
#include "arrow/util/bit_stream_utils.h"
#include "arrow/util/bitmap_ops.h"
#include "arrow/util/byte_stream_split.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/hashing.h"
#include "arrow/util/logging.h"
#include "arrow/util/rle_encoding.h"
#include "arrow/util/ubsan.h"
#include "arrow/visitor_inline.h"
#include "parquet/exception.h"
#include "parquet/platform.h"
#include "parquet/schema.h"
#include "parquet/types.h"
using arrow::Status;
using arrow::VisitNullBitmapInline;
using arrow::internal::checked_cast;
template <typename T>
using ArrowPoolVector = std::vector<T, ::arrow::stl::allocator<T>>;
namespace parquet {
constexpr int64_t kInMemoryDefaultCapacity = 1024;
class EncoderImpl : virtual public Encoder {
public:
EncoderImpl(const ColumnDescriptor* descr, Encoding::type encoding, MemoryPool* pool)
: descr_(descr),
encoding_(encoding),
pool_(pool),
type_length_(descr ? descr->type_length() : -1) {}
Encoding::type encoding() const override { return encoding_; }
MemoryPool* memory_pool() const override { return pool_; }
protected:
// For accessing type-specific metadata, like FIXED_LEN_BYTE_ARRAY
const ColumnDescriptor* descr_;
const Encoding::type encoding_;
MemoryPool* pool_;
/// Type length from descr
int type_length_;
};
// ----------------------------------------------------------------------
// Plain encoder implementation
template <typename DType>
class PlainEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
public:
using T = typename DType::c_type;
explicit PlainEncoder(const ColumnDescriptor* descr, MemoryPool* pool)
: EncoderImpl(descr, Encoding::PLAIN, pool), sink_(pool) {}
int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
std::shared_ptr<Buffer> FlushValues() override {
std::shared_ptr<Buffer> buffer;
PARQUET_THROW_NOT_OK(sink_.Finish(&buffer));
return buffer;
}
using TypedEncoder<DType>::Put;
void Put(const T* buffer, int num_values) override;
void Put(const arrow::Array& values) override;
void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
int64_t valid_bits_offset) override {
PARQUET_ASSIGN_OR_THROW(
auto buffer, arrow::AllocateBuffer(num_values * sizeof(T), this->memory_pool()));
T* data = reinterpret_cast<T*>(buffer->mutable_data());
int num_valid_values = arrow::util::internal::SpacedCompress<T>(
src, num_values, valid_bits, valid_bits_offset, data);
Put(data, num_valid_values);
}
void UnsafePutByteArray(const void* data, uint32_t length) {
DCHECK(length == 0 || data != nullptr) << "Value ptr cannot be NULL";
sink_.UnsafeAppend(&length, sizeof(uint32_t));
sink_.UnsafeAppend(data, static_cast<int64_t>(length));
}
void Put(const ByteArray& val) {
// Write the result to the output stream
const int64_t increment = static_cast<int64_t>(val.len + sizeof(uint32_t));
if (ARROW_PREDICT_FALSE(sink_.length() + increment > sink_.capacity())) {
PARQUET_THROW_NOT_OK(sink_.Reserve(increment));
}
UnsafePutByteArray(val.ptr, val.len);
}
protected:
arrow::BufferBuilder sink_;
};
template <typename DType>
void PlainEncoder<DType>::Put(const T* buffer, int num_values) {
if (num_values > 0) {
PARQUET_THROW_NOT_OK(sink_.Append(buffer, num_values * sizeof(T)));
}
}
template <>
inline void PlainEncoder<ByteArrayType>::Put(const ByteArray* src, int num_values) {
for (int i = 0; i < num_values; ++i) {
Put(src[i]);
}
}
template <typename ArrayType>
void DirectPutImpl(const arrow::Array& values, arrow::BufferBuilder* sink) {
if (values.type_id() != ArrayType::TypeClass::type_id) {
std::string type_name = ArrayType::TypeClass::type_name();
throw ParquetException("direct put to " + type_name + " from " +
values.type()->ToString() + " not supported");
}
using value_type = typename ArrayType::value_type;
constexpr auto value_size = sizeof(value_type);
auto raw_values = checked_cast<const ArrayType&>(values).raw_values();
if (values.null_count() == 0) {
// no nulls, just dump the data
PARQUET_THROW_NOT_OK(sink->Append(raw_values, values.length() * value_size));
} else {
PARQUET_THROW_NOT_OK(
sink->Reserve((values.length() - values.null_count()) * value_size));
for (int64_t i = 0; i < values.length(); i++) {
if (values.IsValid(i)) {
sink->UnsafeAppend(&raw_values[i], value_size);
}
}
}
}
template <>
void PlainEncoder<Int32Type>::Put(const arrow::Array& values) {
DirectPutImpl<arrow::Int32Array>(values, &sink_);
}
template <>
void PlainEncoder<Int64Type>::Put(const arrow::Array& values) {
DirectPutImpl<arrow::Int64Array>(values, &sink_);
}
template <>
void PlainEncoder<Int96Type>::Put(const arrow::Array& values) {
ParquetException::NYI("direct put to Int96");
}
template <>
void PlainEncoder<FloatType>::Put(const arrow::Array& values) {
DirectPutImpl<arrow::FloatArray>(values, &sink_);
}
template <>
void PlainEncoder<DoubleType>::Put(const arrow::Array& values) {
DirectPutImpl<arrow::DoubleArray>(values, &sink_);
}
template <typename DType>
void PlainEncoder<DType>::Put(const arrow::Array& values) {
ParquetException::NYI("direct put of " + values.type()->ToString());
}
void AssertBinary(const arrow::Array& values) {
if (values.type_id() != arrow::Type::BINARY &&
values.type_id() != arrow::Type::STRING) {
throw ParquetException("Only BinaryArray and subclasses supported");
}
}
template <>
inline void PlainEncoder<ByteArrayType>::Put(const arrow::Array& values) {
AssertBinary(values);
const auto& data = checked_cast<const arrow::BinaryArray&>(values);
const int64_t total_bytes = data.value_offset(data.length()) - data.value_offset(0);
PARQUET_THROW_NOT_OK(sink_.Reserve(total_bytes + data.length() * sizeof(uint32_t)));
if (data.null_count() == 0) {
// no nulls, just dump the data
for (int64_t i = 0; i < data.length(); i++) {
auto view = data.GetView(i);
UnsafePutByteArray(view.data(), static_cast<uint32_t>(view.size()));
}
} else {
for (int64_t i = 0; i < data.length(); i++) {
if (data.IsValid(i)) {
auto view = data.GetView(i);
UnsafePutByteArray(view.data(), static_cast<uint32_t>(view.size()));
}
}
}
}
void AssertFixedSizeBinary(const arrow::Array& values, int type_length) {
if (values.type_id() != arrow::Type::FIXED_SIZE_BINARY &&
values.type_id() != arrow::Type::DECIMAL) {
throw ParquetException("Only FixedSizeBinaryArray and subclasses supported");
}
if (checked_cast<const arrow::FixedSizeBinaryType&>(*values.type()).byte_width() !=
type_length) {
throw ParquetException("Size mismatch: " + values.type()->ToString() +
" should have been " + std::to_string(type_length) + " wide");
}
}
template <>
inline void PlainEncoder<FLBAType>::Put(const arrow::Array& values) {
AssertFixedSizeBinary(values, descr_->type_length());
const auto& data = checked_cast<const arrow::FixedSizeBinaryArray&>(values);
if (data.null_count() == 0) {
// no nulls, just dump the data
PARQUET_THROW_NOT_OK(
sink_.Append(data.raw_values(), data.length() * data.byte_width()));
} else {
const int64_t total_bytes =
data.length() * data.byte_width() - data.null_count() * data.byte_width();
PARQUET_THROW_NOT_OK(sink_.Reserve(total_bytes));
for (int64_t i = 0; i < data.length(); i++) {
if (data.IsValid(i)) {
sink_.UnsafeAppend(data.Value(i), data.byte_width());
}
}
}
}
template <>
inline void PlainEncoder<FLBAType>::Put(const FixedLenByteArray* src, int num_values) {
if (descr_->type_length() == 0) {
return;
}
for (int i = 0; i < num_values; ++i) {
// Write the result to the output stream
DCHECK(src[i].ptr != nullptr) << "Value ptr cannot be NULL";
PARQUET_THROW_NOT_OK(sink_.Append(src[i].ptr, descr_->type_length()));
}
}
template <>
class PlainEncoder<BooleanType> : public EncoderImpl, virtual public BooleanEncoder {
public:
explicit PlainEncoder(const ColumnDescriptor* descr, MemoryPool* pool)
: EncoderImpl(descr, Encoding::PLAIN, pool),
bits_available_(kInMemoryDefaultCapacity * 8),
bits_buffer_(AllocateBuffer(pool, kInMemoryDefaultCapacity)),
sink_(pool),
bit_writer_(bits_buffer_->mutable_data(),
static_cast<int>(bits_buffer_->size())) {}
int64_t EstimatedDataEncodedSize() override;
std::shared_ptr<Buffer> FlushValues() override;
void Put(const bool* src, int num_values) override;
void Put(const std::vector<bool>& src, int num_values) override;
void PutSpaced(const bool* src, int num_values, const uint8_t* valid_bits,
int64_t valid_bits_offset) override {
PARQUET_ASSIGN_OR_THROW(
auto buffer, arrow::AllocateBuffer(num_values * sizeof(T), this->memory_pool()));
T* data = reinterpret_cast<T*>(buffer->mutable_data());
int num_valid_values = arrow::util::internal::SpacedCompress<T>(
src, num_values, valid_bits, valid_bits_offset, data);
Put(data, num_valid_values);
}
void Put(const arrow::Array& values) override {
if (values.type_id() != arrow::Type::BOOL) {
throw ParquetException("direct put to boolean from " + values.type()->ToString() +
" not supported");
}
const auto& data = checked_cast<const arrow::BooleanArray&>(values);
if (data.null_count() == 0) {
PARQUET_THROW_NOT_OK(sink_.Reserve(BitUtil::BytesForBits(data.length())));
// no nulls, just dump the data
arrow::internal::CopyBitmap(data.data()->GetValues<uint8_t>(1), data.offset(),
data.length(), sink_.mutable_data(), sink_.length());
sink_.UnsafeAdvance(data.length());
} else {
auto n_valid = BitUtil::BytesForBits(data.length() - data.null_count());
PARQUET_THROW_NOT_OK(sink_.Reserve(n_valid));
arrow::internal::FirstTimeBitmapWriter writer(sink_.mutable_data(), sink_.length(),
n_valid);
for (int64_t i = 0; i < data.length(); i++) {
if (data.IsValid(i)) {
if (data.Value(i)) {
writer.Set();
} else {
writer.Clear();
}
writer.Next();
}
}
writer.Finish();
}
}
private:
int bits_available_;
std::shared_ptr<ResizableBuffer> bits_buffer_;
arrow::BufferBuilder sink_;
arrow::BitUtil::BitWriter bit_writer_;
template <typename SequenceType>
void PutImpl(const SequenceType& src, int num_values);
};
template <typename SequenceType>
void PlainEncoder<BooleanType>::PutImpl(const SequenceType& src, int num_values) {
int bit_offset = 0;
if (bits_available_ > 0) {
int bits_to_write = std::min(bits_available_, num_values);
for (int i = 0; i < bits_to_write; i++) {
bit_writer_.PutValue(src[i], 1);
}
bits_available_ -= bits_to_write;
bit_offset = bits_to_write;
if (bits_available_ == 0) {
bit_writer_.Flush();
PARQUET_THROW_NOT_OK(
sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written()));
bit_writer_.Clear();
}
}
int bits_remaining = num_values - bit_offset;
while (bit_offset < num_values) {
bits_available_ = static_cast<int>(bits_buffer_->size()) * 8;
int bits_to_write = std::min(bits_available_, bits_remaining);
for (int i = bit_offset; i < bit_offset + bits_to_write; i++) {
bit_writer_.PutValue(src[i], 1);
}
bit_offset += bits_to_write;
bits_available_ -= bits_to_write;
bits_remaining -= bits_to_write;
if (bits_available_ == 0) {
bit_writer_.Flush();
PARQUET_THROW_NOT_OK(
sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written()));
bit_writer_.Clear();
}
}
}
int64_t PlainEncoder<BooleanType>::EstimatedDataEncodedSize() {
int64_t position = sink_.length();
return position + bit_writer_.bytes_written();
}
std::shared_ptr<Buffer> PlainEncoder<BooleanType>::FlushValues() {
if (bits_available_ > 0) {
bit_writer_.Flush();
PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written()));
bit_writer_.Clear();
bits_available_ = static_cast<int>(bits_buffer_->size()) * 8;
}
std::shared_ptr<Buffer> buffer;
PARQUET_THROW_NOT_OK(sink_.Finish(&buffer));
return buffer;
}
void PlainEncoder<BooleanType>::Put(const bool* src, int num_values) {
PutImpl(src, num_values);
}
void PlainEncoder<BooleanType>::Put(const std::vector<bool>& src, int num_values) {
PutImpl(src, num_values);
}
// ----------------------------------------------------------------------
// DictEncoder<T> implementations
template <typename DType>
struct DictEncoderTraits {
using c_type = typename DType::c_type;
using MemoTableType = arrow::internal::ScalarMemoTable<c_type>;
};
template <>
struct DictEncoderTraits<ByteArrayType> {
using MemoTableType = arrow::internal::BinaryMemoTable<arrow::BinaryBuilder>;
};
template <>
struct DictEncoderTraits<FLBAType> {
using MemoTableType = arrow::internal::BinaryMemoTable<arrow::BinaryBuilder>;
};
// Initially 1024 elements
static constexpr int32_t kInitialHashTableSize = 1 << 10;
/// See the dictionary encoding section of
/// https://github.com/Parquet/parquet-format. The encoding supports
/// streaming encoding. Values are encoded as they are added while the
/// dictionary is being constructed. At any time, the buffered values
/// can be written out with the current dictionary size. More values
/// can then be added to the encoder, including new dictionary
/// entries.
template <typename DType>
class DictEncoderImpl : public EncoderImpl, virtual public DictEncoder<DType> {
using MemoTableType = typename DictEncoderTraits<DType>::MemoTableType;
public:
typedef typename DType::c_type T;
explicit DictEncoderImpl(const ColumnDescriptor* desc, MemoryPool* pool)
: EncoderImpl(desc, Encoding::PLAIN_DICTIONARY, pool),
buffered_indices_(::arrow::stl::allocator<int32_t>(pool)),
dict_encoded_size_(0),
memo_table_(pool, kInitialHashTableSize) {}
~DictEncoderImpl() override { DCHECK(buffered_indices_.empty()); }
int dict_encoded_size() override { return dict_encoded_size_; }
int WriteIndices(uint8_t* buffer, int buffer_len) override {
// Write bit width in first byte
*buffer = static_cast<uint8_t>(bit_width());
++buffer;
--buffer_len;
arrow::util::RleEncoder encoder(buffer, buffer_len, bit_width());
for (int32_t index : buffered_indices_) {
if (!encoder.Put(index)) return -1;
}
encoder.Flush();
ClearIndices();
return 1 + encoder.len();
}
void set_type_length(int type_length) { this->type_length_ = type_length; }
/// Returns a conservative estimate of the number of bytes needed to encode the buffered
/// indices. Used to size the buffer passed to WriteIndices().
int64_t EstimatedDataEncodedSize() override {
// Note: because of the way RleEncoder::CheckBufferFull() is called, we have to
// reserve
// an extra "RleEncoder::MinBufferSize" bytes. These extra bytes won't be used
// but not reserving them would cause the encoder to fail.
return 1 +
arrow::util::RleEncoder::MaxBufferSize(
bit_width(), static_cast<int>(buffered_indices_.size())) +
arrow::util::RleEncoder::MinBufferSize(bit_width());
}
/// The minimum bit width required to encode the currently buffered indices.
int bit_width() const override {
if (ARROW_PREDICT_FALSE(num_entries() == 0)) return 0;
if (ARROW_PREDICT_FALSE(num_entries() == 1)) return 1;
return BitUtil::Log2(num_entries());
}
/// Encode value. Note that this does not actually write any data, just
/// buffers the value's index to be written later.
inline void Put(const T& value);
// Not implemented for other data types
inline void PutByteArray(const void* ptr, int32_t length);
void Put(const T* src, int num_values) override {
for (int32_t i = 0; i < num_values; i++) {
Put(src[i]);
}
}
void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
int64_t valid_bits_offset) override {
arrow::internal::BitmapReader valid_bits_reader(valid_bits, valid_bits_offset,
num_values);
for (int32_t i = 0; i < num_values; i++) {
if (valid_bits_reader.IsSet()) {
Put(src[i]);
}
valid_bits_reader.Next();
}
}
using TypedEncoder<DType>::Put;
void Put(const arrow::Array& values) override;
void PutDictionary(const arrow::Array& values) override;
template <typename ArrowType, typename T = typename ArrowType::c_type>
void PutIndicesTyped(const arrow::Array& data) {
auto values = data.data()->GetValues<T>(1);
size_t buffer_position = buffered_indices_.size();
buffered_indices_.resize(buffer_position +
static_cast<size_t>(data.length() - data.null_count()));
if (data.null_count() > 0) {
arrow::internal::BitmapReader valid_bits_reader(data.null_bitmap_data(),
data.offset(), data.length());
for (int64_t i = 0; i < data.length(); ++i) {
if (valid_bits_reader.IsSet()) {
buffered_indices_[buffer_position++] = static_cast<int32_t>(values[i]);
}
valid_bits_reader.Next();
}
} else {
for (int64_t i = 0; i < data.length(); ++i) {
buffered_indices_[buffer_position++] = static_cast<int32_t>(values[i]);
}
}
}
void PutIndices(const arrow::Array& data) override {
switch (data.type()->id()) {
case arrow::Type::UINT8:
case arrow::Type::INT8:
return PutIndicesTyped<arrow::UInt8Type>(data);
case arrow::Type::UINT16:
case arrow::Type::INT16:
return PutIndicesTyped<arrow::UInt16Type>(data);
case arrow::Type::UINT32:
case arrow::Type::INT32:
return PutIndicesTyped<arrow::UInt32Type>(data);
case arrow::Type::UINT64:
case arrow::Type::INT64:
return PutIndicesTyped<arrow::UInt64Type>(data);
default:
throw ParquetException("Passed non-integer array to PutIndices");
}
}
std::shared_ptr<Buffer> FlushValues() override {
std::shared_ptr<ResizableBuffer> buffer =
AllocateBuffer(this->pool_, EstimatedDataEncodedSize());
int result_size = WriteIndices(buffer->mutable_data(),
static_cast<int>(EstimatedDataEncodedSize()));
PARQUET_THROW_NOT_OK(buffer->Resize(result_size, false));
return std::move(buffer);
}
/// Writes out the encoded dictionary to buffer. buffer must be preallocated to
/// dict_encoded_size() bytes.
void WriteDict(uint8_t* buffer) override;
/// The number of entries in the dictionary.
int num_entries() const override { return memo_table_.size(); }
private:
/// Clears all the indices (but leaves the dictionary).
void ClearIndices() { buffered_indices_.clear(); }
/// Indices that have not yet be written out by WriteIndices().
ArrowPoolVector<int32_t> buffered_indices_;
/// The number of bytes needed to encode the dictionary.
int dict_encoded_size_;
MemoTableType memo_table_;
};
template <typename DType>
void DictEncoderImpl<DType>::WriteDict(uint8_t* buffer) {
// For primitive types, only a memcpy
DCHECK_EQ(static_cast<size_t>(dict_encoded_size_), sizeof(T) * memo_table_.size());
memo_table_.CopyValues(0 /* start_pos */, reinterpret_cast<T*>(buffer));
}
// ByteArray and FLBA already have the dictionary encoded in their data heaps
template <>
void DictEncoderImpl<ByteArrayType>::WriteDict(uint8_t* buffer) {
memo_table_.VisitValues(0, [&buffer](const arrow::util::string_view& v) {
uint32_t len = static_cast<uint32_t>(v.length());
memcpy(buffer, &len, sizeof(len));
buffer += sizeof(len);
memcpy(buffer, v.data(), len);
buffer += len;
});
}
template <>
void DictEncoderImpl<FLBAType>::WriteDict(uint8_t* buffer) {
memo_table_.VisitValues(0, [&](const arrow::util::string_view& v) {
DCHECK_EQ(v.length(), static_cast<size_t>(type_length_));
memcpy(buffer, v.data(), type_length_);
buffer += type_length_;
});
}
template <typename DType>
inline void DictEncoderImpl<DType>::Put(const T& v) {
// Put() implementation for primitive types
auto on_found = [](int32_t memo_index) {};
auto on_not_found = [this](int32_t memo_index) {
dict_encoded_size_ += static_cast<int>(sizeof(T));
};
int32_t memo_index;
PARQUET_THROW_NOT_OK(memo_table_.GetOrInsert(v, on_found, on_not_found, &memo_index));
buffered_indices_.push_back(memo_index);
}
template <typename DType>
inline void DictEncoderImpl<DType>::PutByteArray(const void* ptr, int32_t length) {
DCHECK(false);
}
template <>
inline void DictEncoderImpl<ByteArrayType>::PutByteArray(const void* ptr,
int32_t length) {
static const uint8_t empty[] = {0};
auto on_found = [](int32_t memo_index) {};
auto on_not_found = [&](int32_t memo_index) {
dict_encoded_size_ += static_cast<int>(length + sizeof(uint32_t));
};
DCHECK(ptr != nullptr || length == 0);
ptr = (ptr != nullptr) ? ptr : empty;
int32_t memo_index;
PARQUET_THROW_NOT_OK(
memo_table_.GetOrInsert(ptr, length, on_found, on_not_found, &memo_index));
buffered_indices_.push_back(memo_index);
}
template <>
inline void DictEncoderImpl<ByteArrayType>::Put(const ByteArray& val) {
return PutByteArray(val.ptr, static_cast<int32_t>(val.len));
}
template <>
inline void DictEncoderImpl<FLBAType>::Put(const FixedLenByteArray& v) {
static const uint8_t empty[] = {0};
auto on_found = [](int32_t memo_index) {};
auto on_not_found = [this](int32_t memo_index) { dict_encoded_size_ += type_length_; };
DCHECK(v.ptr != nullptr || type_length_ == 0);
const void* ptr = (v.ptr != nullptr) ? v.ptr : empty;
int32_t memo_index;
PARQUET_THROW_NOT_OK(
memo_table_.GetOrInsert(ptr, type_length_, on_found, on_not_found, &memo_index));
buffered_indices_.push_back(memo_index);
}
template <>
void DictEncoderImpl<Int96Type>::Put(const arrow::Array& values) {
ParquetException::NYI("Direct put to Int96");
}
template <>
void DictEncoderImpl<Int96Type>::PutDictionary(const arrow::Array& values) {
ParquetException::NYI("Direct put to Int96");
}
template <typename DType>
void DictEncoderImpl<DType>::Put(const arrow::Array& values) {
using ArrayType = typename arrow::CTypeTraits<typename DType::c_type>::ArrayType;
const auto& data = checked_cast<const ArrayType&>(values);
if (data.null_count() == 0) {
// no nulls, just dump the data
for (int64_t i = 0; i < data.length(); i++) {
Put(data.Value(i));
}
} else {
for (int64_t i = 0; i < data.length(); i++) {
if (data.IsValid(i)) {
Put(data.Value(i));
}
}
}
}
template <>
void DictEncoderImpl<FLBAType>::Put(const arrow::Array& values) {
AssertFixedSizeBinary(values, type_length_);
const auto& data = checked_cast<const arrow::FixedSizeBinaryArray&>(values);
if (data.null_count() == 0) {
// no nulls, just dump the data
for (int64_t i = 0; i < data.length(); i++) {
Put(FixedLenByteArray(data.Value(i)));
}
} else {
std::vector<uint8_t> empty(type_length_, 0);
for (int64_t i = 0; i < data.length(); i++) {
if (data.IsValid(i)) {
Put(FixedLenByteArray(data.Value(i)));
}
}
}
}
template <>
void DictEncoderImpl<ByteArrayType>::Put(const arrow::Array& values) {
AssertBinary(values);
const auto& data = checked_cast<const arrow::BinaryArray&>(values);
if (data.null_count() == 0) {
// no nulls, just dump the data
for (int64_t i = 0; i < data.length(); i++) {
auto view = data.GetView(i);
PutByteArray(view.data(), static_cast<int32_t>(view.size()));
}
} else {
for (int64_t i = 0; i < data.length(); i++) {
if (data.IsValid(i)) {
auto view = data.GetView(i);
PutByteArray(view.data(), static_cast<int32_t>(view.size()));
}
}
}
}
template <typename DType>
void AssertCanPutDictionary(DictEncoderImpl<DType>* encoder, const arrow::Array& dict) {
if (dict.null_count() > 0) {
throw ParquetException("Inserted dictionary cannot cannot contain nulls");
}
if (encoder->num_entries() > 0) {
throw ParquetException("Can only call PutDictionary on an empty DictEncoder");
}
}
template <typename DType>
void DictEncoderImpl<DType>::PutDictionary(const arrow::Array& values) {
AssertCanPutDictionary(this, values);
using ArrayType = typename arrow::CTypeTraits<typename DType::c_type>::ArrayType;
const auto& data = checked_cast<const ArrayType&>(values);
dict_encoded_size_ += static_cast<int>(sizeof(typename DType::c_type) * data.length());
for (int64_t i = 0; i < data.length(); i++) {
int32_t unused_memo_index;
PARQUET_THROW_NOT_OK(memo_table_.GetOrInsert(data.Value(i), &unused_memo_index));
}
}
template <>
void DictEncoderImpl<FLBAType>::PutDictionary(const arrow::Array& values) {
AssertFixedSizeBinary(values, type_length_);
AssertCanPutDictionary(this, values);
const auto& data = checked_cast<const arrow::FixedSizeBinaryArray&>(values);
dict_encoded_size_ += static_cast<int>(type_length_ * data.length());
for (int64_t i = 0; i < data.length(); i++) {
int32_t unused_memo_index;
PARQUET_THROW_NOT_OK(
memo_table_.GetOrInsert(data.Value(i), type_length_, &unused_memo_index));
}
}
template <>
void DictEncoderImpl<ByteArrayType>::PutDictionary(const arrow::Array& values) {
AssertBinary(values);
AssertCanPutDictionary(this, values);
const auto& data = checked_cast<const arrow::BinaryArray&>(values);
for (int64_t i = 0; i < data.length(); i++) {
auto v = data.GetView(i);
dict_encoded_size_ += static_cast<int>(v.size() + sizeof(uint32_t));
int32_t unused_memo_index;
PARQUET_THROW_NOT_OK(memo_table_.GetOrInsert(v.data(), static_cast<int32_t>(v.size()),
&unused_memo_index));
}
}
// ----------------------------------------------------------------------
// ByteStreamSplitEncoder<T> implementations
template <typename DType>
class ByteStreamSplitEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
public:
using T = typename DType::c_type;
using TypedEncoder<DType>::Put;
explicit ByteStreamSplitEncoder(
const ColumnDescriptor* descr,
::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
int64_t EstimatedDataEncodedSize() override;
std::shared_ptr<Buffer> FlushValues() override;
void Put(const T* buffer, int num_values) override;
void Put(const arrow::Array& values) override;
void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
int64_t valid_bits_offset) override;
protected:
arrow::TypedBufferBuilder<T> values_;
private:
void PutArrowArray(const arrow::Array& values);
};
template <typename DType>
ByteStreamSplitEncoder<DType>::ByteStreamSplitEncoder(const ColumnDescriptor* descr,
::arrow::MemoryPool* pool)
: EncoderImpl(descr, Encoding::BYTE_STREAM_SPLIT, pool), values_{pool} {}
template <typename DType>
int64_t ByteStreamSplitEncoder<DType>::EstimatedDataEncodedSize() {
return values_.length() * sizeof(T);
}
template <typename DType>
std::shared_ptr<Buffer> ByteStreamSplitEncoder<DType>::FlushValues() {
std::shared_ptr<ResizableBuffer> output_buffer =
AllocateBuffer(this->memory_pool(), EstimatedDataEncodedSize());
uint8_t* output_buffer_raw = output_buffer->mutable_data();
const size_t num_values = values_.length();
const uint8_t* raw_values = reinterpret_cast<const uint8_t*>(values_.data());
arrow::util::internal::ByteStreamSplitEncode<T>(raw_values, num_values,
output_buffer_raw);
values_.Reset();
return std::move(output_buffer);
}
template <typename DType>
void ByteStreamSplitEncoder<DType>::Put(const T* buffer, int num_values) {
if (num_values > 0) PARQUET_THROW_NOT_OK(values_.Append(buffer, num_values));
}
template <typename DType>
void ByteStreamSplitEncoder<DType>::Put(const ::arrow::Array& values) {
PutArrowArray(values);
}
template <>
void ByteStreamSplitEncoder<FloatType>::PutArrowArray(const ::arrow::Array& values) {
DirectPutImpl<arrow::FloatArray>(values,
reinterpret_cast<arrow::BufferBuilder*>(&values_));
}
template <>
void ByteStreamSplitEncoder<DoubleType>::PutArrowArray(const ::arrow::Array& values) {
DirectPutImpl<arrow::DoubleArray>(values,
reinterpret_cast<arrow::BufferBuilder*>(&values_));
}
template <typename DType>
void ByteStreamSplitEncoder<DType>::PutSpaced(const T* src, int num_values,
const uint8_t* valid_bits,
int64_t valid_bits_offset) {
PARQUET_ASSIGN_OR_THROW(
auto buffer, arrow::AllocateBuffer(num_values * sizeof(T), this->memory_pool()));
T* data = reinterpret_cast<T*>(buffer->mutable_data());
int num_valid_values = arrow::util::internal::SpacedCompress<T>(
src, num_values, valid_bits, valid_bits_offset, data);
Put(data, num_valid_values);
}
class DecoderImpl : virtual public Decoder {
public:
void SetData(int num_values, const uint8_t* data, int len) override {
num_values_ = num_values;
data_ = data;
len_ = len;
}
int values_left() const override { return num_values_; }
Encoding::type encoding() const override { return encoding_; }
protected:
explicit DecoderImpl(const ColumnDescriptor* descr, Encoding::type encoding)
: descr_(descr), encoding_(encoding), num_values_(0), data_(NULLPTR), len_(0) {}
// For accessing type-specific metadata, like FIXED_LEN_BYTE_ARRAY
const ColumnDescriptor* descr_;
const Encoding::type encoding_;
int num_values_;
const uint8_t* data_;
int len_;
int type_length_;
};
template <typename DType>
class PlainDecoder : public DecoderImpl, virtual public TypedDecoder<DType> {
public:
using T = typename DType::c_type;
explicit PlainDecoder(const ColumnDescriptor* descr);
int Decode(T* buffer, int max_values) override;
int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
int64_t valid_bits_offset,
typename EncodingTraits<DType>::Accumulator* builder) override;
int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
int64_t valid_bits_offset,
typename EncodingTraits<DType>::DictAccumulator* builder) override;
};
template <>
inline int PlainDecoder<Int96Type>::DecodeArrow(
int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
typename EncodingTraits<Int96Type>::Accumulator* builder) {
ParquetException::NYI("DecodeArrow not supported for Int96");
}
template <>
inline int PlainDecoder<Int96Type>::DecodeArrow(
int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
typename EncodingTraits<Int96Type>::DictAccumulator* builder) {
ParquetException::NYI("DecodeArrow not supported for Int96");
}
template <>
inline int PlainDecoder<BooleanType>::DecodeArrow(
int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
typename EncodingTraits<BooleanType>::DictAccumulator* builder) {
ParquetException::NYI("dictionaries of BooleanType");
}
template <typename DType>
int PlainDecoder<DType>::DecodeArrow(
int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
typename EncodingTraits<DType>::Accumulator* builder) {
using value_type = typename DType::c_type;
constexpr int value_size = static_cast<int>(sizeof(value_type));
int values_decoded = num_values - null_count;
if (ARROW_PREDICT_FALSE(len_ < value_size * values_decoded)) {
ParquetException::EofException();
}
PARQUET_THROW_NOT_OK(builder->Reserve(num_values));
VisitNullBitmapInline(
valid_bits, valid_bits_offset, num_values, null_count,
[&]() {
builder->UnsafeAppend(arrow::util::SafeLoadAs<value_type>(data_));
data_ += sizeof(value_type);
},
[&]() { builder->UnsafeAppendNull(); });
num_values_ -= values_decoded;
len_ -= sizeof(value_type) * values_decoded;
return values_decoded;
}
template <typename DType>
int PlainDecoder<DType>::DecodeArrow(
int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
typename EncodingTraits<DType>::DictAccumulator* builder) {
using value_type = typename DType::c_type;
constexpr int value_size = static_cast<int>(sizeof(value_type));
int values_decoded = num_values - null_count;
if (ARROW_PREDICT_FALSE(len_ < value_size * values_decoded)) {
ParquetException::EofException();
}
PARQUET_THROW_NOT_OK(builder->Reserve(num_values));
VisitNullBitmapInline(
valid_bits, valid_bits_offset, num_values, null_count,
[&]() {
PARQUET_THROW_NOT_OK(builder->Append(arrow::util::SafeLoadAs<value_type>(data_)));
data_ += sizeof(value_type);
},
[&]() { PARQUET_THROW_NOT_OK(builder->AppendNull()); });
num_values_ -= values_decoded;
len_ -= sizeof(value_type) * values_decoded;
return values_decoded;
}
// Decode routine templated on C++ type rather than type enum
template <typename T>
inline int DecodePlain(const uint8_t* data, int64_t data_size, int num_values,
int type_length, T* out) {
int64_t bytes_to_decode = num_values * static_cast<int64_t>(sizeof(T));
if (bytes_to_decode > data_size || bytes_to_decode > INT_MAX) {
ParquetException::EofException();
}
// If bytes_to_decode == 0, data could be null
if (bytes_to_decode > 0) {
memcpy(out, data, bytes_to_decode);
}
return static_cast<int>(bytes_to_decode);
}
template <typename DType>
PlainDecoder<DType>::PlainDecoder(const ColumnDescriptor* descr)
: DecoderImpl(descr, Encoding::PLAIN) {
if (descr_ && descr_->physical_type() == Type::FIXED_LEN_BYTE_ARRAY) {
type_length_ = descr_->type_length();
} else {
type_length_ = -1;
}
}
// Template specialization for BYTE_ARRAY. The written values do not own their
// own data.
static inline int64_t ReadByteArray(const uint8_t* data, int64_t data_size,
ByteArray* out) {
if (ARROW_PREDICT_FALSE(data_size < 4)) {
ParquetException::EofException();
}
const int32_t len = arrow::util::SafeLoadAs<int32_t>(data);
if (len < 0) {
throw ParquetException("Invalid BYTE_ARRAY value");
}
const int64_t consumed_length = static_cast<int64_t>(len) + 4;
if (ARROW_PREDICT_FALSE(data_size < consumed_length)) {
ParquetException::EofException();
}
*out = ByteArray{static_cast<uint32_t>(len), data + 4};
return consumed_length;
}
template <>
inline int DecodePlain<ByteArray>(const uint8_t* data, int64_t data_size, int num_values,
int type_length, ByteArray* out) {
int bytes_decoded = 0;
for (int i = 0; i < num_values; ++i) {
const auto increment = ReadByteArray(data, data_size, out + i);
if (ARROW_PREDICT_FALSE(increment > INT_MAX - bytes_decoded)) {
throw ParquetException("BYTE_ARRAY chunk too large");
}
data += increment;
data_size -= increment;
bytes_decoded += static_cast<int>(increment);
}
return bytes_decoded;
}
// Template specialization for FIXED_LEN_BYTE_ARRAY. The written values do not
// own their own data.
template <>
inline int DecodePlain<FixedLenByteArray>(const uint8_t* data, int64_t data_size,
int num_values, int type_length,
FixedLenByteArray* out) {
int64_t bytes_to_decode = static_cast<int64_t>(type_length) * num_values;
if (bytes_to_decode > data_size || bytes_to_decode > INT_MAX) {
ParquetException::EofException();
}
for (int i = 0; i < num_values; ++i) {
out[i].ptr = data;
data += type_length;
data_size -= type_length;
}
return static_cast<int>(bytes_to_decode);
}
template <typename DType>
int PlainDecoder<DType>::Decode(T* buffer, int max_values) {
max_values = std::min(max_values, num_values_);
int bytes_consumed = DecodePlain<T>(data_, len_, max_values, type_length_, buffer);
data_ += bytes_consumed;
len_ -= bytes_consumed;
num_values_ -= max_values;
return max_values;
}
class PlainBooleanDecoder : public DecoderImpl,
virtual public TypedDecoder<BooleanType>,
virtual public BooleanDecoder {
public:
explicit PlainBooleanDecoder(const ColumnDescriptor* descr);
void SetData(int num_values, const uint8_t* data, int len) override;
// Two flavors of bool decoding
int Decode(uint8_t* buffer, int max_values) override;
int Decode(bool* buffer, int max_values) override;
int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
int64_t valid_bits_offset,
typename EncodingTraits<BooleanType>::Accumulator* out) override;
int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
int64_t valid_bits_offset,
typename EncodingTraits<BooleanType>::DictAccumulator* out) override;
private:
std::unique_ptr<arrow::BitUtil::BitReader> bit_reader_;
};
PlainBooleanDecoder::PlainBooleanDecoder(const ColumnDescriptor* descr)
: DecoderImpl(descr, Encoding::PLAIN) {}
void PlainBooleanDecoder::SetData(int num_values, const uint8_t* data, int len) {
num_values_ = num_values;
bit_reader_.reset(new BitUtil::BitReader(data, len));
}
int PlainBooleanDecoder::DecodeArrow(
int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
typename EncodingTraits<BooleanType>::Accumulator* builder) {
int values_decoded = num_values - null_count;
if (ARROW_PREDICT_FALSE(num_values_ < values_decoded)) {
ParquetException::EofException();
}
PARQUET_THROW_NOT_OK(builder->Reserve(num_values));
VisitNullBitmapInline(
valid_bits, valid_bits_offset, num_values, null_count,
[&]() {
bool value;
ARROW_IGNORE_EXPR(bit_reader_->GetValue(1, &value));
builder->UnsafeAppend(value);
},
[&]() { builder->UnsafeAppendNull(); });
num_values_ -= values_decoded;
return values_decoded;
}
inline int PlainBooleanDecoder::DecodeArrow(
int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
typename EncodingTraits<BooleanType>::DictAccumulator* builder) {
ParquetException::NYI("dictionaries of BooleanType");
}
int PlainBooleanDecoder::Decode(uint8_t* buffer, int max_values) {
max_values = std::min(max_values, num_values_);
bool val;
arrow::internal::BitmapWriter bit_writer(buffer, 0, max_values);
for (int i = 0; i < max_values; ++i) {
if (!bit_reader_->GetValue(1, &val)) {
ParquetException::EofException();
}
if (val) {
bit_writer.Set();
}
bit_writer.Next();
}
bit_writer.Finish();
num_values_ -= max_values;
return max_values;
}
int PlainBooleanDecoder::Decode(bool* buffer, int max_values) {
max_values = std::min(max_values, num_values_);
if (bit_reader_->GetBatch(1, buffer, max_values) != max_values) {
ParquetException::EofException();
}
num_values_ -= max_values;
return max_values;
}
struct ArrowBinaryHelper {
explicit ArrowBinaryHelper(typename EncodingTraits<ByteArrayType>::Accumulator* out) {
this->out = out;
this->builder = out->builder.get();
this->chunk_space_remaining =
::arrow::kBinaryMemoryLimit - this->builder->value_data_length();
}
Status PushChunk() {
std::shared_ptr<::arrow::Array> result;
RETURN_NOT_OK(builder->Finish(&result));
out->chunks.push_back(result);
chunk_space_remaining = ::arrow::kBinaryMemoryLimit;
return Status::OK();
}
bool CanFit(int64_t length) const { return length <= chunk_space_remaining; }
void UnsafeAppend(const uint8_t* data, int32_t length) {
chunk_space_remaining -= length;
builder->UnsafeAppend(data, length);
}
void UnsafeAppendNull() { builder->UnsafeAppendNull(); }
Status Append(const uint8_t* data, int32_t length) {
chunk_space_remaining -= length;
return builder->Append(data, length);
}
Status AppendNull() { return builder->AppendNull(); }
typename EncodingTraits<ByteArrayType>::Accumulator* out;
arrow::BinaryBuilder* builder;
int64_t chunk_space_remaining;
};
template <>
inline int PlainDecoder<ByteArrayType>::DecodeArrow(
int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
typename EncodingTraits<ByteArrayType>::Accumulator* builder) {
ParquetException::NYI();
}
template <>
inline int PlainDecoder<ByteArrayType>::DecodeArrow(
int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
typename EncodingTraits<ByteArrayType>::DictAccumulator* builder) {
ParquetException::NYI();
}
template <>
inline int PlainDecoder<FLBAType>::DecodeArrow(
int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
typename EncodingTraits<FLBAType>::Accumulator* builder) {
int values_decoded = num_values - null_count;
if (ARROW_PREDICT_FALSE(len_ < descr_->type_length() * values_decoded)) {
ParquetException::EofException();
}
PARQUET_THROW_NOT_OK(builder->Reserve(num_values));
VisitNullBitmapInline(
valid_bits, valid_bits_offset, num_values, null_count,
[&]() {
builder->UnsafeAppend(data_);
data_ += descr_->type_length();
},
[&]() { builder->UnsafeAppendNull(); });
num_values_ -= values_decoded;
len_ -= descr_->type_length() * values_decoded;
return values_decoded;
}
template <>
inline int PlainDecoder<FLBAType>::DecodeArrow(
int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
typename EncodingTraits<FLBAType>::DictAccumulator* builder) {
int values_decoded = num_values - null_count;
if (ARROW_PREDICT_FALSE(len_ < descr_->type_length() * values_decoded)) {
ParquetException::EofException();
}
PARQUET_THROW_NOT_OK(builder->Reserve(num_values));
VisitNullBitmapInline(
valid_bits, valid_bits_offset, num_values, null_count,
[&]() {
PARQUET_THROW_NOT_OK(builder->Append(data_));
data_ += descr_->type_length();
},
[&]() { PARQUET_THROW_NOT_OK(builder->AppendNull()); });
num_values_ -= values_decoded;
len_ -= descr_->type_length() * values_decoded;
return values_decoded;
}
class PlainByteArrayDecoder : public PlainDecoder<ByteArrayType>,
virtual public ByteArrayDecoder {
public:
using Base = PlainDecoder<ByteArrayType>;
using Base::DecodeSpaced;
using Base::PlainDecoder;
// ----------------------------------------------------------------------
// Dictionary read paths
int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
int64_t valid_bits_offset,
arrow::BinaryDictionary32Builder* builder) override {
int result = 0;
PARQUET_THROW_NOT_OK(DecodeArrow(num_values, null_count, valid_bits,
valid_bits_offset, builder, &result));
return result;
}
// ----------------------------------------------------------------------
// Optimized dense binary read paths
int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
int64_t valid_bits_offset,
typename EncodingTraits<ByteArrayType>::Accumulator* out) override {
int result = 0;
PARQUET_THROW_NOT_OK(DecodeArrowDense(num_values, null_count, valid_bits,
valid_bits_offset, out, &result));
return result;
}
private:
Status DecodeArrowDense(int num_values, int null_count, const uint8_t* valid_bits,
int64_t valid_bits_offset,
typename EncodingTraits<ByteArrayType>::Accumulator* out,
int* out_values_decoded) {
ArrowBinaryHelper helper(out);
int values_decoded = 0;
RETURN_NOT_OK(helper.builder->Reserve(num_values));
RETURN_NOT_OK(helper.builder->ReserveData(
std::min<int64_t>(len_, helper.chunk_space_remaining)));
int i = 0;
RETURN_NOT_OK(VisitNullBitmapInline(
valid_bits, valid_bits_offset, num_values, null_count,
[&]() {
if (ARROW_PREDICT_FALSE(len_ < 4)) {
ParquetException::EofException();
}
auto value_len = arrow::util::SafeLoadAs<int32_t>(data_);
if (ARROW_PREDICT_FALSE(value_len < 0 || value_len > INT32_MAX - 4)) {
return Status::Invalid("Invalid or corrupted value_len '", value_len, "'");
}
auto increment = value_len + 4;
if (ARROW_PREDICT_FALSE(len_ < increment)) {
ParquetException::EofException();
}
if (ARROW_PREDICT_FALSE(!helper.CanFit(value_len))) {
// This element would exceed the capacity of a chunk
RETURN_NOT_OK(helper.PushChunk());
RETURN_NOT_OK(helper.builder->Reserve(num_values - i));
RETURN_NOT_OK(helper.builder->ReserveData(
std::min<int64_t>(len_, helper.chunk_space_remaining)));
}
helper.UnsafeAppend(data_ + 4, value_len);
data_ += increment;
len_ -= increment;
++values_decoded;
++i;
return Status::OK();
},
[&]() {
helper.UnsafeAppendNull();
++i;
return Status::OK();
}));
num_values_ -= values_decoded;
*out_values_decoded = values_decoded;
return Status::OK();
}
template <typename BuilderType>
Status DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
int64_t valid_bits_offset, BuilderType* builder,
int* out_values_decoded) {
RETURN_NOT_OK(builder->Reserve(num_values));
int values_decoded = 0;
RETURN_NOT_OK(VisitNullBitmapInline(
valid_bits, valid_bits_offset, num_values, null_count,
[&]() {
if (ARROW_PREDICT_FALSE(len_ < 4)) {
ParquetException::EofException();
}
auto value_len = arrow::util::SafeLoadAs<int32_t>(data_);
if (ARROW_PREDICT_FALSE(value_len < 0 || value_len > INT32_MAX - 4)) {
return Status::Invalid("Invalid or corrupted value_len '", value_len, "'");
}
auto increment = value_len + 4;
if (ARROW_PREDICT_FALSE(len_ < increment)) {
ParquetException::EofException();
}
RETURN_NOT_OK(builder->Append(data_ + 4, value_len));
data_ += increment;
len_ -= increment;
++values_decoded;
return Status::OK();
},
[&]() { return builder->AppendNull(); }));
num_values_ -= values_decoded;
*out_values_decoded = values_decoded;
return Status::OK();
}
};
class PlainFLBADecoder : public PlainDecoder<FLBAType>, virtual public FLBADecoder {
public:
using Base = PlainDecoder<FLBAType>;
using Base::PlainDecoder;
};
// ----------------------------------------------------------------------
// Dictionary encoding and decoding
template <typename Type>
class DictDecoderImpl : public DecoderImpl, virtual public DictDecoder<Type> {
public:
typedef typename Type::c_type T;
// Initializes the dictionary with values from 'dictionary'. The data in
// dictionary is not guaranteed to persist in memory after this call so the
// dictionary decoder needs to copy the data out if necessary.
explicit DictDecoderImpl(const ColumnDescriptor* descr,
MemoryPool* pool = arrow::default_memory_pool())
: DecoderImpl(descr, Encoding::RLE_DICTIONARY),
dictionary_(AllocateBuffer(pool, 0)),
dictionary_length_(0),
byte_array_data_(AllocateBuffer(pool, 0)),
byte_array_offsets_(AllocateBuffer(pool, 0)),
indices_scratch_space_(AllocateBuffer(pool, 0)) {}
// Perform type-specific initiatialization
void SetDict(TypedDecoder<Type>* dictionary) override;
void SetData(int num_values, const uint8_t* data, int len) override {
num_values_ = num_values;
if (len == 0) {
// Initialize dummy decoder to avoid crashes later on
idx_decoder_ = arrow::util::RleDecoder(data, len, /*bit_width=*/1);
return;
}
uint8_t bit_width = *data;
if (ARROW_PREDICT_FALSE(bit_width >= 64)) {
throw ParquetException("Invalid or corrupted bit_width");
}
idx_decoder_ = arrow::util::RleDecoder(++data, --len, bit_width);
}
int Decode(T* buffer, int num_values) override {
num_values = std::min(num_values, num_values_);
int decoded_values =
idx_decoder_.GetBatchWithDict(reinterpret_cast<const T*>(dictionary_->data()),
dictionary_length_, buffer, num_values);
if (decoded_values != num_values) {
ParquetException::EofException();
}
num_values_ -= num_values;
return num_values;
}
int DecodeSpaced(T* buffer, int num_values, int null_count, const uint8_t* valid_bits,
int64_t valid_bits_offset) override {
num_values = std::min(num_values, num_values_);
if (num_values != idx_decoder_.GetBatchWithDictSpaced(
reinterpret_cast<const T*>(dictionary_->data()),
dictionary_length_, buffer, num_values, null_count, valid_bits,
valid_bits_offset)) {
ParquetException::EofException();
}
num_values_ -= num_values;
return num_values;
}
int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
int64_t valid_bits_offset,
typename EncodingTraits<Type>::Accumulator* out) override;
int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
int64_t valid_bits_offset,
typename EncodingTraits<Type>::DictAccumulator* out) override;
void InsertDictionary(arrow::ArrayBuilder* builder) override;
int DecodeIndicesSpaced(int num_values, int null_count, const uint8_t* valid_bits,
int64_t valid_bits_offset,
arrow::ArrayBuilder* builder) override {
if (num_values > 0) {
// TODO(wesm): Refactor to batch reads for improved memory use. It is not
// trivial because the null_count is relative to the entire bitmap
PARQUET_THROW_NOT_OK(indices_scratch_space_->TypedResize<int32_t>(
num_values, /*shrink_to_fit=*/false));
}
auto indices_buffer =
reinterpret_cast<int32_t*>(indices_scratch_space_->mutable_data());
if (num_values != idx_decoder_.GetBatchSpaced(num_values, null_count, valid_bits,
valid_bits_offset, indices_buffer)) {
ParquetException::EofException();
}
/// XXX(wesm): Cannot append "valid bits" directly to the builder
std::vector<uint8_t> valid_bytes(num_values);
arrow::internal::BitmapReader bit_reader(valid_bits, valid_bits_offset, num_values);
for (int64_t i = 0; i < num_values; ++i) {
valid_bytes[i] = static_cast<uint8_t>(bit_reader.IsSet());
bit_reader.Next();
}
auto binary_builder = checked_cast<arrow::BinaryDictionary32Builder*>(builder);
PARQUET_THROW_NOT_OK(
binary_builder->AppendIndices(indices_buffer, num_values, valid_bytes.data()));
num_values_ -= num_values - null_count;
return num_values - null_count;
}
int DecodeIndices(int num_values, arrow::ArrayBuilder* builder) override {
num_values = std::min(num_values, num_values_);
if (num_values > 0) {
// TODO(wesm): Refactor to batch reads for improved memory use. This is
// relatively simple here because we don't have to do any bookkeeping of
// nulls
PARQUET_THROW_NOT_OK(indices_scratch_space_->TypedResize<int32_t>(
num_values, /*shrink_to_fit=*/false));
}
auto indices_buffer =
reinterpret_cast<int32_t*>(indices_scratch_space_->mutable_data());
if (num_values != idx_decoder_.GetBatch(indices_buffer, num_values)) {
ParquetException::EofException();
}
auto binary_builder = checked_cast<arrow::BinaryDictionary32Builder*>(builder);
PARQUET_THROW_NOT_OK(binary_builder->AppendIndices(indices_buffer, num_values));
num_values_ -= num_values;
return num_values;
}
protected:
Status IndexInBounds(int32_t index) {
if (ARROW_PREDICT_TRUE(0 <= index && index < dictionary_length_)) {
return Status::OK();
}
return Status::Invalid("Index not in dictionary bounds");
}
inline void DecodeDict(TypedDecoder<Type>* dictionary) {
dictionary_length_ = static_cast<int32_t>(dictionary->values_left());
PARQUET_THROW_NOT_OK(dictionary_->Resize(dictionary_length_ * sizeof(T),
/*shrink_to_fit=*/false));
dictionary->Decode(reinterpret_cast<T*>(dictionary_->mutable_data()),
dictionary_length_);
}
// Only one is set.
std::shared_ptr<ResizableBuffer> dictionary_;
int32_t dictionary_length_;
// Data that contains the byte array data (byte_array_dictionary_ just has the
// pointers).
std::shared_ptr<ResizableBuffer> byte_array_data_;
// Arrow-style byte offsets for each dictionary value. We maintain two
// representations of the dictionary, one as ByteArray* for non-Arrow
// consumers and this one for Arrow consumers. Since dictionaries are
// generally pretty small to begin with this doesn't mean too much extra
// memory use in most cases
std::shared_ptr<ResizableBuffer> byte_array_offsets_;
// Reusable buffer for decoding dictionary indices to be appended to a
// BinaryDictionary32Builder
std::shared_ptr<ResizableBuffer> indices_scratch_space_;
arrow::util::RleDecoder idx_decoder_;
};
template <typename Type>
void DictDecoderImpl<Type>::SetDict(TypedDecoder<Type>* dictionary) {
DecodeDict(dictionary);
}
template <>
void DictDecoderImpl<BooleanType>::SetDict(TypedDecoder<BooleanType>* dictionary) {
ParquetException::NYI("Dictionary encoding is not implemented for boolean values");
}
template <>
void DictDecoderImpl<ByteArrayType>::SetDict(TypedDecoder<ByteArrayType>* dictionary) {
DecodeDict(dictionary);
auto dict_values = reinterpret_cast<ByteArray*>(dictionary_->mutable_data());
int total_size = 0;
for (int i = 0; i < dictionary_length_; ++i) {
total_size += dict_values[i].len;
}
PARQUET_THROW_NOT_OK(byte_array_data_->Resize(total_size,
/*shrink_to_fit=*/false));
PARQUET_THROW_NOT_OK(
byte_array_offsets_->Resize((dictionary_length_ + 1) * sizeof(int32_t),
/*shrink_to_fit=*/false));
int32_t offset = 0;
uint8_t* bytes_data = byte_array_data_->mutable_data();
int32_t* bytes_offsets =
reinterpret_cast<int32_t*>(byte_array_offsets_->mutable_data());
for (int i = 0; i < dictionary_length_; ++i) {
memcpy(bytes_data + offset, dict_values[i].ptr, dict_values[i].len);
bytes_offsets[i] = offset;
dict_values[i].ptr = bytes_data + offset;
offset += dict_values[i].len;
}
bytes_offsets[dictionary_length_] = offset;
}
template <>
inline void DictDecoderImpl<FLBAType>::SetDict(TypedDecoder<FLBAType>* dictionary) {
DecodeDict(dictionary);
auto dict_values = reinterpret_cast<FLBA*>(dictionary_->mutable_data());
int fixed_len = descr_->type_length();
int total_size = dictionary_length_ * fixed_len;
PARQUET_THROW_NOT_OK(byte_array_data_->Resize(total_size,
/*shrink_to_fit=*/false));
uint8_t* bytes_data = byte_array_data_->mutable_data();
for (int32_t i = 0, offset = 0; i < dictionary_length_; ++i, offset += fixed_len) {
memcpy(bytes_data + offset, dict_values[i].ptr, fixed_len);
dict_values[i].ptr = bytes_data + offset;
}
}
template <>
inline int DictDecoderImpl<Int96Type>::DecodeArrow(
int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
typename EncodingTraits<Int96Type>::Accumulator* builder) {
ParquetException::NYI("DecodeArrow to Int96Type");
}
template <>
inline int DictDecoderImpl<Int96Type>::DecodeArrow(
int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
typename EncodingTraits<Int96Type>::DictAccumulator* builder) {
ParquetException::NYI("DecodeArrow to Int96Type");
}
template <>
inline int DictDecoderImpl<ByteArrayType>::DecodeArrow(
int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
typename EncodingTraits<ByteArrayType>::Accumulator* builder) {
ParquetException::NYI("DecodeArrow implemented elsewhere");
}
template <>
inline int DictDecoderImpl<ByteArrayType>::DecodeArrow(
int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
typename EncodingTraits<ByteArrayType>::DictAccumulator* builder) {
ParquetException::NYI("DecodeArrow implemented elsewhere");
}
template <typename DType>
int DictDecoderImpl<DType>::DecodeArrow(
int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
typename EncodingTraits<DType>::DictAccumulator* builder) {
PARQUET_THROW_NOT_OK(builder->Reserve(num_values));
auto dict_values = reinterpret_cast<const typename DType::c_type*>(dictionary_->data());
VisitNullBitmapInline(
valid_bits, valid_bits_offset, num_values, null_count,
[&]() {
int32_t index;
if (ARROW_PREDICT_FALSE(!idx_decoder_.Get(&index))) {
throw ParquetException("");
}
PARQUET_THROW_NOT_OK(IndexInBounds(index));
PARQUET_THROW_NOT_OK(builder->Append(dict_values[index]));
},
[&]() { PARQUET_THROW_NOT_OK(builder->AppendNull()); });
return num_values - null_count;
}
template <>
int DictDecoderImpl<BooleanType>::DecodeArrow(
int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
typename EncodingTraits<BooleanType>::DictAccumulator* builder) {
ParquetException::NYI("No dictionary encoding for BooleanType");
}
template <>
inline int DictDecoderImpl<FLBAType>::DecodeArrow(
int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
typename EncodingTraits<FLBAType>::Accumulator* builder) {
if (builder->byte_width() != descr_->type_length()) {
throw ParquetException("Byte width mismatch: builder was " +
std::to_string(builder->byte_width()) + " but decoder was " +
std::to_string(descr_->type_length()));
}
PARQUET_THROW_NOT_OK(builder->Reserve(num_values));
auto dict_values = reinterpret_cast<const FLBA*>(dictionary_->data());
VisitNullBitmapInline(
valid_bits, valid_bits_offset, num_values, null_count,
[&]() {
int32_t index;
if (ARROW_PREDICT_FALSE(!idx_decoder_.Get(&index))) {
throw ParquetException("");
}
PARQUET_THROW_NOT_OK(IndexInBounds(index));
builder->UnsafeAppend(dict_values[index].ptr);
},
[&]() { builder->UnsafeAppendNull(); });
return num_values - null_count;
}
template <>
int DictDecoderImpl<FLBAType>::DecodeArrow(
int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
typename EncodingTraits<FLBAType>::DictAccumulator* builder) {
auto value_type =
checked_cast<const arrow::DictionaryType&>(*builder->type()).value_type();
auto byte_width =
checked_cast<const arrow::FixedSizeBinaryType&>(*value_type).byte_width();
if (byte_width != descr_->type_length()) {
throw ParquetException("Byte width mismatch: builder was " +
std::to_string(byte_width) + " but decoder was " +
std::to_string(descr_->type_length()));
}
PARQUET_THROW_NOT_OK(builder->Reserve(num_values));
auto dict_values = reinterpret_cast<const FLBA*>(dictionary_->data());
VisitNullBitmapInline(
valid_bits, valid_bits_offset, num_values, null_count,
[&]() {
int32_t index;
if (ARROW_PREDICT_FALSE(!idx_decoder_.Get(&index))) {
throw ParquetException("");
}
PARQUET_THROW_NOT_OK(IndexInBounds(index));
PARQUET_THROW_NOT_OK(builder->Append(dict_values[index].ptr));
},
[&]() { PARQUET_THROW_NOT_OK(builder->AppendNull()); });
return num_values - null_count;
}
template <typename Type>
int DictDecoderImpl<Type>::DecodeArrow(
int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
typename EncodingTraits<Type>::Accumulator* builder) {
PARQUET_THROW_NOT_OK(builder->Reserve(num_values));
using value_type = typename Type::c_type;
auto dict_values = reinterpret_cast<const value_type*>(dictionary_->data());
VisitNullBitmapInline(
valid_bits, valid_bits_offset, num_values, null_count,
[&]() {
int32_t index;
if (ARROW_PREDICT_FALSE(!idx_decoder_.Get(&index))) {
throw ParquetException("");
}
PARQUET_THROW_NOT_OK(IndexInBounds(index));
builder->UnsafeAppend(dict_values[index]);
},
[&]() { builder->UnsafeAppendNull(); });
return num_values - null_count;
}
template <typename Type>
void DictDecoderImpl<Type>::InsertDictionary(arrow::ArrayBuilder* builder) {
ParquetException::NYI("InsertDictionary only implemented for BYTE_ARRAY types");
}
template <>
void DictDecoderImpl<ByteArrayType>::InsertDictionary(arrow::ArrayBuilder* builder) {
auto binary_builder = checked_cast<arrow::BinaryDictionary32Builder*>(builder);
// Make a BinaryArray referencing the internal dictionary data
auto arr = std::make_shared<arrow::BinaryArray>(dictionary_length_, byte_array_offsets_,
byte_array_data_);
PARQUET_THROW_NOT_OK(binary_builder->InsertMemoValues(*arr));
}
class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,
virtual public ByteArrayDecoder {
public:
using BASE = DictDecoderImpl<ByteArrayType>;
using BASE::DictDecoderImpl;
int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
int64_t valid_bits_offset,
arrow::BinaryDictionary32Builder* builder) override {
int result = 0;
if (null_count == 0) {
PARQUET_THROW_NOT_OK(DecodeArrowNonNull(num_values, builder, &result));
} else {
PARQUET_THROW_NOT_OK(DecodeArrow(num_values, null_count, valid_bits,
valid_bits_offset, builder, &result));
}
return result;
}
int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
int64_t valid_bits_offset,
typename EncodingTraits<ByteArrayType>::Accumulator* out) override {
int result = 0;
if (null_count == 0) {
PARQUET_THROW_NOT_OK(DecodeArrowDenseNonNull(num_values, out, &result));
} else {
PARQUET_THROW_NOT_OK(DecodeArrowDense(num_values, null_count, valid_bits,
valid_bits_offset, out, &result));
}
return result;
}
private:
Status DecodeArrowDense(int num_values, int null_count, const uint8_t* valid_bits,
int64_t valid_bits_offset,
typename EncodingTraits<ByteArrayType>::Accumulator* out,
int* out_num_values) {
constexpr int32_t kBufferSize = 1024;
int32_t indices[kBufferSize];
ArrowBinaryHelper helper(out);
arrow::internal::BitmapReader bit_reader(valid_bits, valid_bits_offset, num_values);
auto dict_values = reinterpret_cast<const ByteArray*>(dictionary_->data());
int values_decoded = 0;
int num_appended = 0;
while (num_appended < num_values) {
bool is_valid = bit_reader.IsSet();
bit_reader.Next();
if (is_valid) {
int32_t batch_size =
std::min<int32_t>(kBufferSize, num_values - num_appended - null_count);
int num_indices = idx_decoder_.GetBatch(indices, batch_size);
if (ARROW_PREDICT_FALSE(num_indices < 1)) {
return Status::Invalid("Invalid number of indices '", num_indices, "'");
}
int i = 0;
while (true) {
// Consume all indices
if (is_valid) {
auto idx = indices[i];
RETURN_NOT_OK(IndexInBounds(idx));
const auto& val = dict_values[idx];
if (ARROW_PREDICT_FALSE(!helper.CanFit(val.len))) {
RETURN_NOT_OK(helper.PushChunk());
}
RETURN_NOT_OK(helper.Append(val.ptr, static_cast<int32_t>(val.len)));
++i;
++values_decoded;
} else {
RETURN_NOT_OK(helper.AppendNull());
--null_count;
}
++num_appended;
if (i == num_indices) {
// Do not advance the bit_reader if we have fulfilled the decode
// request
break;
}
is_valid = bit_reader.IsSet();
bit_reader.Next();
}
} else {
RETURN_NOT_OK(helper.AppendNull());
--null_count;
++num_appended;
}
}
*out_num_values = values_decoded;
return Status::OK();
}
Status DecodeArrowDenseNonNull(int num_values,
typename EncodingTraits<ByteArrayType>::Accumulator* out,
int* out_num_values) {
constexpr int32_t kBufferSize = 2048;
int32_t indices[kBufferSize];
int values_decoded = 0;
ArrowBinaryHelper helper(out);
auto dict_values = reinterpret_cast<const ByteArray*>(dictionary_->data());
while (values_decoded < num_values) {
int32_t batch_size = std::min<int32_t>(kBufferSize, num_values - values_decoded);
int num_indices = idx_decoder_.GetBatch(indices, batch_size);
if (num_indices == 0) ParquetException::EofException();
for (int i = 0; i < num_indices; ++i) {
auto idx = indices[i];
RETURN_NOT_OK(IndexInBounds(idx));
const auto& val = dict_values[idx];
if (ARROW_PREDICT_FALSE(!helper.CanFit(val.len))) {
RETURN_NOT_OK(helper.PushChunk());
}
RETURN_NOT_OK(helper.Append(val.ptr, static_cast<int32_t>(val.len)));
}
values_decoded += num_indices;
}
*out_num_values = values_decoded;
return Status::OK();
}
template <typename BuilderType>
Status DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
int64_t valid_bits_offset, BuilderType* builder,
int* out_num_values) {
constexpr int32_t kBufferSize = 1024;
int32_t indices[kBufferSize];
RETURN_NOT_OK(builder->Reserve(num_values));
arrow::internal::BitmapReader bit_reader(valid_bits, valid_bits_offset, num_values);
auto dict_values = reinterpret_cast<const ByteArray*>(dictionary_->data());
int values_decoded = 0;
int num_appended = 0;
while (num_appended < num_values) {
bool is_valid = bit_reader.IsSet();
bit_reader.Next();
if (is_valid) {
int32_t batch_size =
std::min<int32_t>(kBufferSize, num_values - num_appended - null_count);
int num_indices = idx_decoder_.GetBatch(indices, batch_size);
int i = 0;
while (true) {
// Consume all indices
if (is_valid) {
auto idx = indices[i];
RETURN_NOT_OK(IndexInBounds(idx));
const auto& val = dict_values[idx];
RETURN_NOT_OK(builder->Append(val.ptr, val.len));
++i;
++values_decoded;
} else {
RETURN_NOT_OK(builder->AppendNull());
--null_count;
}
++num_appended;
if (i == num_indices) {
// Do not advance the bit_reader if we have fulfilled the decode
// request
break;
}
is_valid = bit_reader.IsSet();
bit_reader.Next();
}
} else {
RETURN_NOT_OK(builder->AppendNull());
--null_count;
++num_appended;
}
}
*out_num_values = values_decoded;
return Status::OK();
}
template <typename BuilderType>
Status DecodeArrowNonNull(int num_values, BuilderType* builder, int* out_num_values) {
constexpr int32_t kBufferSize = 2048;
int32_t indices[kBufferSize];
RETURN_NOT_OK(builder->Reserve(num_values));
auto dict_values = reinterpret_cast<const ByteArray*>(dictionary_->data());
int values_decoded = 0;
while (values_decoded < num_values) {
int32_t batch_size = std::min<int32_t>(kBufferSize, num_values - values_decoded);
int num_indices = idx_decoder_.GetBatch(indices, batch_size);
if (num_indices == 0) ParquetException::EofException();
for (int i = 0; i < num_indices; ++i) {
auto idx = indices[i];
RETURN_NOT_OK(IndexInBounds(idx));
const auto& val = dict_values[idx];
RETURN_NOT_OK(builder->Append(val.ptr, val.len));
}
values_decoded += num_indices;
}
*out_num_values = values_decoded;
return Status::OK();
}
};
// ----------------------------------------------------------------------
// DeltaBitPackDecoder
template <typename DType>
class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder<DType> {
public:
typedef typename DType::c_type T;
explicit DeltaBitPackDecoder(const ColumnDescriptor* descr,
MemoryPool* pool = arrow::default_memory_pool())
: DecoderImpl(descr, Encoding::DELTA_BINARY_PACKED), pool_(pool) {
if (DType::type_num != Type::INT32 && DType::type_num != Type::INT64) {
throw ParquetException("Delta bit pack encoding should only be for integer data.");
}
}
void SetData(int num_values, const uint8_t* data, int len) override {
this->num_values_ = num_values;
decoder_ = arrow::BitUtil::BitReader(data, len);
values_current_block_ = 0;
values_current_mini_block_ = 0;
}
int Decode(T* buffer, int max_values) override {
return GetInternal(buffer, max_values);
}
int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
int64_t valid_bits_offset,
typename EncodingTraits<DType>::Accumulator* out) override {
if (null_count != 0) {
ParquetException::NYI("Delta bit pack DecodeArrow with null slots");
}
std::vector<T> values(num_values);
GetInternal(values.data(), num_values);
PARQUET_THROW_NOT_OK(out->AppendValues(values));
return num_values;
}
int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
int64_t valid_bits_offset,
typename EncodingTraits<DType>::DictAccumulator* out) override {
if (null_count != 0) {
ParquetException::NYI("Delta bit pack DecodeArrow with null slots");
}
std::vector<T> values(num_values);
GetInternal(values.data(), num_values);
PARQUET_THROW_NOT_OK(out->Reserve(num_values));
for (T value : values) {
PARQUET_THROW_NOT_OK(out->Append(value));
}
return num_values;
}
private:
void InitBlock() {
// The number of values per block.
uint32_t block_size;
if (!decoder_.GetVlqInt(&block_size)) ParquetException::EofException();
if (!decoder_.GetVlqInt(&num_mini_blocks_)) ParquetException::EofException();
if (!decoder_.GetVlqInt(&values_current_block_)) {
ParquetException::EofException();
}
if (!decoder_.GetZigZagVlqInt(&last_value_)) ParquetException::EofException();
delta_bit_widths_ = AllocateBuffer(pool_, num_mini_blocks_);
uint8_t* bit_width_data = delta_bit_widths_->mutable_data();
if (!decoder_.GetZigZagVlqInt(&min_delta_)) ParquetException::EofException();
for (uint32_t i = 0; i < num_mini_blocks_; ++i) {
if (!decoder_.GetAligned<uint8_t>(1, bit_width_data + i)) {
ParquetException::EofException();
}
}
values_per_mini_block_ = block_size / num_mini_blocks_;
mini_block_idx_ = 0;
delta_bit_width_ = bit_width_data[0];
values_current_mini_block_ = values_per_mini_block_;
}
template <typename T>
int GetInternal(T* buffer, int max_values) {
max_values = std::min(max_values, this->num_values_);
const uint8_t* bit_width_data = delta_bit_widths_->data();
for (int i = 0; i < max_values; ++i) {
if (ARROW_PREDICT_FALSE(values_current_mini_block_ == 0)) {
++mini_block_idx_;
if (mini_block_idx_ < static_cast<size_t>(delta_bit_widths_->size())) {
delta_bit_width_ = bit_width_data[mini_block_idx_];
values_current_mini_block_ = values_per_mini_block_;
} else {
InitBlock();
buffer[i] = last_value_;
continue;
}
}
// TODO: the key to this algorithm is to decode the entire miniblock at once.
int64_t delta;
if (!decoder_.GetValue(delta_bit_width_, &delta)) ParquetException::EofException();
delta += min_delta_;
last_value_ += static_cast<int32_t>(delta);
buffer[i] = last_value_;
--values_current_mini_block_;
}
this->num_values_ -= max_values;
return max_values;
}
MemoryPool* pool_;
arrow::BitUtil::BitReader decoder_;
uint32_t values_current_block_;
uint32_t num_mini_blocks_;
uint64_t values_per_mini_block_;
uint64_t values_current_mini_block_;
int32_t min_delta_;
size_t mini_block_idx_;
std::shared_ptr<ResizableBuffer> delta_bit_widths_;
int delta_bit_width_;
int32_t last_value_;
};
// ----------------------------------------------------------------------
// DELTA_LENGTH_BYTE_ARRAY
class DeltaLengthByteArrayDecoder : public DecoderImpl,
virtual public TypedDecoder<ByteArrayType> {
public:
explicit DeltaLengthByteArrayDecoder(const ColumnDescriptor* descr,
MemoryPool* pool = arrow::default_memory_pool())
: DecoderImpl(descr, Encoding::DELTA_LENGTH_BYTE_ARRAY),
len_decoder_(nullptr, pool),
pool_(pool) {}
void SetData(int num_values, const uint8_t* data, int len) override {
num_values_ = num_values;
if (len == 0) return;
int total_lengths_len = arrow::util::SafeLoadAs<int32_t>(data);
data += 4;
this->len_decoder_.SetData(num_values, data, total_lengths_len);
data_ = data + total_lengths_len;
this->len_ = len - 4 - total_lengths_len;
}
int Decode(ByteArray* buffer, int max_values) override {
using VectorT = ArrowPoolVector<int>;
max_values = std::min(max_values, num_values_);
VectorT lengths(max_values, 0, ::arrow::stl::allocator<int>(pool_));
len_decoder_.Decode(lengths.data(), max_values);
for (int i = 0; i < max_values; ++i) {
buffer[i].len = lengths[i];
buffer[i].ptr = data_;
this->data_ += lengths[i];
this->len_ -= lengths[i];
}
this->num_values_ -= max_values;
return max_values;
}
int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
int64_t valid_bits_offset,
typename EncodingTraits<ByteArrayType>::Accumulator* out) override {
ParquetException::NYI("DecodeArrow for DeltaLengthByteArrayDecoder");
}
int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
int64_t valid_bits_offset,
typename EncodingTraits<ByteArrayType>::DictAccumulator* out) override {
ParquetException::NYI("DecodeArrow for DeltaLengthByteArrayDecoder");
}
private:
DeltaBitPackDecoder<Int32Type> len_decoder_;
::arrow::MemoryPool* pool_;
};
// ----------------------------------------------------------------------
// DELTA_BYTE_ARRAY
class DeltaByteArrayDecoder : public DecoderImpl,
virtual public TypedDecoder<ByteArrayType> {
public:
explicit DeltaByteArrayDecoder(const ColumnDescriptor* descr,
MemoryPool* pool = arrow::default_memory_pool())
: DecoderImpl(descr, Encoding::DELTA_BYTE_ARRAY),
prefix_len_decoder_(nullptr, pool),
suffix_decoder_(nullptr, pool),
last_value_(0, nullptr) {}
virtual void SetData(int num_values, const uint8_t* data, int len) {
num_values_ = num_values;
if (len == 0) return;
int prefix_len_length = arrow::util::SafeLoadAs<int32_t>(data);
data += 4;
len -= 4;
prefix_len_decoder_.SetData(num_values, data, prefix_len_length);
data += prefix_len_length;
len -= prefix_len_length;
suffix_decoder_.SetData(num_values, data, len);
}
// TODO: this doesn't work and requires memory management. We need to allocate
// new strings to store the results.
virtual int Decode(ByteArray* buffer, int max_values) {
max_values = std::min(max_values, this->num_values_);
for (int i = 0; i < max_values; ++i) {
int prefix_len = 0;
prefix_len_decoder_.Decode(&prefix_len, 1);
ByteArray suffix = {0, nullptr};
suffix_decoder_.Decode(&suffix, 1);
buffer[i].len = prefix_len + suffix.len;
uint8_t* result = reinterpret_cast<uint8_t*>(malloc(buffer[i].len));
memcpy(result, last_value_.ptr, prefix_len);
memcpy(result + prefix_len, suffix.ptr, suffix.len);
buffer[i].ptr = result;
last_value_ = buffer[i];
}
this->num_values_ -= max_values;
return max_values;
}
private:
DeltaBitPackDecoder<Int32Type> prefix_len_decoder_;
DeltaLengthByteArrayDecoder suffix_decoder_;
ByteArray last_value_;
};
// ----------------------------------------------------------------------
// BYTE_STREAM_SPLIT
template <typename DType>
class ByteStreamSplitDecoder : public DecoderImpl, virtual public TypedDecoder<DType> {
public:
using T = typename DType::c_type;
explicit ByteStreamSplitDecoder(const ColumnDescriptor* descr);
int Decode(T* buffer, int max_values) override;
int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
int64_t valid_bits_offset,
typename EncodingTraits<DType>::Accumulator* builder) override;
int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
int64_t valid_bits_offset,
typename EncodingTraits<DType>::DictAccumulator* builder) override;
void SetData(int num_values, const uint8_t* data, int len) override;
T* EnsureDecodeBuffer(int64_t min_values) {
const int64_t size = sizeof(T) * min_values;
if (!decode_buffer_ || decode_buffer_->size() < size) {
PARQUET_ASSIGN_OR_THROW(decode_buffer_, ::arrow::AllocateBuffer(size));
}
return reinterpret_cast<T*>(decode_buffer_->mutable_data());
}
private:
int num_values_in_buffer_{0};
std::shared_ptr<Buffer> decode_buffer_;
static constexpr size_t kNumStreams = sizeof(T);
};
template <typename DType>
ByteStreamSplitDecoder<DType>::ByteStreamSplitDecoder(const ColumnDescriptor* descr)
: DecoderImpl(descr, Encoding::BYTE_STREAM_SPLIT) {}
template <typename DType>
void ByteStreamSplitDecoder<DType>::SetData(int num_values, const uint8_t* data,
int len) {
DecoderImpl::SetData(num_values, data, len);
if (num_values * static_cast<int64_t>(sizeof(T)) > len) {
throw ParquetException("Data size too small for number of values (corrupted file?)");
}
num_values_in_buffer_ = num_values;
}
template <typename DType>
int ByteStreamSplitDecoder<DType>::Decode(T* buffer, int max_values) {
const int values_to_decode = std::min(num_values_, max_values);
const int num_decoded_previously = num_values_in_buffer_ - num_values_;
const uint8_t* data = data_ + num_decoded_previously;
arrow::util::internal::ByteStreamSplitDecode<T>(data, values_to_decode,
num_values_in_buffer_, buffer);
num_values_ -= values_to_decode;
len_ -= sizeof(T) * values_to_decode;
return values_to_decode;
}
template <typename DType>
int ByteStreamSplitDecoder<DType>::DecodeArrow(
int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
typename EncodingTraits<DType>::Accumulator* builder) {
constexpr int value_size = static_cast<int>(kNumStreams);
int values_decoded = num_values - null_count;
if (ARROW_PREDICT_FALSE(len_ < value_size * values_decoded)) {
ParquetException::EofException();
}
PARQUET_THROW_NOT_OK(builder->Reserve(num_values));
const int num_decoded_previously = num_values_in_buffer_ - num_values_;
const uint8_t* data = data_ + num_decoded_previously;
int offset = 0;
#if defined(ARROW_HAVE_SIMD_SPLIT)
// Use fast decoding into intermediate buffer. This will also decode
// some null values, but it's fast enough that we don't care.
T* decode_out = EnsureDecodeBuffer(values_decoded);
arrow::util::internal::ByteStreamSplitDecode<T>(data, values_decoded,
num_values_in_buffer_, decode_out);
// XXX If null_count is 0, we could even append in bulk or decode directly into
// builder
VisitNullBitmapInline(
valid_bits, valid_bits_offset, num_values, null_count,
[&]() {
builder->UnsafeAppend(decode_out[offset]);
++offset;
},
[&]() { builder->UnsafeAppendNull(); });
#else
VisitNullBitmapInline(
valid_bits, valid_bits_offset, num_values, null_count,
[&]() {
uint8_t gathered_byte_data[kNumStreams];
for (size_t b = 0; b < kNumStreams; ++b) {
const size_t byte_index = b * num_values_in_buffer_ + offset;
gathered_byte_data[b] = data[byte_index];
}
builder->UnsafeAppend(arrow::util::SafeLoadAs<T>(&gathered_byte_data[0]));
++offset;
},
[&]() { builder->UnsafeAppendNull(); });
#endif
num_values_ -= values_decoded;
len_ -= sizeof(T) * values_decoded;
return values_decoded;
}
template <typename DType>
int ByteStreamSplitDecoder<DType>::DecodeArrow(
int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
typename EncodingTraits<DType>::DictAccumulator* builder) {
ParquetException::NYI("DecodeArrow for ByteStreamSplitDecoder");
}
// ----------------------------------------------------------------------
// Encoder and decoder factory functions
std::unique_ptr<Encoder> MakeEncoder(Type::type type_num, Encoding::type encoding,
bool use_dictionary, const ColumnDescriptor* descr,
MemoryPool* pool) {
if (use_dictionary) {
switch (type_num) {
case Type::INT32:
return std::unique_ptr<Encoder>(new DictEncoderImpl<Int32Type>(descr, pool));
case Type::INT64:
return std::unique_ptr<Encoder>(new DictEncoderImpl<Int64Type>(descr, pool));
case Type::INT96:
return std::unique_ptr<Encoder>(new DictEncoderImpl<Int96Type>(descr, pool));
case Type::FLOAT:
return std::unique_ptr<Encoder>(new DictEncoderImpl<FloatType>(descr, pool));
case Type::DOUBLE:
return std::unique_ptr<Encoder>(new DictEncoderImpl<DoubleType>(descr, pool));
case Type::BYTE_ARRAY:
return std::unique_ptr<Encoder>(new DictEncoderImpl<ByteArrayType>(descr, pool));
case Type::FIXED_LEN_BYTE_ARRAY:
return std::unique_ptr<Encoder>(new DictEncoderImpl<FLBAType>(descr, pool));
default:
DCHECK(false) << "Encoder not implemented";
break;
}
} else if (encoding == Encoding::PLAIN) {
switch (type_num) {
case Type::BOOLEAN:
return std::unique_ptr<Encoder>(new PlainEncoder<BooleanType>(descr, pool));
case Type::INT32:
return std::unique_ptr<Encoder>(new PlainEncoder<Int32Type>(descr, pool));
case Type::INT64:
return std::unique_ptr<Encoder>(new PlainEncoder<Int64Type>(descr, pool));
case Type::INT96:
return std::unique_ptr<Encoder>(new PlainEncoder<Int96Type>(descr, pool));
case Type::FLOAT:
return std::unique_ptr<Encoder>(new PlainEncoder<FloatType>(descr, pool));
case Type::DOUBLE:
return std::unique_ptr<Encoder>(new PlainEncoder<DoubleType>(descr, pool));
case Type::BYTE_ARRAY:
return std::unique_ptr<Encoder>(new PlainEncoder<ByteArrayType>(descr, pool));
case Type::FIXED_LEN_BYTE_ARRAY:
return std::unique_ptr<Encoder>(new PlainEncoder<FLBAType>(descr, pool));
default:
DCHECK(false) << "Encoder not implemented";
break;
}
} else if (encoding == Encoding::BYTE_STREAM_SPLIT) {
switch (type_num) {
case Type::FLOAT:
return std::unique_ptr<Encoder>(
new ByteStreamSplitEncoder<FloatType>(descr, pool));
case Type::DOUBLE:
return std::unique_ptr<Encoder>(
new ByteStreamSplitEncoder<DoubleType>(descr, pool));
default:
throw ParquetException("BYTE_STREAM_SPLIT only supports FLOAT and DOUBLE");
break;
}
} else {
ParquetException::NYI("Selected encoding is not supported");
}
DCHECK(false) << "Should not be able to reach this code";
return nullptr;
}
std::unique_ptr<Decoder> MakeDecoder(Type::type type_num, Encoding::type encoding,
const ColumnDescriptor* descr) {
if (encoding == Encoding::PLAIN) {
switch (type_num) {
case Type::BOOLEAN:
return std::unique_ptr<Decoder>(new PlainBooleanDecoder(descr));
case Type::INT32:
return std::unique_ptr<Decoder>(new PlainDecoder<Int32Type>(descr));
case Type::INT64:
return std::unique_ptr<Decoder>(new PlainDecoder<Int64Type>(descr));
case Type::INT96:
return std::unique_ptr<Decoder>(new PlainDecoder<Int96Type>(descr));
case Type::FLOAT:
return std::unique_ptr<Decoder>(new PlainDecoder<FloatType>(descr));
case Type::DOUBLE:
return std::unique_ptr<Decoder>(new PlainDecoder<DoubleType>(descr));
case Type::BYTE_ARRAY:
return std::unique_ptr<Decoder>(new PlainByteArrayDecoder(descr));
case Type::FIXED_LEN_BYTE_ARRAY:
return std::unique_ptr<Decoder>(new PlainFLBADecoder(descr));
default:
break;
}
} else if (encoding == Encoding::BYTE_STREAM_SPLIT) {
switch (type_num) {
case Type::FLOAT:
return std::unique_ptr<Decoder>(new ByteStreamSplitDecoder<FloatType>(descr));
case Type::DOUBLE:
return std::unique_ptr<Decoder>(new ByteStreamSplitDecoder<DoubleType>(descr));
default:
throw ParquetException("BYTE_STREAM_SPLIT only supports FLOAT and DOUBLE");
break;
}
} else {
ParquetException::NYI("Selected encoding is not supported");
}
DCHECK(false) << "Should not be able to reach this code";
return nullptr;
}
namespace detail {
std::unique_ptr<Decoder> MakeDictDecoder(Type::type type_num,
const ColumnDescriptor* descr,
MemoryPool* pool) {
switch (type_num) {
case Type::BOOLEAN:
ParquetException::NYI("Dictionary encoding not implemented for boolean type");
case Type::INT32:
return std::unique_ptr<Decoder>(new DictDecoderImpl<Int32Type>(descr, pool));
case Type::INT64:
return std::unique_ptr<Decoder>(new DictDecoderImpl<Int64Type>(descr, pool));
case Type::INT96:
return std::unique_ptr<Decoder>(new DictDecoderImpl<Int96Type>(descr, pool));
case Type::FLOAT:
return std::unique_ptr<Decoder>(new DictDecoderImpl<FloatType>(descr, pool));
case Type::DOUBLE:
return std::unique_ptr<Decoder>(new DictDecoderImpl<DoubleType>(descr, pool));
case Type::BYTE_ARRAY:
return std::unique_ptr<Decoder>(new DictByteArrayDecoderImpl(descr, pool));
case Type::FIXED_LEN_BYTE_ARRAY:
return std::unique_ptr<Decoder>(new DictDecoderImpl<FLBAType>(descr, pool));
default:
break;
}
DCHECK(false) << "Should not be able to reach this code";
return nullptr;
}
} // namespace detail
} // namespace parquet