blob: d473ac528bd40fbc00fcd35403a9e95ffe8e08eb [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "parquet/encoding.h"
#include <algorithm>
#include <cstdint>
#include <cstdlib>
#include <memory>
#include <utility>
#include <vector>
#include "arrow/array.h"
#include "arrow/builder.h"
#include "arrow/util/bit_stream_utils.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/hashing.h"
#include "arrow/util/logging.h"
#include "arrow/util/rle_encoding.h"
#include "arrow/util/ubsan.h"
#include "parquet/exception.h"
#include "parquet/platform.h"
#include "parquet/schema.h"
#include "parquet/types.h"
using arrow::Status;
using arrow::internal::checked_cast;
namespace parquet {
// Default capacity, in bytes, for in-memory scratch buffers. PlainBooleanEncoder
// allocates its bit-packing buffer with this size, so it can hold
// kInMemoryDefaultCapacity * 8 booleans before being appended to its sink.
constexpr int64_t kInMemoryDefaultCapacity = 1024;
/// State shared by every encoder implementation: the column descriptor, the
/// encoding tag this encoder produces, the allocation pool, and the
/// descriptor's value width.
class EncoderImpl : virtual public Encoder {
 public:
  EncoderImpl(const ColumnDescriptor* descr, Encoding::type encoding, MemoryPool* pool)
      : descr_(descr),
        encoding_(encoding),
        pool_(pool),
        type_length_(descr == nullptr ? -1 : descr->type_length()) {}

  Encoding::type encoding() const override { return encoding_; }

  MemoryPool* memory_pool() const override { return pool_; }

 protected:
  /// Column metadata, needed for type-specific details such as the width of
  /// FIXED_LEN_BYTE_ARRAY values. May be null.
  const ColumnDescriptor* descr_;
  /// The encoding produced by this encoder.
  const Encoding::type encoding_;
  /// Pool used for buffer allocations.
  MemoryPool* pool_;
  /// descr_->type_length(), or -1 when no descriptor was supplied.
  int type_length_;
};
// ----------------------------------------------------------------------
// Plain encoder implementation
/// PLAIN encoder: values are appended to an in-memory sink in their raw
/// physical representation (BYTE_ARRAY values get a 4-byte length prefix).
template <typename DType>
class PlainEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
 public:
  using T = typename DType::c_type;

  explicit PlainEncoder(const ColumnDescriptor* descr, MemoryPool* pool)
      : EncoderImpl(descr, Encoding::PLAIN, pool), sink_(pool) {}

  /// The encoded size is exactly the number of bytes buffered so far.
  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }

  /// Returns the accumulated encoded bytes as a single buffer.
  std::shared_ptr<Buffer> FlushValues() override {
    std::shared_ptr<Buffer> result;
    PARQUET_THROW_NOT_OK(sink_.Finish(&result));
    return result;
  }

  void Put(const T* buffer, int num_values) override;
  void Put(const arrow::Array& values) override;

  /// Encodes only the entries of `src` whose validity bit is set. They are first
  /// compacted into a temporary buffer and then passed to Put().
  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
                 int64_t valid_bits_offset) override {
    std::shared_ptr<ResizableBuffer> scratch;
    PARQUET_THROW_NOT_OK(arrow::AllocateResizableBuffer(this->memory_pool(),
                                                        num_values * sizeof(T), &scratch));
    T* compacted = reinterpret_cast<T*>(scratch->mutable_data());
    int32_t n_valid = 0;
    arrow::internal::BitmapReader reader(valid_bits, valid_bits_offset, num_values);
    for (int32_t pos = 0; pos < num_values; ++pos, reader.Next()) {
      if (reader.IsSet()) {
        compacted[n_valid] = src[pos];
        ++n_valid;
      }
    }
    Put(compacted, n_valid);
  }

  /// Appends a uint32 length prefix followed by the value bytes. The caller
  /// must already have reserved enough capacity in sink_.
  void UnsafePutByteArray(const void* data, uint32_t length) {
    DCHECK(length == 0 || data != nullptr) << "Value ptr cannot be NULL";
    sink_.UnsafeAppend(&length, sizeof(uint32_t));
    sink_.UnsafeAppend(data, static_cast<int64_t>(length));
  }

  /// Appends one length-prefixed BYTE_ARRAY value, growing the sink if needed.
  void Put(const ByteArray& val) {
    const int64_t needed = static_cast<int64_t>(val.len + sizeof(uint32_t));
    const bool must_grow = sink_.length() + needed > sink_.capacity();
    if (ARROW_PREDICT_FALSE(must_grow)) {
      PARQUET_THROW_NOT_OK(sink_.Reserve(needed));
    }
    UnsafePutByteArray(val.ptr, val.len);
  }

 protected:
  /// Destination for all encoded bytes.
  arrow::BufferBuilder sink_;
};
/// Generic PLAIN encoding of fixed-width values: a straight byte copy of
/// `num_values` elements into the sink. Non-positive counts are a no-op.
template <typename DType>
void PlainEncoder<DType>::Put(const T* buffer, int num_values) {
  if (num_values <= 0) {
    return;
  }
  PARQUET_THROW_NOT_OK(sink_.Append(buffer, num_values * sizeof(T)));
}
/// BYTE_ARRAY values are variable-length, so each one is appended individually
/// with its own length prefix.
template <>
inline void PlainEncoder<ByteArrayType>::Put(const ByteArray* src, int num_values) {
  int next = 0;
  while (next < num_values) {
    Put(src[next++]);
  }
}
/// Generic fallback for direct Arrow-array input: reports the Arrow type as not
/// yet implemented for this physical type.
template <typename DType>
void PlainEncoder<DType>::Put(const arrow::Array& values) {
  const auto& arrow_type = *values.type();
  ParquetException::NYI(arrow_type.ToString());
}
/// Throws ParquetException unless `values` has Arrow type BINARY or STRING.
/// The BYTE_ARRAY encoders in this file call it before casting to BinaryArray.
void AssertBinary(const arrow::Array& values) {
  switch (values.type_id()) {
    case arrow::Type::BINARY:
    case arrow::Type::STRING:
      return;
    default:
      throw ParquetException("Only BinaryArray and subclasses supported");
  }
}
/// PLAIN-encodes the non-null entries of a BinaryArray/StringArray. Capacity
/// for every value plus a uint32 length prefix per slot is reserved up front,
/// so the per-value appends can use the unchecked path.
template <>
void PlainEncoder<ByteArrayType>::Put(const arrow::Array& values) {
  AssertBinary(values);
  const auto& binary = checked_cast<const arrow::BinaryArray&>(values);
  const int64_t length = binary.length();
  const int64_t payload_bytes = binary.value_offset(length) - binary.value_offset(0);
  PARQUET_THROW_NOT_OK(sink_.Reserve(payload_bytes + length * sizeof(uint32_t)));

  const bool has_nulls = binary.null_count() != 0;
  for (int64_t i = 0; i < length; ++i) {
    if (has_nulls && !binary.IsValid(i)) {
      continue;  // null slots produce no PLAIN output
    }
    const auto view = binary.GetView(i);
    UnsafePutByteArray(view.data(), static_cast<uint32_t>(view.size()));
  }
}
/// FIXED_LEN_BYTE_ARRAY values are copied back to back with no length prefix.
/// The width comes from the column descriptor, and zero-width values emit nothing.
template <>
inline void PlainEncoder<FLBAType>::Put(const FixedLenByteArray* src, int num_values) {
  const int width = descr_->type_length();
  if (width == 0) {
    return;
  }
  for (int i = 0; i < num_values; ++i) {
    const auto* bytes = src[i].ptr;
    DCHECK(bytes != nullptr) << "Value ptr cannot be NULL";
    PARQUET_THROW_NOT_OK(sink_.Append(bytes, width));
  }
}
/// PLAIN encoder exposed through the FLBAEncoder interface. All behavior is
/// inherited from PlainEncoder<FLBAType>.
class PlainFLBAEncoder : public PlainEncoder<FLBAType>, virtual public FLBAEncoder {
 public:
  using BASE = PlainEncoder<FLBAType>;
  // Reuse the base-class constructors unchanged.
  using BASE::BASE;
};
/// PLAIN encoder for BOOLEAN columns. Values are bit-packed one bit each into a
/// fixed-size scratch buffer (bits_buffer_), which is appended to sink_ when full.
class PlainBooleanEncoder : public EncoderImpl,
                            virtual public TypedEncoder<BooleanType>,
                            virtual public BooleanEncoder {
 public:
  explicit PlainBooleanEncoder(const ColumnDescriptor* descr, MemoryPool* pool)
      : EncoderImpl(descr, Encoding::PLAIN, pool),
        bits_available_(kInMemoryDefaultCapacity * 8),
        bits_buffer_(AllocateBuffer(pool, kInMemoryDefaultCapacity)),
        sink_(pool),
        bit_writer_(bits_buffer_->mutable_data(),
                    static_cast<int>(bits_buffer_->size())) {}

  int64_t EstimatedDataEncodedSize() override;
  std::shared_ptr<Buffer> FlushValues() override;

  void Put(const bool* src, int num_values) override;
  void Put(const std::vector<bool>& src, int num_values) override;

  /// Encodes only the entries whose validity bit is set. They are compacted into
  /// a temporary buffer first and then passed to Put().
  void PutSpaced(const bool* src, int num_values, const uint8_t* valid_bits,
                 int64_t valid_bits_offset) override {
    std::shared_ptr<ResizableBuffer> scratch;
    PARQUET_THROW_NOT_OK(arrow::AllocateResizableBuffer(this->memory_pool(),
                                                        num_values * sizeof(T), &scratch));
    T* compacted = reinterpret_cast<T*>(scratch->mutable_data());
    int32_t n_valid = 0;
    arrow::internal::BitmapReader reader(valid_bits, valid_bits_offset, num_values);
    for (int32_t pos = 0; pos < num_values; ++pos, reader.Next()) {
      if (reader.IsSet()) {
        compacted[n_valid++] = src[pos];
      }
    }
    Put(compacted, n_valid);
  }

  void Put(const arrow::Array& values) override {
    ParquetException::NYI("Direct Arrow to Boolean writes not implemented");
  }

 private:
  /// Free bits remaining in bits_buffer_ before it must be flushed to sink_.
  int bits_available_;
  /// Scratch storage that bit_writer_ packs into.
  std::shared_ptr<ResizableBuffer> bits_buffer_;
  /// Accumulates every flushed chunk of packed bits.
  arrow::BufferBuilder sink_;
  /// Writes bits into bits_buffer_.
  arrow::BitUtil::BitWriter bit_writer_;

  template <typename SequenceType>
  void PutImpl(const SequenceType& src, int num_values);
};
// Bit-packs the first num_values entries of src into bit_writer_. src is any
// type indexable with operator[] whose elements convert to a bit; it is
// instantiated with const bool* and std::vector<bool>.
// bits_available_ counts the free bits left in bits_buffer_. Whenever it reaches
// zero, the packed bytes are appended to sink_ and the writer is cleared.
template <typename SequenceType>
void PlainBooleanEncoder::PutImpl(const SequenceType& src, int num_values) {
  int bit_offset = 0;
  // First, top up whatever space is left in the current scratch buffer.
  if (bits_available_ > 0) {
    int bits_to_write = std::min(bits_available_, num_values);
    for (int i = 0; i < bits_to_write; i++) {
      bit_writer_.PutValue(src[i], 1);
    }
    bits_available_ -= bits_to_write;
    bit_offset = bits_to_write;
    // Scratch buffer is full: move its bytes to the sink and start over.
    if (bits_available_ == 0) {
      bit_writer_.Flush();
      PARQUET_THROW_NOT_OK(
          sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written()));
      bit_writer_.Clear();
    }
  }
  int bits_remaining = num_values - bit_offset;
  // The remaining values go into freshly cleared scratch buffers, one buffer's
  // worth at a time. The loop only runs when the buffer was just flushed (or was
  // already full on entry), so resetting bits_available_ here is safe.
  while (bit_offset < num_values) {
    bits_available_ = static_cast<int>(bits_buffer_->size()) * 8;
    int bits_to_write = std::min(bits_available_, bits_remaining);
    for (int i = bit_offset; i < bit_offset + bits_to_write; i++) {
      bit_writer_.PutValue(src[i], 1);
    }
    bit_offset += bits_to_write;
    bits_available_ -= bits_to_write;
    bits_remaining -= bits_to_write;
    if (bits_available_ == 0) {
      bit_writer_.Flush();
      PARQUET_THROW_NOT_OK(
          sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written()));
      bit_writer_.Clear();
    }
  }
}
/// Bytes already moved to the sink plus the bytes currently packed in the
/// scratch buffer.
int64_t PlainBooleanEncoder::EstimatedDataEncodedSize() {
  return sink_.length() + bit_writer_.bytes_written();
}
/// Moves any partially filled scratch buffer into the sink, then returns
/// everything encoded so far as one buffer.
std::shared_ptr<Buffer> PlainBooleanEncoder::FlushValues() {
  const bool scratch_not_full = bits_available_ > 0;
  if (scratch_not_full) {
    bit_writer_.Flush();
    PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written()));
    bit_writer_.Clear();
    // The scratch buffer is empty again, so its whole capacity is free.
    bits_available_ = static_cast<int>(bits_buffer_->size()) * 8;
  }
  std::shared_ptr<Buffer> flushed;
  PARQUET_THROW_NOT_OK(sink_.Finish(&flushed));
  return flushed;
}
void PlainBooleanEncoder::Put(const bool* src, int num_values) {
PutImpl(src, num_values);
}
void PlainBooleanEncoder::Put(const std::vector<bool>& src, int num_values) {
PutImpl(src, num_values);
}
// ----------------------------------------------------------------------
// DictEncoder<T> implementations
/// Selects the memo table used to deduplicate values of a physical type during
/// dictionary encoding. Fixed-width types memoize their C value directly.
template <typename DType>
struct DictEncoderTraits {
  using c_type = typename DType::c_type;
  using MemoTableType = arrow::internal::ScalarMemoTable<c_type>;
};

/// Variable-length binary values are memoized as byte strings.
template <>
struct DictEncoderTraits<ByteArrayType> {
  using MemoTableType = arrow::internal::BinaryMemoTable;
};

/// FIXED_LEN_BYTE_ARRAY values are also memoized as byte strings, so they share
/// the BYTE_ARRAY configuration.
template <>
struct DictEncoderTraits<FLBAType> : DictEncoderTraits<ByteArrayType> {};
// Initial capacity (1024 elements) passed to DictEncoderImpl's memo table.
static constexpr int32_t kInitialHashTableSize = 1 << 10;
/// Dictionary encoder. See the dictionary encoding section of
/// https://github.com/apache/parquet-format. The encoding supports streaming:
/// values are encoded as they are added while the dictionary is being built.
/// At any time, the buffered indices can be written out against the current
/// dictionary size. More values, including new dictionary entries, can then be
/// added to the encoder.
template <typename DType>
class DictEncoderImpl : public EncoderImpl, virtual public DictEncoder<DType> {
  using MemoTableType = typename DictEncoderTraits<DType>::MemoTableType;

 public:
  typedef typename DType::c_type T;

  explicit DictEncoderImpl(const ColumnDescriptor* desc, MemoryPool* pool)
      : EncoderImpl(desc, Encoding::PLAIN_DICTIONARY, pool),
        dict_encoded_size_(0),
        memo_table_(pool, kInitialHashTableSize) {}

  ~DictEncoderImpl() override { DCHECK(buffered_indices_.empty()); }

  /// Number of bytes WriteDict() writes for the current dictionary.
  int dict_encoded_size() override { return dict_encoded_size_; }

  /// Writes the buffered indices to `buffer`: one byte holding the bit width,
  /// followed by the RleEncoder output.
  /// On success, the buffered indices are cleared and the number of bytes
  /// written is returned. If `buffer_len` is too small, -1 is returned and the
  /// buffered indices are kept.
  int WriteIndices(uint8_t* buffer, int buffer_len) override {
    // Write bit width in first byte
    *buffer = static_cast<uint8_t>(bit_width());
    ++buffer;
    --buffer_len;

    arrow::util::RleEncoder encoder(buffer, buffer_len, bit_width());
    for (int32_t index : buffered_indices_) {
      if (!encoder.Put(index)) return -1;
    }
    encoder.Flush();

    ClearIndices();
    return 1 + encoder.len();
  }

  void set_type_length(int type_length) { this->type_length_ = type_length; }

  /// Returns a conservative estimate of the number of bytes needed to encode the
  /// buffered indices. Used to size the buffer passed to WriteIndices().
  int64_t EstimatedDataEncodedSize() override {
    // Note: because of the way RleEncoder::CheckBufferFull() is called, we have to
    // reserve an extra "RleEncoder::MinBufferSize" bytes. These extra bytes won't
    // be used, but not reserving them would cause the encoder to fail.
    return 1 +
           arrow::util::RleEncoder::MaxBufferSize(
               bit_width(), static_cast<int>(buffered_indices_.size())) +
           arrow::util::RleEncoder::MinBufferSize(bit_width());
  }

  /// The minimum bit width required to encode the currently buffered indices.
  int bit_width() const override {
    if (ARROW_PREDICT_FALSE(num_entries() == 0)) return 0;
    if (ARROW_PREDICT_FALSE(num_entries() == 1)) return 1;
    return BitUtil::Log2(num_entries());
  }

  /// Encode value. Note that this does not actually write any data, just
  /// buffers the value's index to be written later.
  inline void Put(const T& value);

  // Not implemented for other data types
  inline void PutByteArray(const void* ptr, int32_t length);

  void Put(const T* src, int num_values) override {
    for (int32_t i = 0; i < num_values; i++) {
      Put(src[i]);
    }
  }

  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
                 int64_t valid_bits_offset) override {
    arrow::internal::BitmapReader valid_bits_reader(valid_bits, valid_bits_offset,
                                                    num_values);
    for (int32_t i = 0; i < num_values; i++) {
      if (valid_bits_reader.IsSet()) {
        Put(src[i]);
      }
      valid_bits_reader.Next();
    }
  }

  void Put(const arrow::Array& values) override;
  void PutDictionary(const arrow::Array& values) override;

  /// Appends the non-null entries of an integer array of precomputed dictionary
  /// indices to the buffered indices, narrowing each to int32_t.
  template <typename ArrowType>
  void PutIndicesTyped(const arrow::Array& data) {
    using ArrayType = typename arrow::TypeTraits<ArrowType>::ArrayType;
    const auto& indices = checked_cast<const ArrayType&>(data);
    auto values = indices.raw_values();

    size_t buffer_position = buffered_indices_.size();
    buffered_indices_.resize(
        buffer_position + static_cast<size_t>(indices.length() - indices.null_count()));
    if (indices.null_count() > 0) {
      arrow::internal::BitmapReader valid_bits_reader(indices.null_bitmap_data(),
                                                      indices.offset(), indices.length());
      for (int64_t i = 0; i < indices.length(); ++i) {
        if (valid_bits_reader.IsSet()) {
          buffered_indices_[buffer_position++] = static_cast<int32_t>(values[i]);
        }
        valid_bits_reader.Next();
      }
    } else {
      for (int64_t i = 0; i < indices.length(); ++i) {
        buffered_indices_[buffer_position++] = static_cast<int32_t>(values[i]);
      }
    }
  }

  /// Dispatches on the index array's integer type. Throws ParquetException for
  /// non-signed-integer index types.
  void PutIndices(const arrow::Array& data) override {
    switch (data.type()->id()) {
      case arrow::Type::INT8:
        return PutIndicesTyped<arrow::Int8Type>(data);
      case arrow::Type::INT16:
        return PutIndicesTyped<arrow::Int16Type>(data);
      case arrow::Type::INT32:
        return PutIndicesTyped<arrow::Int32Type>(data);
      case arrow::Type::INT64:
        return PutIndicesTyped<arrow::Int64Type>(data);
      default:
        throw ParquetException("Dictionary indices were not signed integer");
    }
  }

  /// Encodes all buffered indices into a new buffer trimmed to the bytes written.
  /// Throws ParquetException if the indices do not fit in the estimated size.
  std::shared_ptr<Buffer> FlushValues() override {
    // Compute the estimate once; it is used both to allocate and to bound the write.
    const int64_t estimated_size = EstimatedDataEncodedSize();
    std::shared_ptr<ResizableBuffer> buffer = AllocateBuffer(this->pool_, estimated_size);
    const int result_size =
        WriteIndices(buffer->mutable_data(), static_cast<int>(estimated_size));
    // WriteIndices reports an undersized buffer with -1. Without this check, -1
    // would be handed to Resize() as a byte count.
    if (ARROW_PREDICT_FALSE(result_size < 0)) {
      // Drop the unwritable indices so the destructor's invariant still holds.
      ClearIndices();
      throw ParquetException("Dictionary indices do not fit in the estimated buffer size");
    }
    PARQUET_THROW_NOT_OK(buffer->Resize(result_size, false));
    return std::move(buffer);
  }

  /// Writes out the encoded dictionary to buffer. buffer must be preallocated to
  /// dict_encoded_size() bytes.
  void WriteDict(uint8_t* buffer) override;

  /// The number of entries in the dictionary.
  int num_entries() const override { return memo_table_.size(); }

 private:
  /// Clears all the indices (but leaves the dictionary).
  void ClearIndices() { buffered_indices_.clear(); }

  /// Indices that have not yet been written out by WriteIndices().
  std::vector<int32_t> buffered_indices_;

  /// The number of bytes needed to encode the dictionary.
  int dict_encoded_size_;

  /// Deduplicates values and assigns their dictionary indices.
  MemoTableType memo_table_;
};
/// Fixed-width values: the dictionary page is the memoized values copied out
/// contiguously, starting from memo index 0.
template <typename DType>
void DictEncoderImpl<DType>::WriteDict(uint8_t* buffer) {
  DCHECK_EQ(static_cast<size_t>(dict_encoded_size_), sizeof(T) * memo_table_.size());
  T* const out = reinterpret_cast<T*>(buffer);
  memo_table_.CopyValues(/*start_pos=*/0, out);
}
/// BYTE_ARRAY dictionary page: each memoized value is written as a uint32
/// length followed by its bytes, starting from memo index 0.
template <>
void DictEncoderImpl<ByteArrayType>::WriteDict(uint8_t* buffer) {
  uint8_t* cursor = buffer;
  memo_table_.VisitValues(0, [&cursor](const arrow::util::string_view& v) {
    const uint32_t value_len = static_cast<uint32_t>(v.length());
    memcpy(cursor, &value_len, sizeof(value_len));
    cursor += sizeof(value_len);
    memcpy(cursor, v.data(), value_len);
    cursor += value_len;
  });
}
/// FIXED_LEN_BYTE_ARRAY dictionary page: memoized values are written back to
/// back, each exactly type_length_ bytes wide, with no length prefix.
template <>
void DictEncoderImpl<FLBAType>::WriteDict(uint8_t* buffer) {
  const int width = type_length_;
  uint8_t* cursor = buffer;
  memo_table_.VisitValues(0, [&cursor, width](const arrow::util::string_view& v) {
    DCHECK_EQ(v.length(), static_cast<size_t>(width));
    memcpy(cursor, v.data(), width);
    cursor += width;
  });
}
/// Looks up (or inserts) a fixed-width value in the memo table and buffers its
/// dictionary index. Each new entry grows the dictionary page by sizeof(T) bytes.
template <typename DType>
inline void DictEncoderImpl<DType>::Put(const T& v) {
  auto ignore_existing = [](int32_t /*memo_index*/) {};
  auto account_new_entry = [this](int32_t /*memo_index*/) {
    dict_encoded_size_ += static_cast<int>(sizeof(T));
  };
  const auto index = memo_table_.GetOrInsert(v, ignore_existing, account_new_entry);
  buffered_indices_.push_back(index);
}
// Generic fallback: PutByteArray is only implemented for BYTE_ARRAY (see the
// DictEncoderImpl<ByteArrayType> specialization). Calling it for any other type
// is flagged by DCHECK in debug builds and otherwise does nothing.
template <typename DType>
inline void DictEncoderImpl<DType>::PutByteArray(const void* ptr, int32_t length) {
  DCHECK(false);
}
/// Looks up (or inserts) a variable-length value in the memo table and buffers
/// its dictionary index. Each new entry grows the dictionary page by its length
/// plus a uint32 length prefix.
template <>
inline void DictEncoderImpl<ByteArrayType>::PutByteArray(const void* ptr,
                                                         int32_t length) {
  // A null pointer is only legal for empty values. Substitute a real address so
  // the memo table never receives nullptr.
  static const uint8_t empty[] = {0};
  DCHECK(ptr != nullptr || length == 0);
  const void* key = (ptr == nullptr) ? static_cast<const void*>(empty) : ptr;

  auto ignore_existing = [](int32_t /*memo_index*/) {};
  auto account_new_entry = [&](int32_t /*memo_index*/) {
    dict_encoded_size_ += static_cast<int>(length + sizeof(uint32_t));
  };
  const auto index =
      memo_table_.GetOrInsert(key, length, ignore_existing, account_new_entry);
  buffered_indices_.push_back(index);
}
/// ByteArray values are routed through PutByteArray.
template <>
inline void DictEncoderImpl<ByteArrayType>::Put(const ByteArray& val) {
  const auto length = static_cast<int32_t>(val.len);
  PutByteArray(val.ptr, length);
}
/// Looks up (or inserts) a FIXED_LEN_BYTE_ARRAY value and buffers its index.
/// Each new entry grows the dictionary page by type_length_ bytes.
template <>
inline void DictEncoderImpl<FLBAType>::Put(const FixedLenByteArray& v) {
  static const uint8_t empty[] = {0};
  DCHECK(v.ptr != nullptr || type_length_ == 0);
  // Never hand nullptr to the memo table; zero-width values may carry one.
  const void* key = (v.ptr == nullptr) ? static_cast<const void*>(empty) : v.ptr;

  auto ignore_existing = [](int32_t /*memo_index*/) {};
  auto account_new_entry = [this](int32_t /*memo_index*/) {
    dict_encoded_size_ += type_length_;
  };
  const auto index =
      memo_table_.GetOrInsert(key, type_length_, ignore_existing, account_new_entry);
  buffered_indices_.push_back(index);
}
/// Generic fallback for Arrow-array input: reports the Arrow type as not yet
/// implemented for this physical type.
template <typename DType>
void DictEncoderImpl<DType>::Put(const arrow::Array& values) {
  const auto& arrow_type = *values.type();
  ParquetException::NYI(arrow_type.ToString());
}
/// Dictionary-encodes every non-null entry of a BinaryArray/StringArray.
template <>
void DictEncoderImpl<ByteArrayType>::Put(const arrow::Array& values) {
  AssertBinary(values);
  const auto& binary = checked_cast<const arrow::BinaryArray&>(values);
  const bool has_nulls = binary.null_count() != 0;
  for (int64_t i = 0; i < binary.length(); ++i) {
    if (has_nulls && !binary.IsValid(i)) {
      continue;  // nulls are not dictionary-encoded
    }
    const auto view = binary.GetView(i);
    PutByteArray(view.data(), static_cast<int32_t>(view.size()));
  }
}
/// Generic fallback for seeding the dictionary from an Arrow array: reports the
/// Arrow type as not yet implemented for this physical type.
template <typename DType>
void DictEncoderImpl<DType>::PutDictionary(const arrow::Array& values) {
  const auto& arrow_type = *values.type();
  ParquetException::NYI(arrow_type.ToString());
}
/// Seeds an empty BYTE_ARRAY dictionary encoder with the values of a
/// BinaryArray/StringArray, without buffering any indices.
///
/// Throws ParquetException if `values` is not binary, if the encoder already
/// has dictionary entries, or if `values` contains nulls.
template <>
void DictEncoderImpl<ByteArrayType>::PutDictionary(const arrow::Array& values) {
  AssertBinary(values);
  if (this->num_entries() > 0) {
    throw ParquetException("Can only call PutDictionary on an empty DictEncoder");
  }

  const auto& data = checked_cast<const arrow::BinaryArray&>(values);
  if (data.null_count() > 0) {
    throw ParquetException("Inserted binary dictionary cannot contain nulls");
  }

  static const uint8_t empty[] = {0};
  for (int64_t i = 0; i < data.length(); i++) {
    const auto v = data.GetView(i);
    const auto length = static_cast<int32_t>(v.size());
    // Mirror PutByteArray: never hand nullptr to the memo table.
    const void* ptr =
        (v.data() != nullptr) ? static_cast<const void*>(v.data()) : empty;
    // Count only values that actually enter the memo table. A repeated value is
    // stored, and later written by WriteDict, just once. Counting it again would
    // make dict_encoded_size() larger than what WriteDict produces.
    ARROW_IGNORE_EXPR(memo_table_.GetOrInsert(
        ptr, length,
        /*on_found=*/[](int32_t memo_index) {},
        /*on_not_found=*/[&](int32_t memo_index) {
          dict_encoded_size_ += static_cast<int>(length + sizeof(uint32_t));
        }));
  }
}
// ----------------------------------------------------------------------
// Encoder and decoder factory functions
/// Creates an encoder for physical type `type_num`.
///
/// When `use_dictionary` is true, a dictionary encoder is created and `encoding`
/// is not consulted. Otherwise only Encoding::PLAIN is supported, and other
/// encodings are reported via ParquetException::NYI. An unsupported physical
/// type trips a DCHECK in debug builds and otherwise yields nullptr.
std::unique_ptr<Encoder> MakeEncoder(Type::type type_num, Encoding::type encoding,
                                     bool use_dictionary, const ColumnDescriptor* descr,
                                     MemoryPool* pool) {
  std::unique_ptr<Encoder> encoder;
  if (use_dictionary) {
    switch (type_num) {
      case Type::INT32:
        encoder.reset(new DictEncoderImpl<Int32Type>(descr, pool));
        break;
      case Type::INT64:
        encoder.reset(new DictEncoderImpl<Int64Type>(descr, pool));
        break;
      case Type::INT96:
        encoder.reset(new DictEncoderImpl<Int96Type>(descr, pool));
        break;
      case Type::FLOAT:
        encoder.reset(new DictEncoderImpl<FloatType>(descr, pool));
        break;
      case Type::DOUBLE:
        encoder.reset(new DictEncoderImpl<DoubleType>(descr, pool));
        break;
      case Type::BYTE_ARRAY:
        encoder.reset(new DictEncoderImpl<ByteArrayType>(descr, pool));
        break;
      case Type::FIXED_LEN_BYTE_ARRAY:
        encoder.reset(new DictEncoderImpl<FLBAType>(descr, pool));
        break;
      default:
        DCHECK(false) << "Encoder not implemented";
        break;
    }
  } else if (encoding == Encoding::PLAIN) {
    switch (type_num) {
      case Type::BOOLEAN:
        encoder.reset(new PlainBooleanEncoder(descr, pool));
        break;
      case Type::INT32:
        encoder.reset(new PlainEncoder<Int32Type>(descr, pool));
        break;
      case Type::INT64:
        encoder.reset(new PlainEncoder<Int64Type>(descr, pool));
        break;
      case Type::INT96:
        encoder.reset(new PlainEncoder<Int96Type>(descr, pool));
        break;
      case Type::FLOAT:
        encoder.reset(new PlainEncoder<FloatType>(descr, pool));
        break;
      case Type::DOUBLE:
        encoder.reset(new PlainEncoder<DoubleType>(descr, pool));
        break;
      case Type::BYTE_ARRAY:
        encoder.reset(new PlainEncoder<ByteArrayType>(descr, pool));
        break;
      case Type::FIXED_LEN_BYTE_ARRAY:
        encoder.reset(new PlainEncoder<FLBAType>(descr, pool));
        break;
      default:
        DCHECK(false) << "Encoder not implemented";
        break;
    }
  } else {
    ParquetException::NYI("Selected encoding is not supported");
  }

  if (encoder) {
    return encoder;
  }
  DCHECK(false) << "Should not be able to reach this code";
  return nullptr;
}
/// State shared by all decoder implementations: the column descriptor, the
/// encoding tag, and a cursor over the current page's encoded bytes.
class DecoderImpl : virtual public Decoder {
 public:
  /// Points the decoder at a new page of `len` encoded bytes holding
  /// `num_values` values. The bytes are referenced, not copied.
  void SetData(int num_values, const uint8_t* data, int len) override {
    num_values_ = num_values;
    data_ = data;
    len_ = len;
  }

  int values_left() const override { return num_values_; }
  Encoding::type encoding() const override { return encoding_; }

 protected:
  explicit DecoderImpl(const ColumnDescriptor* descr, Encoding::type encoding)
      : descr_(descr),
        encoding_(encoding),
        num_values_(0),
        data_(NULLPTR),
        len_(0),
        type_length_(-1) {}

  // For accessing type-specific metadata, like FIXED_LEN_BYTE_ARRAY
  const ColumnDescriptor* descr_;

  const Encoding::type encoding_;
  /// Values not yet decoded from the current page.
  int num_values_;
  /// Next unread encoded byte of the current page.
  const uint8_t* data_;
  /// Encoded bytes remaining at data_.
  int len_;
  /// Width of FIXED_LEN_BYTE_ARRAY values, or -1 when not applicable.
  /// Initialized here because some subclasses (e.g. the BOOLEAN decoder) never
  /// assign it, which previously left it indeterminate.
  int type_length_;
};
/// PLAIN decoder: values are read directly from their raw physical
/// representation in the page buffer.
template <typename DType>
class PlainDecoder : public DecoderImpl, virtual public TypedDecoder<DType> {
 public:
  using T = typename DType::c_type;

  /// Records the FIXED_LEN_BYTE_ARRAY width (if any) from `descr`.
  explicit PlainDecoder(const ColumnDescriptor* descr);

  /// Decodes up to `max_values` values into `buffer` and returns how many were
  /// decoded.
  int Decode(T* buffer, int max_values) override;
};
template <typename DType>
PlainDecoder<DType>::PlainDecoder(const ColumnDescriptor* descr)
    : DecoderImpl(descr, Encoding::PLAIN) {
  // Only FIXED_LEN_BYTE_ARRAY columns have a per-column value width.
  const bool is_flba =
      descr_ != nullptr && descr_->physical_type() == Type::FIXED_LEN_BYTE_ARRAY;
  type_length_ = is_flba ? descr_->type_length() : -1;
}
/// PLAIN-decodes `num_values` fixed-width values of C++ type T by copying their
/// raw bytes. Templated on the C++ type rather than the type enum.
///
/// \param data        start of the encoded bytes (may be null when nothing is decoded)
/// \param data_size   number of readable bytes at `data`
/// \param num_values  number of values to decode
/// \param type_length unused for fixed-width C types
/// \param out         destination for `num_values` values
/// \return the number of bytes consumed
/// \throws ParquetException (via EofException) if `data_size` is too small
template <typename T>
inline int DecodePlain(const uint8_t* data, int64_t data_size, int num_values,
                       int type_length, T* out) {
  // Compute in 64 bits: num_values may come from untrusted page metadata, and
  // multiplying it by sizeof(T) in `int` could overflow (undefined behavior)
  // and slip past the bounds check below.
  const int64_t bytes_to_decode =
      static_cast<int64_t>(num_values) * static_cast<int64_t>(sizeof(T));
  if (data_size < bytes_to_decode) {
    ParquetException::EofException();
  }
  // If bytes_to_decode == 0, data could be null
  if (bytes_to_decode > 0) {
    memcpy(out, data, static_cast<size_t>(bytes_to_decode));
  }
  return static_cast<int>(bytes_to_decode);
}
/// PLAIN-decodes `num_values` BYTE_ARRAY values. Each encoded value is a uint32
/// length followed by that many bytes. The decoded ByteArrays point into `data`
/// and do not own their bytes.
///
/// \return the number of bytes consumed
/// \throws ParquetException (via EofException) if the input is truncated or a
///         length prefix points past the end of the data
template <>
inline int DecodePlain<ByteArray>(const uint8_t* data, int64_t data_size, int num_values,
                                  int type_length, ByteArray* out) {
  int64_t bytes_decoded = 0;
  for (int i = 0; i < num_values; ++i) {
    // The length prefix itself must be in bounds before it is read.
    if (ARROW_PREDICT_FALSE(data_size < static_cast<int64_t>(sizeof(uint32_t)))) {
      ParquetException::EofException();
    }
    const uint32_t len = arrow::util::SafeLoadAs<uint32_t>(data);
    // 64-bit arithmetic. Narrowing sizeof(uint32_t) + len to int would let a
    // corrupt length such as 0xFFFFFFFF truncate to a tiny increment that passes
    // the bounds check.
    const int64_t increment = static_cast<int64_t>(sizeof(uint32_t)) + len;
    if (ARROW_PREDICT_FALSE(data_size < increment)) {
      ParquetException::EofException();
    }
    out[i].len = len;
    out[i].ptr = data + sizeof(uint32_t);
    data += increment;
    data_size -= increment;
    bytes_decoded += increment;
  }
  return static_cast<int>(bytes_decoded);
}
/// PLAIN-decodes `num_values` FIXED_LEN_BYTE_ARRAY values of `type_length` bytes
/// each. The decoded values point into `data` and do not own their bytes.
///
/// \return the number of bytes consumed
/// \throws ParquetException (via EofException) if `data_size` is too small
template <>
inline int DecodePlain<FixedLenByteArray>(const uint8_t* data, int64_t data_size,
                                          int num_values, int type_length,
                                          FixedLenByteArray* out) {
  // 64-bit product, so a large num_values cannot overflow `int` and bypass the
  // bounds check.
  const int64_t bytes_to_decode =
      static_cast<int64_t>(type_length) * static_cast<int64_t>(num_values);
  if (data_size < bytes_to_decode) {
    ParquetException::EofException();
  }
  for (int i = 0; i < num_values; ++i) {
    out[i].ptr = data;
    data += type_length;
  }
  return static_cast<int>(bytes_to_decode);
}
/// Decodes up to `max_values` values, capped by the values left in the page,
/// and advances the page cursor past the consumed bytes.
template <typename DType>
int PlainDecoder<DType>::Decode(T* buffer, int max_values) {
  const int to_decode = std::min(max_values, num_values_);
  const int consumed = DecodePlain<T>(data_, len_, to_decode, type_length_, buffer);
  data_ += consumed;
  len_ -= consumed;
  num_values_ -= to_decode;
  return to_decode;
}
/// PLAIN decoder for BOOLEAN columns, which are bit-packed one bit per value.
class PlainBooleanDecoder : public DecoderImpl,
                            virtual public TypedDecoder<BooleanType>,
                            virtual public BooleanDecoder {
 public:
  explicit PlainBooleanDecoder(const ColumnDescriptor* descr);

  void SetData(int num_values, const uint8_t* data, int len) override;

  /// Decodes into a bitmap (one bit per value).
  int Decode(uint8_t* buffer, int max_values) override;
  /// Decodes into an array of bool (one bool per value).
  int Decode(bool* buffer, int max_values) override;

 private:
  /// Reads bits from the current page; replaced on every SetData() call.
  std::unique_ptr<arrow::BitUtil::BitReader> bit_reader_;
};
// BOOLEAN values use the PLAIN encoding. The bit reader is created later, in
// SetData().
PlainBooleanDecoder::PlainBooleanDecoder(const ColumnDescriptor* descr)
    : DecoderImpl(descr, Encoding::PLAIN) {}
void PlainBooleanDecoder::SetData(int num_values, const uint8_t* data, int len) {
num_values_ = num_values;
bit_reader_.reset(new BitUtil::BitReader(data, len));
}
int PlainBooleanDecoder::Decode(uint8_t* buffer, int max_values) {
max_values = std::min(max_values, num_values_);
bool val;
arrow::internal::BitmapWriter bit_writer(buffer, 0, max_values);
for (int i = 0; i < max_values; ++i) {
if (!bit_reader_->GetValue(1, &val)) {
ParquetException::EofException();
}
if (val) {
bit_writer.Set();
}
bit_writer.Next();
}
bit_writer.Finish();
num_values_ -= max_values;
return max_values;
}
/// Decodes up to `max_values` booleans into `buffer`, one bool per value.
/// Throws if the page runs out of bits.
int PlainBooleanDecoder::Decode(bool* buffer, int max_values) {
  const int count = std::min(max_values, num_values_);
  const int received = bit_reader_->GetBatch(1, buffer, count);
  if (received != count) {
    ParquetException::EofException();
  }
  num_values_ -= count;
  return count;
}
/// Appends decoded binary values to an ArrowBinaryAccumulator. It tracks how
/// much value data the current builder may still take (chunk_space_remaining,
/// bounded by ::arrow::kBinaryMemoryLimit) so callers can start a new chunk
/// with PushChunk() when a value would not fit.
struct ArrowBinaryHelper {
  explicit ArrowBinaryHelper(ArrowBinaryAccumulator* out)
      : out(out),
        builder(out->builder.get()),
        chunk_space_remaining(::arrow::kBinaryMemoryLimit -
                              builder->value_data_length()) {}

  /// Finishes the current builder into a new chunk and resets the space budget.
  Status PushChunk() {
    std::shared_ptr<::arrow::Array> chunk;
    RETURN_NOT_OK(builder->Finish(&chunk));
    out->chunks.push_back(chunk);
    chunk_space_remaining = ::arrow::kBinaryMemoryLimit;
    return Status::OK();
  }

  /// True if `length` more bytes fit in the current chunk.
  bool CanFit(int64_t length) const { return chunk_space_remaining >= length; }

  /// Appends without capacity checks; the caller must have reserved space.
  void UnsafeAppend(const uint8_t* data, int32_t length) {
    chunk_space_remaining -= length;
    builder->UnsafeAppend(data, length);
  }

  void UnsafeAppendNull() { builder->UnsafeAppendNull(); }

  Status Append(const uint8_t* data, int32_t length) {
    chunk_space_remaining -= length;
    return builder->Append(data, length);
  }

  Status AppendNull() { return builder->AppendNull(); }

  /// Destination for finished chunks.
  ArrowBinaryAccumulator* out;
  /// Builder for the chunk currently being filled (owned by `out`).
  arrow::BinaryBuilder* builder;
  /// Bytes of value data the current chunk can still accept.
  int64_t chunk_space_remaining;
};
/// PLAIN decoder for BYTE_ARRAY columns, with paths that decode directly into
/// Arrow builders (dictionary builders or chunked dense BinaryBuilders).
class PlainByteArrayDecoder : public PlainDecoder<ByteArrayType>,
                              virtual public ByteArrayDecoder {
 public:
  using Base = PlainDecoder<ByteArrayType>;
  using Base::DecodeSpaced;
  using Base::PlainDecoder;

  // ----------------------------------------------------------------------
  // Dictionary read paths

  int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
                  int64_t valid_bits_offset,
                  arrow::BinaryDictionary32Builder* builder) override {
    int result = 0;
    PARQUET_THROW_NOT_OK(DecodeArrow(num_values, null_count, valid_bits,
                                     valid_bits_offset, builder, &result));
    return result;
  }

  int DecodeArrowNonNull(int num_values,
                         arrow::BinaryDictionary32Builder* builder) override {
    int result = 0;
    PARQUET_THROW_NOT_OK(DecodeArrowNonNull(num_values, builder, &result));
    return result;
  }

  // ----------------------------------------------------------------------
  // Optimized dense binary read paths

  int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
                  int64_t valid_bits_offset, ArrowBinaryAccumulator* out) override {
    int result = 0;
    PARQUET_THROW_NOT_OK(DecodeArrowDense(num_values, null_count, valid_bits,
                                          valid_bits_offset, out, &result));
    return result;
  }

  int DecodeArrowNonNull(int num_values, ArrowBinaryAccumulator* out) override {
    int result = 0;
    PARQUET_THROW_NOT_OK(DecodeArrowDenseNonNull(num_values, out, &result));
    return result;
  }

 private:
  /// Reads the uint32 length prefix at data_ and checks that the prefix and
  /// the value it announces both lie within the remaining len_ bytes.
  ///
  /// \param[out] increment total bytes (prefix + value) the caller must consume
  /// \return the value length in bytes
  /// \throws ParquetException (via EofException) on truncated or corrupt input
  int32_t ReadValueLength(int* increment) const {
    // Check before loading so a truncated page cannot cause an out-of-bounds read.
    if (ARROW_PREDICT_FALSE(len_ < static_cast<int>(sizeof(uint32_t)))) {
      ParquetException::EofException();
    }
    const uint32_t value_len = arrow::util::SafeLoadAs<uint32_t>(data_);
    // Computed in 64 bits and compared before narrowing. Otherwise a corrupt
    // length such as 0xFFFFFFFF truncates to a tiny (or negative) increment that
    // passes the bounds check.
    const int64_t total = static_cast<int64_t>(sizeof(uint32_t)) + value_len;
    if (ARROW_PREDICT_FALSE(len_ < total)) {
      ParquetException::EofException();
    }
    // total <= len_ <= INT32_MAX, so both narrowing casts are lossless.
    *increment = static_cast<int>(total);
    return static_cast<int32_t>(value_len);
  }

  /// Decodes `num_values` slots (nulls included, per `valid_bits`) into chunked
  /// BinaryBuilders. A new chunk is started when a value would overflow the
  /// current one.
  Status DecodeArrowDense(int num_values, int null_count, const uint8_t* valid_bits,
                          int64_t valid_bits_offset, ArrowBinaryAccumulator* out,
                          int* out_values_decoded) {
    ArrowBinaryHelper helper(out);
    arrow::internal::BitmapReader bit_reader(valid_bits, valid_bits_offset, num_values);
    int values_decoded = 0;

    RETURN_NOT_OK(helper.builder->Reserve(num_values));
    RETURN_NOT_OK(helper.builder->ReserveData(
        std::min<int64_t>(len_, helper.chunk_space_remaining)));
    for (int i = 0; i < num_values; ++i) {
      if (bit_reader.IsSet()) {
        int increment = 0;
        const int32_t value_len = ReadValueLength(&increment);
        if (ARROW_PREDICT_FALSE(!helper.CanFit(value_len))) {
          // This element would exceed the capacity of a chunk
          RETURN_NOT_OK(helper.PushChunk());
          RETURN_NOT_OK(helper.builder->Reserve(num_values - i));
          RETURN_NOT_OK(helper.builder->ReserveData(
              std::min<int64_t>(len_, helper.chunk_space_remaining)));
        }
        helper.UnsafeAppend(data_ + sizeof(uint32_t), value_len);
        data_ += increment;
        len_ -= increment;
        ++values_decoded;
      } else {
        helper.UnsafeAppendNull();
      }
      bit_reader.Next();
    }

    num_values_ -= values_decoded;
    *out_values_decoded = values_decoded;
    return Status::OK();
  }

  /// Decodes up to `num_values` non-null values into chunked BinaryBuilders.
  Status DecodeArrowDenseNonNull(int num_values, ArrowBinaryAccumulator* out,
                                 int* values_decoded) {
    ArrowBinaryHelper helper(out);
    num_values = std::min(num_values, num_values_);
    RETURN_NOT_OK(helper.builder->Reserve(num_values));
    RETURN_NOT_OK(helper.builder->ReserveData(
        std::min<int64_t>(len_, helper.chunk_space_remaining)));
    for (int i = 0; i < num_values; ++i) {
      int increment = 0;
      const int32_t value_len = ReadValueLength(&increment);
      if (ARROW_PREDICT_FALSE(!helper.CanFit(value_len))) {
        // This element would exceed the capacity of a chunk
        RETURN_NOT_OK(helper.PushChunk());
        RETURN_NOT_OK(helper.builder->Reserve(num_values - i));
        RETURN_NOT_OK(helper.builder->ReserveData(
            std::min<int64_t>(len_, helper.chunk_space_remaining)));
      }
      helper.UnsafeAppend(data_ + sizeof(uint32_t), value_len);
      data_ += increment;
      len_ -= increment;
    }

    num_values_ -= num_values;
    *values_decoded = num_values;
    return Status::OK();
  }

  /// Decodes `num_values` slots (nulls included, per `valid_bits`) into any
  /// builder exposing Reserve/Append/AppendNull.
  template <typename BuilderType>
  Status DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
                     int64_t valid_bits_offset, BuilderType* builder,
                     int* out_values_decoded) {
    RETURN_NOT_OK(builder->Reserve(num_values));
    arrow::internal::BitmapReader bit_reader(valid_bits, valid_bits_offset, num_values);
    int values_decoded = 0;
    for (int i = 0; i < num_values; ++i) {
      if (bit_reader.IsSet()) {
        int increment = 0;
        const int32_t value_len = ReadValueLength(&increment);
        RETURN_NOT_OK(builder->Append(data_ + sizeof(uint32_t), value_len));
        data_ += increment;
        len_ -= increment;
        ++values_decoded;
      } else {
        RETURN_NOT_OK(builder->AppendNull());
      }
      bit_reader.Next();
    }
    num_values_ -= values_decoded;
    *out_values_decoded = values_decoded;
    return Status::OK();
  }

  /// Decodes up to `num_values` non-null values into any builder exposing
  /// Reserve/Append.
  template <typename BuilderType>
  Status DecodeArrowNonNull(int num_values, BuilderType* builder, int* values_decoded) {
    num_values = std::min(num_values, num_values_);
    RETURN_NOT_OK(builder->Reserve(num_values));
    for (int i = 0; i < num_values; ++i) {
      int increment = 0;
      const int32_t value_len = ReadValueLength(&increment);
      RETURN_NOT_OK(builder->Append(data_ + sizeof(uint32_t), value_len));
      data_ += increment;
      len_ -= increment;
    }
    num_values_ -= num_values;
    *values_decoded = num_values;
    return Status::OK();
  }
};
// PLAIN decoder for FIXED_LEN_BYTE_ARRAY columns. All decoding logic comes
// from PlainDecoder<FLBAType>; this class only adds the FLBADecoder interface.
class PlainFLBADecoder : public PlainDecoder<FLBAType>, virtual public FLBADecoder {
 public:
  using Base = PlainDecoder<FLBAType>;
  // Reuse every PlainDecoder<FLBAType> constructor as-is.
  using Base::Base;
};
// ----------------------------------------------------------------------
// Dictionary encoding and decoding
// Decoder for RLE_DICTIONARY pages. The dictionary page is installed with
// SetDict(); data pages carry RLE/bit-packed indices into that dictionary.
template <typename Type>
class DictDecoderImpl : public DecoderImpl, virtual public DictDecoder<Type> {
 public:
  typedef typename Type::c_type T;

  // Initializes the dictionary with values from 'dictionary'. The data in
  // dictionary is not guaranteed to persist in memory after this call so the
  // dictionary decoder needs to copy the data out if necessary.
  explicit DictDecoderImpl(const ColumnDescriptor* descr,
                           MemoryPool* pool = arrow::default_memory_pool())
      : DecoderImpl(descr, Encoding::RLE_DICTIONARY),
        dictionary_(AllocateBuffer(pool, 0)),
        dictionary_length_(0),
        byte_array_data_(AllocateBuffer(pool, 0)),
        byte_array_offsets_(AllocateBuffer(pool, 0)),
        indices_scratch_space_(AllocateBuffer(pool, 0)) {}

  // Perform type-specific initialization
  void SetDict(TypedDecoder<Type>* dictionary) override;

  // Points the index decoder at a data page. The first byte of a non-empty
  // page is the bit width of the encoded dictionary indices that follow it.
  void SetData(int num_values, const uint8_t* data, int len) override {
    num_values_ = num_values;
    if (len == 0) {
      // Install an empty index stream so that later Decode* calls do not
      // keep reading indices left over from a previous page.
      idx_decoder_ = arrow::util::RleDecoder(data, len, /*bit_width=*/1);
      return;
    }
    uint8_t bit_width = *data;
    if (ARROW_PREDICT_FALSE(bit_width >= 64)) {
      throw ParquetException("Invalid or corrupted bit_width");
    }
    ++data;
    --len;
    idx_decoder_ = arrow::util::RleDecoder(data, len, bit_width);
  }

  // Materializes up to `num_values` dictionary values into `buffer`.
  // NOTE(review): decoded indices are not checked against dictionary_length_
  // here; GetBatchWithDict is not given the dictionary size.
  int Decode(T* buffer, int num_values) override {
    num_values = std::min(num_values, num_values_);
    int decoded_values = idx_decoder_.GetBatchWithDict(
        reinterpret_cast<const T*>(dictionary_->data()), buffer, num_values);
    if (decoded_values != num_values) {
      ParquetException::EofException();
    }
    num_values_ -= num_values;
    return num_values;
  }

  // Like Decode(), but leaves room for the null slots described by
  // `valid_bits`.
  int DecodeSpaced(T* buffer, int num_values, int null_count, const uint8_t* valid_bits,
                   int64_t valid_bits_offset) override {
    num_values = std::min(num_values, num_values_);
    if (num_values != idx_decoder_.GetBatchWithDictSpaced(
                          reinterpret_cast<const T*>(dictionary_->data()), buffer,
                          num_values, null_count, valid_bits, valid_bits_offset)) {
      ParquetException::EofException();
    }
    num_values_ -= num_values;
    return num_values;
  }

  void InsertDictionary(arrow::ArrayBuilder* builder) override;

  // Appends raw dictionary indices (with nulls) to a
  // BinaryDictionary32Builder. Returns the number of non-null values consumed.
  int DecodeIndicesSpaced(int num_values, int null_count, const uint8_t* valid_bits,
                          int64_t valid_bits_offset,
                          arrow::ArrayBuilder* builder) override {
    if (num_values > 0) {
      // TODO(wesm): Refactor to batch reads for improved memory use. It is not
      // trivial because the null_count is relative to the entire bitmap
      PARQUET_THROW_NOT_OK(indices_scratch_space_->TypedResize<int32_t>(
          num_values, /*shrink_to_fit=*/false));
    }

    auto indices_buffer =
        reinterpret_cast<int32_t*>(indices_scratch_space_->mutable_data());

    if (num_values != idx_decoder_.GetBatchSpaced(num_values, null_count, valid_bits,
                                                  valid_bits_offset, indices_buffer)) {
      ParquetException::EofException();
    }

    /// XXX(wesm): Cannot append "valid bits" directly to the builder
    std::vector<uint8_t> valid_bytes(num_values);
    arrow::internal::BitmapReader bit_reader(valid_bits, valid_bits_offset, num_values);
    for (int64_t i = 0; i < num_values; ++i) {
      valid_bytes[i] = static_cast<uint8_t>(bit_reader.IsSet());
      bit_reader.Next();
    }

    auto binary_builder = checked_cast<arrow::BinaryDictionary32Builder*>(builder);
    PARQUET_THROW_NOT_OK(
        binary_builder->AppendIndices(indices_buffer, num_values, valid_bytes.data()));
    num_values_ -= num_values - null_count;
    return num_values - null_count;
  }

  // Appends up to `num_values` raw dictionary indices (no nulls) to a
  // BinaryDictionary32Builder. Returns the number of indices appended.
  int DecodeIndices(int num_values, arrow::ArrayBuilder* builder) override {
    num_values = std::min(num_values, num_values_);
    if (num_values > 0) {
      // TODO(wesm): Refactor to batch reads for improved memory use. This is
      // relatively simple here because we don't have to do any bookkeeping of
      // nulls
      PARQUET_THROW_NOT_OK(indices_scratch_space_->TypedResize<int32_t>(
          num_values, /*shrink_to_fit=*/false));
    }
    auto indices_buffer =
        reinterpret_cast<int32_t*>(indices_scratch_space_->mutable_data());
    if (num_values != idx_decoder_.GetBatch(indices_buffer, num_values)) {
      ParquetException::EofException();
    }
    auto binary_builder = checked_cast<arrow::BinaryDictionary32Builder*>(builder);
    PARQUET_THROW_NOT_OK(binary_builder->AppendIndices(indices_buffer, num_values));
    num_values_ -= num_values;
    return num_values;
  }

 protected:
  // Copies every remaining value of `dictionary` into dictionary_ and records
  // the count in dictionary_length_.
  inline void DecodeDict(TypedDecoder<Type>* dictionary) {
    dictionary_length_ = static_cast<int32_t>(dictionary->values_left());
    PARQUET_THROW_NOT_OK(dictionary_->Resize(dictionary_length_ * sizeof(T),
                                             /*shrink_to_fit=*/false));
    dictionary->Decode(reinterpret_cast<T*>(dictionary_->mutable_data()),
                       dictionary_length_);
  }

  // Decoded dictionary entries (an array of T).
  std::shared_ptr<ResizableBuffer> dictionary_;
  int32_t dictionary_length_;

  // Owned copy of the variable/fixed-length bytes. For BYTE_ARRAY and FLBA the
  // entries in dictionary_ only hold pointers, which are redirected into here.
  std::shared_ptr<ResizableBuffer> byte_array_data_;

  // Arrow-style byte offsets for each dictionary value. We maintain two
  // representations of the dictionary, one as ByteArray* for non-Arrow
  // consumers and this one for Arrow consumers. Since dictionaries are
  // generally pretty small to begin with this doesn't mean too much extra
  // memory use in most cases
  std::shared_ptr<ResizableBuffer> byte_array_offsets_;

  // Reusable buffer for decoding dictionary indices to be appended to a
  // BinaryDictionary32Builder
  std::shared_ptr<ResizableBuffer> indices_scratch_space_;

  arrow::util::RleDecoder idx_decoder_;
};
// Generic dictionary installation: for fixed-width physical types copying the
// decoded values into dictionary_ is all that is required.
template <typename Type>
void DictDecoderImpl<Type>::SetDict(TypedDecoder<Type>* dictionary) {
  this->DecodeDict(dictionary);
}
// BOOLEAN dictionaries are not supported; the call is rejected via
// ParquetException::NYI and the dictionary argument is never inspected.
template <>
void DictDecoderImpl<BooleanType>::SetDict(TypedDecoder<BooleanType>* dictionary) {
  static_cast<void>(dictionary);
  ParquetException::NYI("Dictionary encoding is not implemented for boolean values");
}
// Installs a BYTE_ARRAY dictionary. After decoding the ByteArray entries, the
// referenced bytes are copied into byte_array_data_ (the source page need not
// outlive this call) and each entry's pointer is redirected into that copy.
// byte_array_offsets_ receives dictionary_length_ + 1 Arrow-style offsets.
// Throws ParquetException if the total byte size does not fit int32 offsets.
template <>
void DictDecoderImpl<ByteArrayType>::SetDict(TypedDecoder<ByteArrayType>* dictionary) {
  DecodeDict(dictionary);

  auto dict_values = reinterpret_cast<ByteArray*>(dictionary_->mutable_data());

  // Accumulate in 64 bits so oversized dictionaries are detected rather than
  // silently wrapping the 32-bit offsets.
  int64_t total_size = 0;
  for (int i = 0; i < dictionary_length_; ++i) {
    total_size += dict_values[i].len;
  }
  if (ARROW_PREDICT_FALSE(total_size > INT32_MAX)) {
    throw ParquetException("BYTE_ARRAY dictionary too large for 32-bit offsets");
  }

  // Size both buffers unconditionally: even when every value is empty
  // (total_size == 0) the offsets array below writes dictionary_length_ + 1
  // entries, so skipping the resize would write past the buffer.
  PARQUET_THROW_NOT_OK(byte_array_data_->Resize(total_size,
                                                /*shrink_to_fit=*/false));
  PARQUET_THROW_NOT_OK(
      byte_array_offsets_->Resize((dictionary_length_ + 1) * sizeof(int32_t),
                                  /*shrink_to_fit=*/false));

  int32_t offset = 0;
  uint8_t* bytes_data = byte_array_data_->mutable_data();
  int32_t* bytes_offsets =
      reinterpret_cast<int32_t*>(byte_array_offsets_->mutable_data());
  for (int i = 0; i < dictionary_length_; ++i) {
    if (dict_values[i].len > 0) {
      memcpy(bytes_data + offset, dict_values[i].ptr, dict_values[i].len);
    }
    bytes_offsets[i] = offset;
    dict_values[i].ptr = bytes_data + offset;
    offset += static_cast<int32_t>(dict_values[i].len);
  }
  bytes_offsets[dictionary_length_] = offset;
}
// Installs a FIXED_LEN_BYTE_ARRAY dictionary: every entry (type_length bytes)
// is copied into byte_array_data_, and the FLBA pointers held in dictionary_
// are redirected to those copies so they no longer reference the source page.
template <>
inline void DictDecoderImpl<FLBAType>::SetDict(TypedDecoder<FLBAType>* dictionary) {
  DecodeDict(dictionary);

  const int value_width = descr_->type_length();
  const int copy_bytes = dictionary_length_ * value_width;
  PARQUET_THROW_NOT_OK(byte_array_data_->Resize(copy_bytes,
                                                /*shrink_to_fit=*/false));

  auto* entries = reinterpret_cast<FLBA*>(dictionary_->mutable_data());
  uint8_t* dest = byte_array_data_->mutable_data();
  for (int32_t idx = 0; idx < dictionary_length_; ++idx) {
    memcpy(dest, entries[idx].ptr, value_width);
    entries[idx].ptr = dest;
    dest += value_width;
  }
}
// Only the BYTE_ARRAY specialization can feed an Arrow dictionary builder;
// for every other physical type the request is rejected via NYI.
template <typename Type>
void DictDecoderImpl<Type>::InsertDictionary(arrow::ArrayBuilder* builder) {
  static_cast<void>(builder);
  ParquetException::NYI("InsertDictionary only implemented for BYTE_ARRAY types");
}
// Exposes the decoder-owned dictionary bytes and offsets as a BinaryArray
// (sharing the buffers, not copying them) and passes it to the builder's
// InsertMemoValues so that later AppendIndices calls can refer to it.
template <>
void DictDecoderImpl<ByteArrayType>::InsertDictionary(arrow::ArrayBuilder* builder) {
  const auto dictionary_array = std::make_shared<arrow::BinaryArray>(
      dictionary_length_, byte_array_offsets_, byte_array_data_);
  auto* dict_builder = checked_cast<arrow::BinaryDictionary32Builder*>(builder);
  PARQUET_THROW_NOT_OK(dict_builder->InsertMemoValues(*dictionary_array));
}
// Dictionary decoder for BYTE_ARRAY columns that can emit Arrow data either as
// dense binary (via ArrowBinaryAccumulator) or through a builder.
class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,
                                 virtual public ByteArrayDecoder {
 public:
  using BASE = DictDecoderImpl<ByteArrayType>;
  using BASE::DictDecoderImpl;

  int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
                  int64_t valid_bits_offset,
                  arrow::BinaryDictionary32Builder* builder) override {
    int result = 0;
    PARQUET_THROW_NOT_OK(DecodeArrow(num_values, null_count, valid_bits,
                                     valid_bits_offset, builder, &result));
    return result;
  }

  int DecodeArrowNonNull(int num_values,
                         arrow::BinaryDictionary32Builder* builder) override {
    int result = 0;
    PARQUET_THROW_NOT_OK(DecodeArrowNonNull(num_values, builder, &result));
    return result;
  }

  int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
                  int64_t valid_bits_offset, ArrowBinaryAccumulator* out) override {
    int result = 0;
    PARQUET_THROW_NOT_OK(DecodeArrowDense(num_values, null_count, valid_bits,
                                          valid_bits_offset, out, &result));
    return result;
  }

  int DecodeArrowNonNull(int num_values, ArrowBinaryAccumulator* out) override {
    int result = 0;
    PARQUET_THROW_NOT_OK(DecodeArrowDenseNonNull(num_values, out, &result));
    return result;
  }

 private:
  // Returns OK iff `index` addresses an existing dictionary entry. Indices come
  // straight from page data, so a corrupt page could otherwise make
  // dict_values[index] read outside the dictionary buffer.
  Status IndexInBounds(int32_t index) const {
    if (ARROW_PREDICT_TRUE(index >= 0 && index < dictionary_length_)) {
      return Status::OK();
    }
    return Status::Invalid("Index not in dictionary bounds");
  }

  // Decodes `num_values` slots (valid + null) into `out`, chunking when a
  // value would not fit the current chunk. `null_count` must be the number of
  // cleared bits in the decoded bitmap range; it sizes the index batches.
  Status DecodeArrowDense(int num_values, int null_count, const uint8_t* valid_bits,
                          int64_t valid_bits_offset, ArrowBinaryAccumulator* out,
                          int* out_num_values) {
    constexpr int32_t buffer_size = 1024;
    int32_t indices_buffer[buffer_size];

    ArrowBinaryHelper helper(out);

    arrow::internal::BitmapReader bit_reader(valid_bits, valid_bits_offset, num_values);

    auto dict_values = reinterpret_cast<const ByteArray*>(dictionary_->data());
    int values_decoded = 0;
    int num_appended = 0;
    while (num_appended < num_values) {
      bool is_valid = bit_reader.IsSet();
      bit_reader.Next();

      if (is_valid) {
        int32_t batch_size =
            std::min<int32_t>(buffer_size, num_values - num_appended - null_count);
        int num_indices = idx_decoder_.GetBatch(indices_buffer, batch_size);
        // With zero indices the loop below would read an uninitialized slot and
        // `i` (incremented before the check) could never equal num_indices.
        if (ARROW_PREDICT_FALSE(num_indices < 1)) ParquetException::EofException();

        int i = 0;
        while (true) {
          // Consume all indices
          if (is_valid) {
            RETURN_NOT_OK(IndexInBounds(indices_buffer[i]));
            const auto& val = dict_values[indices_buffer[i]];
            if (ARROW_PREDICT_FALSE(!helper.CanFit(val.len))) {
              RETURN_NOT_OK(helper.PushChunk());
            }
            RETURN_NOT_OK(helper.Append(val.ptr, static_cast<int32_t>(val.len)));
            ++i;
            ++values_decoded;
          } else {
            RETURN_NOT_OK(helper.AppendNull());
            --null_count;
          }
          ++num_appended;
          if (i == num_indices) {
            // Do not advance the bit_reader if we have fulfilled the decode
            // request
            break;
          }
          is_valid = bit_reader.IsSet();
          bit_reader.Next();
        }
      } else {
        RETURN_NOT_OK(helper.AppendNull());
        --null_count;
        ++num_appended;
      }
    }
    *out_num_values = values_decoded;
    return Status::OK();
  }

  // Decodes `num_values` non-null values into `out` in batches of indices.
  Status DecodeArrowDenseNonNull(int num_values, ArrowBinaryAccumulator* out,
                                 int* out_num_values) {
    constexpr int32_t buffer_size = 2048;
    int32_t indices_buffer[buffer_size];
    int values_decoded = 0;

    ArrowBinaryHelper helper(out);
    auto dict_values = reinterpret_cast<const ByteArray*>(dictionary_->data());

    while (values_decoded < num_values) {
      int32_t batch_size = std::min<int32_t>(buffer_size, num_values - values_decoded);
      int num_indices = idx_decoder_.GetBatch(indices_buffer, batch_size);
      if (num_indices == 0) ParquetException::EofException();
      for (int i = 0; i < num_indices; ++i) {
        RETURN_NOT_OK(IndexInBounds(indices_buffer[i]));
        const auto& val = dict_values[indices_buffer[i]];
        if (ARROW_PREDICT_FALSE(!helper.CanFit(val.len))) {
          RETURN_NOT_OK(helper.PushChunk());
        }
        RETURN_NOT_OK(helper.Append(val.ptr, static_cast<int32_t>(val.len)));
      }
      values_decoded += num_indices;
    }
    *out_num_values = values_decoded;
    return Status::OK();
  }

  // Builder-based counterpart of DecodeArrowDense (same slot/null semantics).
  template <typename BuilderType>
  Status DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
                     int64_t valid_bits_offset, BuilderType* builder,
                     int* out_num_values) {
    constexpr int32_t buffer_size = 1024;
    int32_t indices_buffer[buffer_size];

    RETURN_NOT_OK(builder->Reserve(num_values));
    arrow::internal::BitmapReader bit_reader(valid_bits, valid_bits_offset, num_values);

    auto dict_values = reinterpret_cast<const ByteArray*>(dictionary_->data());

    int values_decoded = 0;
    int num_appended = 0;
    while (num_appended < num_values) {
      bool is_valid = bit_reader.IsSet();
      bit_reader.Next();

      if (is_valid) {
        int32_t batch_size =
            std::min<int32_t>(buffer_size, num_values - num_appended - null_count);
        int num_indices = idx_decoder_.GetBatch(indices_buffer, batch_size);
        // Same guard as DecodeArrowDense: no indices means truncated input.
        if (ARROW_PREDICT_FALSE(num_indices < 1)) ParquetException::EofException();

        int i = 0;
        while (true) {
          // Consume all indices
          if (is_valid) {
            RETURN_NOT_OK(IndexInBounds(indices_buffer[i]));
            const auto& val = dict_values[indices_buffer[i]];
            RETURN_NOT_OK(builder->Append(val.ptr, val.len));
            ++i;
            ++values_decoded;
          } else {
            RETURN_NOT_OK(builder->AppendNull());
            --null_count;
          }
          ++num_appended;
          if (i == num_indices) {
            // Do not advance the bit_reader if we have fulfilled the decode
            // request
            break;
          }
          is_valid = bit_reader.IsSet();
          bit_reader.Next();
        }
      } else {
        RETURN_NOT_OK(builder->AppendNull());
        --null_count;
        ++num_appended;
      }
    }
    *out_num_values = values_decoded;
    return Status::OK();
  }

  // Builder-based counterpart of DecodeArrowDenseNonNull.
  template <typename BuilderType>
  Status DecodeArrowNonNull(int num_values, BuilderType* builder, int* out_num_values) {
    constexpr int32_t buffer_size = 2048;
    int32_t indices_buffer[buffer_size];
    int values_decoded = 0;
    RETURN_NOT_OK(builder->Reserve(num_values));

    auto dict_values = reinterpret_cast<const ByteArray*>(dictionary_->data());

    while (values_decoded < num_values) {
      int32_t batch_size = std::min<int32_t>(buffer_size, num_values - values_decoded);
      int num_indices = idx_decoder_.GetBatch(indices_buffer, batch_size);
      if (num_indices == 0) ParquetException::EofException();
      for (int i = 0; i < num_indices; ++i) {
        RETURN_NOT_OK(IndexInBounds(indices_buffer[i]));
        const auto& val = dict_values[indices_buffer[i]];
        RETURN_NOT_OK(builder->Append(val.ptr, val.len));
      }
      values_decoded += num_indices;
    }
    *out_num_values = values_decoded;
    return Status::OK();
  }
};
// Dictionary decoder for FIXED_LEN_BYTE_ARRAY columns. All behavior is
// inherited from DictDecoderImpl<FLBAType>; this class only adds the
// FLBADecoder interface.
class DictFLBADecoder : public DictDecoderImpl<FLBAType>, virtual public FLBADecoder {
 public:
  using BASE = DictDecoderImpl<FLBAType>;
  // Reuse the DictDecoderImpl<FLBAType> constructors unchanged.
  using BASE::BASE;
};
// ----------------------------------------------------------------------
// DeltaBitPackDecoder
// Decoder for DELTA_BINARY_PACKED integer pages (INT32/INT64 only).
// NOTE(review): InitBlock() re-reads block size, mini-block count, value count
// and first value before *every* block, while the Parquet spec writes those
// once per page (blocks carry only min delta + bit widths); values are also
// accumulated in int32_t even for INT64. Confirm before relying on this for
// multi-block or 64-bit data.
template <typename DType>
class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder<DType> {
 public:
  typedef typename DType::c_type T;

  explicit DeltaBitPackDecoder(const ColumnDescriptor* descr,
                               MemoryPool* pool = arrow::default_memory_pool())
      : DecoderImpl(descr, Encoding::DELTA_BINARY_PACKED), pool_(pool) {
    if (DType::type_num != Type::INT32 && DType::type_num != Type::INT64) {
      throw ParquetException("Delta bit pack encoding should only be for integer data.");
    }
  }

  void SetData(int num_values, const uint8_t* data, int len) override {
    this->num_values_ = num_values;
    decoder_ = arrow::BitUtil::BitReader(data, len);
    values_current_block_ = 0;
    values_current_mini_block_ = 0;
    // Drop any header state from a previous page so the first Decode() reads
    // a fresh block header instead of touching stale (or null) bit widths.
    delta_bit_widths_.reset();
    mini_block_idx_ = 0;
  }

  int Decode(T* buffer, int max_values) override {
    return GetInternal(buffer, max_values);
  }

 private:
  // Reads a block header: block size, mini-block count, value count, first
  // value, min delta and one bit width byte per mini block.
  void InitBlock() {
    int32_t block_size;
    if (!decoder_.GetVlqInt(&block_size)) ParquetException::EofException();
    if (!decoder_.GetVlqInt(&num_mini_blocks_)) ParquetException::EofException();
    if (!decoder_.GetVlqInt(&values_current_block_)) {
      ParquetException::EofException();
    }
    if (!decoder_.GetZigZagVlqInt(&last_value_)) ParquetException::EofException();

    // Guards the division below and the allocation sized by num_mini_blocks_.
    if (ARROW_PREDICT_FALSE(num_mini_blocks_ <= 0 || block_size < 0)) {
      throw ParquetException("Invalid or corrupted DELTA_BINARY_PACKED block header");
    }

    delta_bit_widths_ = AllocateBuffer(pool_, num_mini_blocks_);
    uint8_t* bit_width_data = delta_bit_widths_->mutable_data();

    if (!decoder_.GetZigZagVlqInt(&min_delta_)) ParquetException::EofException();
    for (int i = 0; i < num_mini_blocks_; ++i) {
      if (!decoder_.GetAligned<uint8_t>(1, bit_width_data + i)) {
        ParquetException::EofException();
      }
    }
    values_per_mini_block_ = block_size / num_mini_blocks_;
    mini_block_idx_ = 0;
    delta_bit_width_ = bit_width_data[0];
    values_current_mini_block_ = values_per_mini_block_;
  }

  // Decodes up to `max_values` (clamped to the values left) into `buffer`.
  template <typename OutType>
  int GetInternal(OutType* buffer, int max_values) {
    max_values = std::min(max_values, this->num_values_);
    for (int i = 0; i < max_values; ++i) {
      if (ARROW_PREDICT_FALSE(values_current_mini_block_ == 0)) {
        ++mini_block_idx_;
        if (delta_bit_widths_ != nullptr &&
            mini_block_idx_ < static_cast<size_t>(delta_bit_widths_->size())) {
          // Read through the shared_ptr each time: InitBlock() replaces the
          // buffer, so a pointer cached before it would dangle.
          delta_bit_width_ = delta_bit_widths_->data()[mini_block_idx_];
          values_current_mini_block_ = values_per_mini_block_;
        } else {
          InitBlock();
          buffer[i] = last_value_;
          continue;
        }
      }

      // TODO: the key to this algorithm is to decode the entire miniblock at once.
      int64_t delta;
      if (!decoder_.GetValue(delta_bit_width_, &delta)) ParquetException::EofException();
      delta += min_delta_;
      last_value_ += static_cast<int32_t>(delta);
      buffer[i] = last_value_;
      --values_current_mini_block_;
    }
    this->num_values_ -= max_values;
    return max_values;
  }

  MemoryPool* pool_;
  arrow::BitUtil::BitReader decoder_;
  int32_t values_current_block_ = 0;
  int32_t num_mini_blocks_ = 0;
  uint64_t values_per_mini_block_ = 0;
  uint64_t values_current_mini_block_ = 0;
  int32_t min_delta_ = 0;
  size_t mini_block_idx_ = 0;
  std::shared_ptr<ResizableBuffer> delta_bit_widths_;
  int delta_bit_width_ = 0;
  int32_t last_value_ = 0;
};
// ----------------------------------------------------------------------
// DELTA_LENGTH_BYTE_ARRAY
// Decoder for DELTA_LENGTH_BYTE_ARRAY pages. As implemented here a page is
// read as: int32 byte size of the encoded lengths, the lengths themselves
// (DELTA_BINARY_PACKED), then the concatenated value bytes.
// NOTE(review): the leading int32 size is this implementation's layout; the
// Parquet spec does not obviously define it — confirm against writers.
class DeltaLengthByteArrayDecoder : public DecoderImpl,
                                    virtual public TypedDecoder<ByteArrayType> {
 public:
  explicit DeltaLengthByteArrayDecoder(const ColumnDescriptor* descr,
                                       MemoryPool* pool = arrow::default_memory_pool())
      : DecoderImpl(descr, Encoding::DELTA_LENGTH_BYTE_ARRAY),
        len_decoder_(nullptr, pool) {}

  void SetData(int num_values, const uint8_t* data, int len) override {
    num_values_ = num_values;
    if (len == 0) return;
    if (ARROW_PREDICT_FALSE(len < 4)) ParquetException::EofException();
    int total_lengths_len = arrow::util::SafeLoadAs<int32_t>(data);
    data += 4;
    len -= 4;
    // The lengths section must lie entirely inside the page.
    if (ARROW_PREDICT_FALSE(total_lengths_len < 0 || total_lengths_len > len)) {
      ParquetException::EofException();
    }
    this->len_decoder_.SetData(num_values, data, total_lengths_len);
    data_ = data + total_lengths_len;
    this->len_ = len - total_lengths_len;
  }

  // Returns views into the page data; no bytes are copied.
  int Decode(ByteArray* buffer, int max_values) override {
    max_values = std::min(max_values, num_values_);
    std::vector<int> lengths(max_values);
    len_decoder_.Decode(lengths.data(), max_values);
    for (int i = 0; i < max_values; ++i) {
      // A corrupt length must not point past the remaining value bytes.
      if (ARROW_PREDICT_FALSE(lengths[i] < 0 || lengths[i] > this->len_)) {
        ParquetException::EofException();
      }
      buffer[i].len = lengths[i];
      buffer[i].ptr = data_;
      this->data_ += lengths[i];
      this->len_ -= lengths[i];
    }
    this->num_values_ -= max_values;
    return max_values;
  }

 private:
  DeltaBitPackDecoder<Int32Type> len_decoder_;
};
// ----------------------------------------------------------------------
// DELTA_BYTE_ARRAY
// Decoder for DELTA_BYTE_ARRAY pages: each value is a prefix of the previous
// value (length from prefix_len_decoder_) followed by a suffix
// (suffix_decoder_). Reconstructed values are stored in buffers owned by this
// decoder and stay valid until the decoder is destroyed.
class DeltaByteArrayDecoder : public DecoderImpl,
                              virtual public TypedDecoder<ByteArrayType> {
 public:
  explicit DeltaByteArrayDecoder(const ColumnDescriptor* descr,
                                 MemoryPool* pool = arrow::default_memory_pool())
      : DecoderImpl(descr, Encoding::DELTA_BYTE_ARRAY),
        prefix_len_decoder_(nullptr, pool),
        suffix_decoder_(nullptr, pool),
        last_value_(0, nullptr) {}

  void SetData(int num_values, const uint8_t* data, int len) override {
    num_values_ = num_values;
    if (len == 0) return;
    if (ARROW_PREDICT_FALSE(len < 4)) ParquetException::EofException();
    int prefix_len_length = arrow::util::SafeLoadAs<int32_t>(data);
    data += 4;
    len -= 4;
    // The encoded prefix lengths must lie entirely inside the page.
    if (ARROW_PREDICT_FALSE(prefix_len_length < 0 || prefix_len_length > len)) {
      ParquetException::EofException();
    }
    prefix_len_decoder_.SetData(num_values, data, prefix_len_length);
    data += prefix_len_length;
    len -= prefix_len_length;
    suffix_decoder_.SetData(num_values, data, len);
  }

  // NOTE(review): memory for decoded values accumulates until the decoder is
  // destroyed; a pool-backed, per-batch scheme would bound it.
  int Decode(ByteArray* buffer, int max_values) override {
    max_values = std::min(max_values, this->num_values_);
    for (int i = 0; i < max_values; ++i) {
      int prefix_len = 0;
      prefix_len_decoder_.Decode(&prefix_len, 1);
      ByteArray suffix = {0, nullptr};
      suffix_decoder_.Decode(&suffix, 1);
      // The shared prefix is copied from the previous value, so it can be no
      // longer than that value.
      if (ARROW_PREDICT_FALSE(prefix_len < 0 ||
                              static_cast<uint32_t>(prefix_len) > last_value_.len)) {
        throw ParquetException("Invalid or corrupted DELTA_BYTE_ARRAY prefix length");
      }
      const uint32_t value_len = static_cast<uint32_t>(prefix_len) + suffix.len;
      // Owned by value_storage_ (previously malloc'd and never freed).
      std::unique_ptr<uint8_t[]> storage(new uint8_t[value_len]);
      uint8_t* result = storage.get();
      if (prefix_len > 0) memcpy(result, last_value_.ptr, prefix_len);
      if (suffix.len > 0) memcpy(result + prefix_len, suffix.ptr, suffix.len);
      value_storage_.push_back(std::move(storage));
      buffer[i].len = value_len;
      buffer[i].ptr = result;
      last_value_ = buffer[i];
    }
    this->num_values_ -= max_values;
    return max_values;
  }

 private:
  DeltaBitPackDecoder<Int32Type> prefix_len_decoder_;
  DeltaLengthByteArrayDecoder suffix_decoder_;
  ByteArray last_value_;
  // Backing storage for every value returned by Decode().
  std::vector<std::unique_ptr<uint8_t[]>> value_storage_;
};
// ----------------------------------------------------------------------
// Creates a decoder for `type_num` values stored with `encoding`. Only PLAIN is
// handled here; any other encoding is reported through ParquetException::NYI.
// A PLAIN request for an unrecognized physical type trips a DCHECK and yields
// nullptr.
std::unique_ptr<Decoder> MakeDecoder(Type::type type_num, Encoding::type encoding,
                                     const ColumnDescriptor* descr) {
  if (encoding != Encoding::PLAIN) {
    ParquetException::NYI("Selected encoding is not supported");
  } else {
    std::unique_ptr<Decoder> decoder;
    switch (type_num) {
      case Type::BOOLEAN:
        decoder.reset(new PlainBooleanDecoder(descr));
        break;
      case Type::INT32:
        decoder.reset(new PlainDecoder<Int32Type>(descr));
        break;
      case Type::INT64:
        decoder.reset(new PlainDecoder<Int64Type>(descr));
        break;
      case Type::INT96:
        decoder.reset(new PlainDecoder<Int96Type>(descr));
        break;
      case Type::FLOAT:
        decoder.reset(new PlainDecoder<FloatType>(descr));
        break;
      case Type::DOUBLE:
        decoder.reset(new PlainDecoder<DoubleType>(descr));
        break;
      case Type::BYTE_ARRAY:
        decoder.reset(new PlainByteArrayDecoder(descr));
        break;
      case Type::FIXED_LEN_BYTE_ARRAY:
        decoder.reset(new PlainFLBADecoder(descr));
        break;
      default:
        break;
    }
    if (decoder != nullptr) return decoder;
  }
  DCHECK(false) << "Should not be able to reach this code";
  return nullptr;
}
namespace detail {
// Creates the dictionary (RLE_DICTIONARY) decoder for physical type
// `type_num`, allocating from `pool`. BOOLEAN is reported through
// ParquetException::NYI; an unrecognized type trips a DCHECK and yields
// nullptr.
std::unique_ptr<Decoder> MakeDictDecoder(Type::type type_num,
                                         const ColumnDescriptor* descr,
                                         MemoryPool* pool) {
  std::unique_ptr<Decoder> decoder;
  switch (type_num) {
    case Type::BOOLEAN:
      ParquetException::NYI("Dictionary encoding not implemented for boolean type");
      break;
    case Type::INT32:
      decoder.reset(new DictDecoderImpl<Int32Type>(descr, pool));
      break;
    case Type::INT64:
      decoder.reset(new DictDecoderImpl<Int64Type>(descr, pool));
      break;
    case Type::INT96:
      decoder.reset(new DictDecoderImpl<Int96Type>(descr, pool));
      break;
    case Type::FLOAT:
      decoder.reset(new DictDecoderImpl<FloatType>(descr, pool));
      break;
    case Type::DOUBLE:
      decoder.reset(new DictDecoderImpl<DoubleType>(descr, pool));
      break;
    case Type::BYTE_ARRAY:
      decoder.reset(new DictByteArrayDecoderImpl(descr, pool));
      break;
    case Type::FIXED_LEN_BYTE_ARRAY:
      decoder.reset(new DictFLBADecoder(descr, pool));
      break;
    default:
      break;
  }
  if (decoder != nullptr) return decoder;
  DCHECK(false) << "Should not be able to reach this code";
  return nullptr;
}
} // namespace detail
} // namespace parquet