// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "parquet/encoding.h"
#include <algorithm>
#include <array>
#include <cmath>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <limits>
#include <memory>
#include <sstream>
#include <string>
#include <string_view>
#include <type_traits>
#include <utility>
#include <vector>
#include "arrow/array.h"
#include "arrow/stl_allocator.h"
#include "arrow/type_traits.h"
#include "arrow/util/bit_stream_utils_internal.h"
#include "arrow/util/bit_util.h"
#include "arrow/util/bitmap_ops.h"
#include "arrow/util/byte_stream_split_internal.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/hashing.h"
#include "arrow/util/int_util_overflow.h"
#include "arrow/util/logging_internal.h"
#include "arrow/util/rle_encoding_internal.h"
#include "arrow/util/spaced_internal.h"
#include "arrow/util/ubsan.h"
#include "arrow/visit_data_inline.h"
#include "parquet/exception.h"
#include "parquet/platform.h"
#include "parquet/schema.h"
#include "parquet/types.h"
#ifdef _MSC_VER
// disable warning about inheritance via dominance in the diamond pattern
# pragma warning(disable : 4250)
#endif
namespace bit_util = arrow::bit_util;
using arrow::Status;
using arrow::internal::AddWithOverflow;
using arrow::internal::checked_cast;
using arrow::internal::SafeSignedSubtract;
using arrow::util::SafeLoad;
using arrow::util::SafeLoadAs;
template <typename T>
using ArrowPoolVector = std::vector<T, ::arrow::stl::allocator<T>>;
namespace parquet {
namespace {
// The Parquet spec isn't very clear whether ByteArray lengths are signed or
// unsigned, but the Java implementation uses signed ints.
constexpr size_t kMaxByteArraySize = std::numeric_limits<int32_t>::max();
// Get the data size of a binary-like Arrow array
template <typename ArrayType>
int64_t GetBinaryDataSize(const ArrayType& array) {
return array.value_offset(array.length()) - array.value_offset(0);
}
template <>
int64_t GetBinaryDataSize(const ::arrow::BinaryViewArray& array) {
int64_t total_size = 0;
::arrow::VisitArraySpanInline<::arrow::BinaryViewType>(
*array.data(),
[&](std::string_view v) { total_size += static_cast<int64_t>(v.size()); }, [] {});
return total_size;
}
class EncoderImpl : virtual public Encoder {
public:
EncoderImpl(const ColumnDescriptor* descr, Encoding::type encoding, MemoryPool* pool)
: descr_(descr),
encoding_(encoding),
pool_(pool),
type_length_(descr ? descr->type_length() : -1) {}
Encoding::type encoding() const override { return encoding_; }
MemoryPool* memory_pool() const override { return pool_; }
int64_t ReportUnencodedDataBytes() override {
if (descr_->physical_type() != Type::BYTE_ARRAY) {
throw ParquetException("ReportUnencodedDataBytes is only supported for BYTE_ARRAY");
}
int64_t bytes = unencoded_byte_array_data_bytes_;
unencoded_byte_array_data_bytes_ = 0;
return bytes;
}
protected:
// For accessing type-specific metadata, like the type length of FIXED_LEN_BYTE_ARRAY
const ColumnDescriptor* descr_;
const Encoding::type encoding_;
MemoryPool* pool_;
/// Type length from descr
const int type_length_;
/// Number of unencoded bytes written to the encoder. Used for ByteArray type only.
int64_t unencoded_byte_array_data_bytes_ = 0;
};
// ----------------------------------------------------------------------
// PLAIN encoder
template <typename DType>
class PlainEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
public:
using T = typename DType::c_type;
explicit PlainEncoder(const ColumnDescriptor* descr, MemoryPool* pool)
: EncoderImpl(descr, Encoding::PLAIN, pool), sink_(pool) {}
int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
std::shared_ptr<Buffer> FlushValues() override {
std::shared_ptr<Buffer> buffer;
PARQUET_THROW_NOT_OK(sink_.Finish(&buffer));
return buffer;
}
using TypedEncoder<DType>::Put;
void Put(const T* buffer, int num_values) override;
void Put(const ::arrow::Array& values) override;
void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
int64_t valid_bits_offset) override {
if (valid_bits != NULLPTR) {
PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values * sizeof(T),
this->memory_pool()));
T* data = buffer->template mutable_data_as<T>();
int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
src, num_values, valid_bits, valid_bits_offset, data);
Put(data, num_valid_values);
} else {
Put(src, num_values);
}
}
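// Note on PutSpaced() above: SpacedCompress compacts a "spaced" buffer (valid
// values interleaved with null slots) into a dense one. Illustrative sketch:
// given src = {7, <null>, 9} and valid_bits = 0b101, it writes {7, 9} into
// `data` and returns 2, so only the valid values reach Put().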
void UnsafePutByteArray(const void* data, uint32_t length) {
DCHECK(length == 0 || data != nullptr) << "Value ptr cannot be NULL";
sink_.UnsafeAppend(&length, sizeof(uint32_t));
sink_.UnsafeAppend(data, static_cast<int64_t>(length));
unencoded_byte_array_data_bytes_ += length;
}
void Put(const ByteArray& val) {
// Write the result to the output stream
const int64_t increment = static_cast<int64_t>(val.len + sizeof(uint32_t));
if (ARROW_PREDICT_FALSE(sink_.length() + increment > sink_.capacity())) {
PARQUET_THROW_NOT_OK(sink_.Reserve(increment));
}
UnsafePutByteArray(val.ptr, val.len);
}
protected:
template <typename ArrayType>
void PutBinaryArray(const ArrayType& array) {
const int64_t total_bytes = GetBinaryDataSize(array);
PARQUET_THROW_NOT_OK(sink_.Reserve(total_bytes + array.length() * sizeof(uint32_t)));
PARQUET_THROW_NOT_OK(::arrow::VisitArraySpanInline<typename ArrayType::TypeClass>(
*array.data(),
[&](::std::string_view view) {
if (ARROW_PREDICT_FALSE(view.size() > kMaxByteArraySize)) {
return Status::Invalid(
"Parquet cannot store strings with size 2GB or more, got: ", view.size());
}
UnsafePutByteArray(view.data(), static_cast<uint32_t>(view.size()));
return Status::OK();
},
[]() { return Status::OK(); }));
}
::arrow::BufferBuilder sink_;
};
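// Illustrative usage sketch (the values are hypothetical): a PLAIN encoder is
// normally obtained through the MakeTypedEncoder() helper declared in
// parquet/encoding.h:
//
//   auto encoder = MakeTypedEncoder<Int32Type>(Encoding::PLAIN);
//   const int32_t values[] = {1, 2, 3, 4};
//   encoder->Put(values, /*num_values=*/4);
//   std::shared_ptr<Buffer> buf = encoder->FlushValues();  // 16 bytes, little-endian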
template <typename DType>
void PlainEncoder<DType>::Put(const T* buffer, int num_values) {
if (num_values > 0) {
PARQUET_THROW_NOT_OK(sink_.Append(buffer, num_values * sizeof(T)));
}
}
template <>
inline void PlainEncoder<ByteArrayType>::Put(const ByteArray* src, int num_values) {
for (int i = 0; i < num_values; ++i) {
Put(src[i]);
}
}
template <typename ArrayType>
void DirectPutImpl(const ::arrow::Array& values, ::arrow::BufferBuilder* sink) {
if (values.type_id() != ArrayType::TypeClass::type_id) {
std::string type_name = ArrayType::TypeClass::type_name();
throw ParquetException("direct put to " + type_name + " from " +
values.type()->ToString() + " not supported");
}
using value_type = typename ArrayType::value_type;
constexpr auto value_size = sizeof(value_type);
auto raw_values = checked_cast<const ArrayType&>(values).raw_values();
if (values.null_count() == 0) {
// no nulls, just dump the data
PARQUET_THROW_NOT_OK(sink->Append(raw_values, values.length() * value_size));
} else {
PARQUET_THROW_NOT_OK(
sink->Reserve((values.length() - values.null_count()) * value_size));
for (int64_t i = 0; i < values.length(); i++) {
if (values.IsValid(i)) {
sink->UnsafeAppend(&raw_values[i], value_size);
}
}
}
}
template <>
void PlainEncoder<Int32Type>::Put(const ::arrow::Array& values) {
DirectPutImpl<::arrow::Int32Array>(values, &sink_);
}
template <>
void PlainEncoder<Int64Type>::Put(const ::arrow::Array& values) {
DirectPutImpl<::arrow::Int64Array>(values, &sink_);
}
template <>
void PlainEncoder<Int96Type>::Put(const ::arrow::Array& values) {
ParquetException::NYI("direct put to Int96");
}
template <>
void PlainEncoder<FloatType>::Put(const ::arrow::Array& values) {
DirectPutImpl<::arrow::FloatArray>(values, &sink_);
}
template <>
void PlainEncoder<DoubleType>::Put(const ::arrow::Array& values) {
DirectPutImpl<::arrow::DoubleArray>(values, &sink_);
}
template <typename DType>
void PlainEncoder<DType>::Put(const ::arrow::Array& values) {
ParquetException::NYI("direct put of " + values.type()->ToString());
}
void AssertVarLengthBinary(const ::arrow::Array& values) {
if (!::arrow::is_base_binary_like(values.type_id()) &&
!::arrow::is_binary_view_like(values.type_id())) {
throw ParquetException("Only binary-like data supported");
}
}
template <>
inline void PlainEncoder<ByteArrayType>::Put(const ::arrow::Array& values) {
AssertVarLengthBinary(values);
if (::arrow::is_binary_like(values.type_id())) {
PutBinaryArray(checked_cast<const ::arrow::BinaryArray&>(values));
} else if (::arrow::is_large_binary_like(values.type_id())) {
PutBinaryArray(checked_cast<const ::arrow::LargeBinaryArray&>(values));
} else {
DCHECK(::arrow::is_binary_view_like(values.type_id()));
PutBinaryArray(checked_cast<const ::arrow::BinaryViewArray&>(values));
}
}
void AssertFixedSizeBinary(const ::arrow::Array& values, int type_length) {
if (!::arrow::is_fixed_size_binary(values.type_id())) {
throw ParquetException("Only FixedSizeBinaryArray and subclasses supported");
}
if (checked_cast<const ::arrow::FixedSizeBinaryType&>(*values.type()).byte_width() !=
type_length) {
throw ParquetException("Size mismatch: " + values.type()->ToString() +
" should have been " + std::to_string(type_length) + " wide");
}
}
template <>
inline void PlainEncoder<FLBAType>::Put(const ::arrow::Array& values) {
AssertFixedSizeBinary(values, descr_->type_length());
const auto& data = checked_cast<const ::arrow::FixedSizeBinaryArray&>(values);
if (data.null_count() == 0) {
// no nulls, just dump the data
PARQUET_THROW_NOT_OK(
sink_.Append(data.raw_values(), data.length() * data.byte_width()));
} else {
const int64_t total_bytes =
data.length() * data.byte_width() - data.null_count() * data.byte_width();
PARQUET_THROW_NOT_OK(sink_.Reserve(total_bytes));
for (int64_t i = 0; i < data.length(); i++) {
if (data.IsValid(i)) {
sink_.UnsafeAppend(data.Value(i), data.byte_width());
}
}
}
}
template <>
inline void PlainEncoder<FLBAType>::Put(const FixedLenByteArray* src, int num_values) {
if (descr_->type_length() == 0) {
return;
}
for (int i = 0; i < num_values; ++i) {
// Write the result to the output stream
DCHECK(src[i].ptr != nullptr) << "Value ptr cannot be NULL";
PARQUET_THROW_NOT_OK(sink_.Append(src[i].ptr, descr_->type_length()));
}
}
template <>
class PlainEncoder<BooleanType> : public EncoderImpl, virtual public BooleanEncoder {
public:
explicit PlainEncoder(const ColumnDescriptor* descr, MemoryPool* pool)
: EncoderImpl(descr, Encoding::PLAIN, pool), sink_(pool) {}
int64_t EstimatedDataEncodedSize() override;
std::shared_ptr<Buffer> FlushValues() override;
void Put(const bool* src, int num_values) override;
void Put(const std::vector<bool>& src, int num_values) override;
void PutSpaced(const bool* src, int num_values, const uint8_t* valid_bits,
int64_t valid_bits_offset) override {
if (valid_bits != NULLPTR) {
PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values * sizeof(T),
this->memory_pool()));
T* data = buffer->mutable_data_as<T>();
int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
src, num_values, valid_bits, valid_bits_offset, data);
Put(data, num_valid_values);
} else {
Put(src, num_values);
}
}
void Put(const ::arrow::Array& values) override {
if (values.type_id() != ::arrow::Type::BOOL) {
throw ParquetException("direct put to boolean from " + values.type()->ToString() +
" not supported");
}
const auto& data = checked_cast<const ::arrow::BooleanArray&>(values);
if (data.null_count() == 0) {
// no nulls, just dump the data
PARQUET_THROW_NOT_OK(sink_.Reserve(data.length()));
sink_.UnsafeAppend(data.data()->GetValues<uint8_t>(1, 0), data.offset(),
data.length());
} else {
PARQUET_THROW_NOT_OK(sink_.Reserve(data.length() - data.null_count()));
for (int64_t i = 0; i < data.length(); i++) {
if (data.IsValid(i)) {
sink_.UnsafeAppend(data.Value(i));
}
}
}
}
private:
::arrow::TypedBufferBuilder<bool> sink_;
template <typename SequenceType>
void PutImpl(const SequenceType& src, int num_values);
};
template <typename SequenceType>
void PlainEncoder<BooleanType>::PutImpl(const SequenceType& src, int num_values) {
PARQUET_THROW_NOT_OK(sink_.Reserve(num_values));
for (int i = 0; i < num_values; ++i) {
sink_.UnsafeAppend(src[i]);
}
}
int64_t PlainEncoder<BooleanType>::EstimatedDataEncodedSize() {
return ::arrow::bit_util::BytesForBits(sink_.length());
}
std::shared_ptr<Buffer> PlainEncoder<BooleanType>::FlushValues() {
std::shared_ptr<Buffer> buffer;
PARQUET_THROW_NOT_OK(sink_.Finish(&buffer));
return buffer;
}
void PlainEncoder<BooleanType>::Put(const bool* src, int num_values) {
PutImpl(src, num_values);
}
void PlainEncoder<BooleanType>::Put(const std::vector<bool>& src, int num_values) {
PutImpl(src, num_values);
}
// ----------------------------------------------------------------------
// DictEncoder<T> implementations
template <typename DType>
struct DictEncoderTraits {
using c_type = typename DType::c_type;
using MemoTableType = ::arrow::internal::ScalarMemoTable<c_type>;
};
template <>
struct DictEncoderTraits<ByteArrayType> {
using MemoTableType = ::arrow::internal::BinaryMemoTable<::arrow::BinaryBuilder>;
};
template <>
struct DictEncoderTraits<FLBAType> {
using MemoTableType = ::arrow::internal::BinaryMemoTable<::arrow::BinaryBuilder>;
};
// Initially 1024 elements
static constexpr int32_t kInitialHashTableSize = 1 << 10;
int64_t RlePreserveBufferSize(int64_t num_values, int bit_width) {
// Note: because of the way RleEncoder::CheckBufferFull()
// is called, we have to reserve an extra "RleEncoder::MinBufferSize"
// bytes. These extra bytes won't be used, but not reserving them
// would cause the encoder to fail.
return ::arrow::util::RleBitPackedEncoder::MaxBufferSize(bit_width, num_values) +
::arrow::util::RleBitPackedEncoder::MinBufferSize(bit_width);
}
/// See the dictionary encoding section of
/// https://github.com/apache/parquet-format/blob/master/Encodings.md. The encoding
/// supports streaming encoding. Values are encoded as they are added while the dictionary
/// is being constructed. At any time, the buffered values can be written out with the
/// current dictionary size. More values can then be added to the encoder, including new
/// dictionary entries.
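///
/// Illustrative usage sketch (descr, pool, and the values are hypothetical):
///
///   DictEncoderImpl<Int32Type> encoder(descr, pool);
///   const int32_t values[] = {7, 7, 8, 7};
///   encoder.Put(values, 4);           // dictionary becomes {7, 8}, indices 0 0 1 0
///   std::vector<uint8_t> dict(encoder.dict_encoded_size());
///   encoder.WriteDict(dict.data());   // dictionary page payload
///   auto indices = encoder.FlushValues();  // [bit width byte][RLE-encoded indices]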
template <typename DType>
class DictEncoderImpl : public EncoderImpl, virtual public DictEncoder<DType> {
using MemoTableType = typename DictEncoderTraits<DType>::MemoTableType;
public:
typedef typename DType::c_type T;
/// In a data page, the bit width used to encode the entry
/// ids is stored as 1 byte (max bit width = 32).
constexpr static int32_t kDataPageBitWidthBytes = 1;
explicit DictEncoderImpl(const ColumnDescriptor* desc, MemoryPool* pool)
: EncoderImpl(desc, Encoding::RLE_DICTIONARY, pool),
buffered_indices_(::arrow::stl::allocator<int32_t>(pool)),
dict_encoded_size_(0),
memo_table_(pool, kInitialHashTableSize) {}
~DictEncoderImpl() override = default;
int dict_encoded_size() const override { return dict_encoded_size_; }
int WriteIndices(uint8_t* buffer, int buffer_len) override {
// Write bit width in first byte
*buffer = static_cast<uint8_t>(bit_width());
++buffer;
--buffer_len;
::arrow::util::RleBitPackedEncoder encoder(buffer, buffer_len, bit_width());
for (int32_t index : buffered_indices_) {
if (ARROW_PREDICT_FALSE(!encoder.Put(index))) return -1;
}
encoder.Flush();
ClearIndices();
return kDataPageBitWidthBytes + encoder.len();
}
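// For example, with 300 dictionary entries bit_width() is 9, so WriteIndices()
// lays the buffer out as [0x09][RLE/bit-packed runs of 9-bit indices].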
/// Returns a conservative estimate of the number of bytes needed to encode the buffered
/// indices. Used to size the buffer passed to WriteIndices().
int64_t EstimatedDataEncodedSize() override {
return kDataPageBitWidthBytes +
RlePreserveBufferSize(static_cast<int64_t>(buffered_indices_.size()),
bit_width());
}
/// The minimum bit width required to encode the currently buffered indices.
int bit_width() const override {
if (ARROW_PREDICT_FALSE(num_entries() == 0)) return 0;
if (ARROW_PREDICT_FALSE(num_entries() == 1)) return 1;
return bit_util::Log2(num_entries());
}
/// Encode value. Note that this does not actually write any data, just
/// buffers the value's index to be written later.
inline void Put(const T& value);
// Only supported for BYTE_ARRAY; the default implementation DCHECKs for other
// data types.
inline void PutByteArray(const void* ptr, int32_t length);
void Put(const T* src, int num_values) override {
for (int32_t i = 0; i < num_values; i++) {
Put(SafeLoad(src + i));
}
}
void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
int64_t valid_bits_offset) override {
::arrow::internal::VisitSetBitRunsVoid(valid_bits, valid_bits_offset, num_values,
[&](int64_t position, int64_t length) {
for (int64_t i = 0; i < length; i++) {
Put(SafeLoad(src + i + position));
}
});
}
using TypedEncoder<DType>::Put;
void Put(const ::arrow::Array& values) override;
void PutDictionary(const ::arrow::Array& values) override;
template <typename ArrowType, typename T = typename ArrowType::c_type>
void PutIndicesTyped(const ::arrow::Array& data) {
auto values = data.data()->GetValues<T>(1);
size_t buffer_position = buffered_indices_.size();
buffered_indices_.resize(buffer_position +
static_cast<size_t>(data.length() - data.null_count()));
::arrow::internal::VisitSetBitRunsVoid(
data.null_bitmap_data(), data.offset(), data.length(),
[&](int64_t position, int64_t length) {
for (int64_t i = 0; i < length; ++i) {
buffered_indices_[buffer_position++] =
static_cast<int32_t>(values[i + position]);
}
});
// Track unencoded bytes based on dictionary value type
if constexpr (std::is_same_v<DType, ByteArrayType>) {
// For ByteArray, need to look up actual lengths from dictionary
for (size_t idx =
buffer_position - static_cast<size_t>(data.length() - data.null_count());
idx < buffer_position; ++idx) {
memo_table_.VisitValue(buffered_indices_[idx], [&](std::string_view value) {
unencoded_byte_array_data_bytes_ += value.length();
});
}
}
}
void PutIndices(const ::arrow::Array& data) override {
switch (data.type()->id()) {
case ::arrow::Type::UINT8:
case ::arrow::Type::INT8:
return PutIndicesTyped<::arrow::UInt8Type>(data);
case ::arrow::Type::UINT16:
case ::arrow::Type::INT16:
return PutIndicesTyped<::arrow::UInt16Type>(data);
case ::arrow::Type::UINT32:
case ::arrow::Type::INT32:
return PutIndicesTyped<::arrow::UInt32Type>(data);
case ::arrow::Type::UINT64:
case ::arrow::Type::INT64:
return PutIndicesTyped<::arrow::UInt64Type>(data);
default:
throw ParquetException("Passed non-integer array to PutIndices");
}
}
std::shared_ptr<Buffer> FlushValues() override {
const int64_t buffer_size = EstimatedDataEncodedSize();
if (buffer_size > std::numeric_limits<int>::max()) {
std::stringstream ss;
ss << "Buffer size for DictEncoder (" << buffer_size
<< ") exceeds maximum int value";
throw ParquetException(ss.str());
}
std::shared_ptr<ResizableBuffer> buffer = AllocateBuffer(this->pool_, buffer_size);
int result_size = WriteIndices(buffer->mutable_data(), static_cast<int>(buffer_size));
PARQUET_THROW_NOT_OK(buffer->Resize(result_size, false));
return buffer;
}
/// Writes out the encoded dictionary to buffer. buffer must be preallocated to
/// dict_encoded_size() bytes.
void WriteDict(uint8_t* buffer) const override;
/// The number of entries in the dictionary.
int num_entries() const override { return memo_table_.size(); }
private:
/// Clears all the indices (but leaves the dictionary).
void ClearIndices() { buffered_indices_.clear(); }
/// Indices that have not yet been written out by WriteIndices().
ArrowPoolVector<int32_t> buffered_indices_;
template <typename ArrayType>
void PutBinaryArray(const ArrayType& array) {
PARQUET_THROW_NOT_OK(::arrow::VisitArraySpanInline<typename ArrayType::TypeClass>(
*array.data(),
[&](::std::string_view view) {
if (ARROW_PREDICT_FALSE(view.size() > kMaxByteArraySize)) {
return Status::Invalid(
"Parquet cannot store strings with size 2GB or more, got: ", view.size());
}
PutByteArray(view.data(), static_cast<uint32_t>(view.size()));
return Status::OK();
},
[]() { return Status::OK(); }));
}
template <typename ArrayType>
void PutBinaryDictionaryArray(const ArrayType& array) {
DCHECK_EQ(array.null_count(), 0);
for (int64_t i = 0; i < array.length(); i++) {
auto v = array.GetView(i);
if (ARROW_PREDICT_FALSE(v.size() > kMaxByteArraySize)) {
throw ParquetException(
"Parquet cannot store strings with size 2GB or more, got: ", v.size());
}
dict_encoded_size_ += static_cast<int>(v.size() + sizeof(uint32_t));
int32_t unused_memo_index;
PARQUET_THROW_NOT_OK(memo_table_.GetOrInsert(
v.data(), static_cast<int32_t>(v.size()), &unused_memo_index));
}
}
/// The number of bytes needed to encode the dictionary.
int dict_encoded_size_;
MemoTableType memo_table_;
};
template <typename DType>
void DictEncoderImpl<DType>::WriteDict(uint8_t* buffer) const {
// For primitive types, only a memcpy
DCHECK_EQ(static_cast<size_t>(dict_encoded_size_), sizeof(T) * memo_table_.size());
memo_table_.CopyValues(0 /* start_pos */, reinterpret_cast<T*>(buffer));
}
// ByteArray and FLBA already have the dictionary encoded in their data heaps
template <>
void DictEncoderImpl<ByteArrayType>::WriteDict(uint8_t* buffer) const {
memo_table_.VisitValues(0, [&buffer](::std::string_view v) {
uint32_t len = static_cast<uint32_t>(v.length());
memcpy(buffer, &len, sizeof(len));
buffer += sizeof(len);
memcpy(buffer, v.data(), len);
buffer += len;
});
}
template <>
void DictEncoderImpl<FLBAType>::WriteDict(uint8_t* buffer) const {
memo_table_.VisitValues(0, [&](::std::string_view v) {
DCHECK_EQ(v.length(), static_cast<size_t>(type_length_));
memcpy(buffer, v.data(), type_length_);
buffer += type_length_;
});
}
template <typename DType>
inline void DictEncoderImpl<DType>::Put(const T& v) {
// Put() implementation for primitive types
auto on_found = [](int32_t memo_index) {};
auto on_not_found = [this](int32_t memo_index) {
dict_encoded_size_ += static_cast<int>(sizeof(T));
};
int32_t memo_index;
PARQUET_THROW_NOT_OK(memo_table_.GetOrInsert(v, on_found, on_not_found, &memo_index));
buffered_indices_.push_back(memo_index);
}
template <typename DType>
inline void DictEncoderImpl<DType>::PutByteArray(const void* ptr, int32_t length) {
DCHECK(false);
}
template <>
inline void DictEncoderImpl<ByteArrayType>::PutByteArray(const void* ptr,
int32_t length) {
static const uint8_t empty[] = {0};
auto on_found = [](int32_t memo_index) {};
auto on_not_found = [&](int32_t memo_index) {
dict_encoded_size_ += static_cast<int>(length + sizeof(uint32_t));
};
DCHECK(ptr != nullptr || length == 0);
ptr = (ptr != nullptr) ? ptr : empty;
int32_t memo_index;
PARQUET_THROW_NOT_OK(
memo_table_.GetOrInsert(ptr, length, on_found, on_not_found, &memo_index));
buffered_indices_.push_back(memo_index);
unencoded_byte_array_data_bytes_ += length;
}
template <>
inline void DictEncoderImpl<ByteArrayType>::Put(const ByteArray& val) {
return PutByteArray(val.ptr, static_cast<int32_t>(val.len));
}
template <>
inline void DictEncoderImpl<FLBAType>::Put(const FixedLenByteArray& v) {
static const uint8_t empty[] = {0};
auto on_found = [](int32_t memo_index) {};
auto on_not_found = [this](int32_t memo_index) { dict_encoded_size_ += type_length_; };
DCHECK(v.ptr != nullptr || type_length_ == 0);
const void* ptr = (v.ptr != nullptr) ? v.ptr : empty;
int32_t memo_index;
PARQUET_THROW_NOT_OK(
memo_table_.GetOrInsert(ptr, type_length_, on_found, on_not_found, &memo_index));
buffered_indices_.push_back(memo_index);
}
template <>
void DictEncoderImpl<Int96Type>::Put(const ::arrow::Array& values) {
ParquetException::NYI("Direct put to Int96");
}
template <>
void DictEncoderImpl<Int96Type>::PutDictionary(const ::arrow::Array& values) {
ParquetException::NYI("Direct put to Int96");
}
template <typename DType>
void DictEncoderImpl<DType>::Put(const ::arrow::Array& values) {
using ArrayType = typename ::arrow::CTypeTraits<typename DType::c_type>::ArrayType;
const auto& data = checked_cast<const ArrayType&>(values);
if (data.null_count() == 0) {
// no nulls, just dump the data
for (int64_t i = 0; i < data.length(); i++) {
Put(data.Value(i));
}
} else {
for (int64_t i = 0; i < data.length(); i++) {
if (data.IsValid(i)) {
Put(data.Value(i));
}
}
}
}
template <>
void DictEncoderImpl<FLBAType>::Put(const ::arrow::Array& values) {
AssertFixedSizeBinary(values, type_length_);
const auto& data = checked_cast<const ::arrow::FixedSizeBinaryArray&>(values);
if (data.null_count() == 0) {
// no nulls, just dump the data
for (int64_t i = 0; i < data.length(); i++) {
Put(FixedLenByteArray(data.Value(i)));
}
} else {
for (int64_t i = 0; i < data.length(); i++) {
if (data.IsValid(i)) {
Put(FixedLenByteArray(data.Value(i)));
}
}
}
}
template <>
void DictEncoderImpl<ByteArrayType>::Put(const ::arrow::Array& values) {
AssertVarLengthBinary(values);
if (::arrow::is_binary_like(values.type_id())) {
PutBinaryArray(checked_cast<const ::arrow::BinaryArray&>(values));
} else if (::arrow::is_large_binary_like(values.type_id())) {
PutBinaryArray(checked_cast<const ::arrow::LargeBinaryArray&>(values));
} else {
DCHECK(::arrow::is_binary_view_like(values.type_id()));
PutBinaryArray(checked_cast<const ::arrow::BinaryViewArray&>(values));
}
}
template <typename DType>
void AssertCanPutDictionary(DictEncoderImpl<DType>* encoder, const ::arrow::Array& dict) {
if (dict.null_count() > 0) {
throw ParquetException("Inserted dictionary cannot contain nulls");
}
if (encoder->num_entries() > 0) {
throw ParquetException("Can only call PutDictionary on an empty DictEncoder");
}
}
template <typename DType>
void DictEncoderImpl<DType>::PutDictionary(const ::arrow::Array& values) {
AssertCanPutDictionary(this, values);
using ArrayType = typename ::arrow::CTypeTraits<typename DType::c_type>::ArrayType;
const auto& data = checked_cast<const ArrayType&>(values);
dict_encoded_size_ += static_cast<int>(sizeof(typename DType::c_type) * data.length());
for (int64_t i = 0; i < data.length(); i++) {
int32_t unused_memo_index;
PARQUET_THROW_NOT_OK(memo_table_.GetOrInsert(data.Value(i), &unused_memo_index));
}
}
template <>
void DictEncoderImpl<FLBAType>::PutDictionary(const ::arrow::Array& values) {
AssertFixedSizeBinary(values, type_length_);
AssertCanPutDictionary(this, values);
const auto& data = checked_cast<const ::arrow::FixedSizeBinaryArray&>(values);
dict_encoded_size_ += static_cast<int>(type_length_ * data.length());
for (int64_t i = 0; i < data.length(); i++) {
int32_t unused_memo_index;
PARQUET_THROW_NOT_OK(
memo_table_.GetOrInsert(data.Value(i), type_length_, &unused_memo_index));
}
}
template <>
void DictEncoderImpl<ByteArrayType>::PutDictionary(const ::arrow::Array& values) {
AssertVarLengthBinary(values);
AssertCanPutDictionary(this, values);
if (::arrow::is_binary_like(values.type_id())) {
PutBinaryDictionaryArray(checked_cast<const ::arrow::BinaryArray&>(values));
} else if (::arrow::is_large_binary_like(values.type_id())) {
PutBinaryDictionaryArray(checked_cast<const ::arrow::LargeBinaryArray&>(values));
} else {
DCHECK(::arrow::is_binary_view_like(values.type_id()));
PutBinaryDictionaryArray(checked_cast<const ::arrow::BinaryViewArray&>(values));
}
}
// ----------------------------------------------------------------------
// BYTE_STREAM_SPLIT encoder
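//
// Illustrative sketch of the transposition: for two 4-byte values with bytes
// {A0 A1 A2 A3} and {B0 B1 B2 B3}, BYTE_STREAM_SPLIT emits one stream per byte
// position: {A0 B0 | A1 B1 | A2 B2 | A3 B3}. This tends to make floating point
// data more compressible by later codecs.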
// Common base class for all types
template <typename DType>
class ByteStreamSplitEncoderBase : public EncoderImpl,
virtual public TypedEncoder<DType> {
public:
using T = typename DType::c_type;
using TypedEncoder<DType>::Put;
ByteStreamSplitEncoderBase(const ColumnDescriptor* descr, int byte_width,
::arrow::MemoryPool* pool)
: EncoderImpl(descr, Encoding::BYTE_STREAM_SPLIT, pool),
sink_{pool},
byte_width_(byte_width),
num_values_in_buffer_{0} {}
int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
std::shared_ptr<Buffer> FlushValues() override {
if (byte_width_ == 1) {
// Special-cased fast path
PARQUET_ASSIGN_OR_THROW(auto buf, sink_.Finish());
return buf;
}
auto output_buffer = AllocateBuffer(this->memory_pool(), EstimatedDataEncodedSize());
uint8_t* output_buffer_raw = output_buffer->mutable_data();
const uint8_t* raw_values = sink_.data();
::arrow::util::internal::ByteStreamSplitEncode(
raw_values, /*width=*/byte_width_, num_values_in_buffer_, output_buffer_raw);
sink_.Reset();
num_values_in_buffer_ = 0;
return output_buffer;
}
void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
int64_t valid_bits_offset) override {
if (valid_bits != NULLPTR) {
PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values * sizeof(T),
this->memory_pool()));
T* data = buffer->template mutable_data_as<T>();
int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
src, num_values, valid_bits, valid_bits_offset, data);
Put(data, num_valid_values);
} else {
Put(src, num_values);
}
}
protected:
::arrow::BufferBuilder sink_;
// Required because type_length_ is only filled in for FLBA
const int byte_width_;
int64_t num_values_in_buffer_;
};
// BYTE_STREAM_SPLIT encoder implementation for FLOAT, DOUBLE, INT32, INT64
template <typename DType>
class ByteStreamSplitEncoder : public ByteStreamSplitEncoderBase<DType> {
public:
using T = typename DType::c_type;
using ArrowType = typename EncodingTraits<DType>::ArrowType;
ByteStreamSplitEncoder(const ColumnDescriptor* descr,
::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
: ByteStreamSplitEncoderBase<DType>(descr,
/*byte_width=*/static_cast<int>(sizeof(T)),
pool) {}
// Inherit Put(const std::vector<T>&...)
using TypedEncoder<DType>::Put;
void Put(const T* buffer, int num_values) override {
if (num_values > 0) {
PARQUET_THROW_NOT_OK(
this->sink_.Append(reinterpret_cast<const uint8_t*>(buffer),
num_values * static_cast<int64_t>(sizeof(T))));
this->num_values_in_buffer_ += num_values;
}
}
void Put(const ::arrow::Array& values) override {
if (values.type_id() != ArrowType::type_id) {
throw ParquetException(std::string() + "direct put from " +
values.type()->ToString() + " not supported");
}
const auto& data = *values.data();
this->PutSpaced(data.GetValues<typename ArrowType::c_type>(1),
static_cast<int>(data.length), data.GetValues<uint8_t>(0, 0),
data.offset);
}
};
// BYTE_STREAM_SPLIT encoder implementation for FLBA
template <>
class ByteStreamSplitEncoder<FLBAType> : public ByteStreamSplitEncoderBase<FLBAType> {
public:
using DType = FLBAType;
using T = FixedLenByteArray;
using ArrowType = ::arrow::FixedSizeBinaryArray;
ByteStreamSplitEncoder(const ColumnDescriptor* descr,
::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
: ByteStreamSplitEncoderBase<DType>(descr,
/*byte_width=*/descr->type_length(), pool) {}
// Inherit Put(const std::vector<T>&...)
using TypedEncoder<DType>::Put;
void Put(const T* buffer, int num_values) override {
if (byte_width_ > 0) {
const int64_t total_bytes = static_cast<int64_t>(num_values) * byte_width_;
PARQUET_THROW_NOT_OK(sink_.Reserve(total_bytes));
for (int i = 0; i < num_values; ++i) {
// Write the result to the output stream
DCHECK(buffer[i].ptr != nullptr) << "Value ptr cannot be NULL";
sink_.UnsafeAppend(buffer[i].ptr, byte_width_);
}
}
this->num_values_in_buffer_ += num_values;
}
void Put(const ::arrow::Array& values) override {
AssertFixedSizeBinary(values, byte_width_);
const auto& data = checked_cast<const ::arrow::FixedSizeBinaryArray&>(values);
if (data.null_count() == 0) {
// no nulls, just buffer the data
PARQUET_THROW_NOT_OK(sink_.Append(data.raw_values(), data.length() * byte_width_));
this->num_values_in_buffer_ += data.length();
} else {
const int64_t num_values = data.length() - data.null_count();
const int64_t total_bytes = num_values * byte_width_;
PARQUET_THROW_NOT_OK(sink_.Reserve(total_bytes));
// TODO use VisitSetBitRunsVoid
for (int64_t i = 0; i < data.length(); i++) {
if (data.IsValid(i)) {
sink_.UnsafeAppend(data.Value(i), byte_width_);
}
}
this->num_values_in_buffer_ += num_values;
}
}
};
// ----------------------------------------------------------------------
// DELTA_BINARY_PACKED encoder
/// DeltaBitPackEncoder is an encoder for the DELTA_BINARY_PACKED format
/// as per the Parquet spec. See:
/// https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-encoding-delta_binary_packed--5
///
/// The encoding consists of a header followed by blocks of bit-packed,
/// delta-encoded values.
///
/// Format
/// [header] [block 1] [block 2] ... [block N]
///
/// Header
/// [block size] [number of mini blocks per block] [total value count] [first value]
///
/// Block
/// [min delta] [list of bitwidths of the mini blocks] [miniblocks]
///
/// Sets aside bytes at the start of the internal buffer where the header will be
/// written, and only writes the header when FlushValues() is called, just before
/// returning the buffer.
///
/// To encode a block, we will:
///
/// 1. Compute the differences between consecutive elements. For the first element in the
/// block, use the last element in the previous block or, in the case of the first block,
/// use the first value of the whole sequence, stored in the header.
///
/// 2. Compute the frame of reference (the minimum of the deltas in the block). Subtract
/// this min delta from all deltas in the block. This guarantees that all values are
/// non-negative.
///
/// 3. Encode the frame of reference (min delta) as a zigzag ULEB128 int followed by the
/// bit widths of the mini blocks and the delta values (minus the min delta) bit packed
/// per mini block.
///
/// Supports only INT32 and INT64.
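///
/// Worked example (from the Parquet spec): for the values 7, 5, 3, 1, 2, 3, 4, 5
/// the deltas are -2, -2, -2, 1, 1, 1, 1; the min delta is -2; subtracting it
/// gives the adjusted deltas 0, 0, 0, 3, 3, 3, 3, which bit pack at a width of 2.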
template <typename DType>
class DeltaBitPackEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
// Maximum possible header size
static constexpr uint32_t kMaxPageHeaderWriterSize = 32;
static constexpr uint32_t kValuesPerBlock =
std::is_same_v<int32_t, typename DType::c_type> ? 128 : 256;
static constexpr uint32_t kMiniBlocksPerBlock = 4;
public:
using T = typename DType::c_type;
using UT = std::make_unsigned_t<T>;
using TypedEncoder<DType>::Put;
explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
const uint32_t values_per_block = kValuesPerBlock,
const uint32_t mini_blocks_per_block = kMiniBlocksPerBlock)
: EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
values_per_block_(values_per_block),
mini_blocks_per_block_(mini_blocks_per_block),
values_per_mini_block_(values_per_block / mini_blocks_per_block),
deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
bits_buffer_(
AllocateBuffer(pool, (kMiniBlocksPerBlock + values_per_block) * sizeof(T))),
sink_(pool),
bit_writer_(bits_buffer_->mutable_data(),
static_cast<int>(bits_buffer_->size())) {
if (values_per_block_ % 128 != 0) {
throw ParquetException(
"the number of values in a block must be multiple of 128, but it's " +
std::to_string(values_per_block_));
}
if (values_per_mini_block_ % 32 != 0) {
throw ParquetException(
"the number of values in a miniblock must be multiple of 32, but it's " +
std::to_string(values_per_mini_block_));
}
if (values_per_block % mini_blocks_per_block != 0) {
throw ParquetException(
"the number of values per block % number of miniblocks per block must be 0, "
"but it's " +
std::to_string(values_per_block % mini_blocks_per_block));
}
// Reserve enough space at the beginning of the buffer for the largest possible header.
PARQUET_THROW_NOT_OK(sink_.Advance(kMaxPageHeaderWriterSize));
}
std::shared_ptr<Buffer> FlushValues() override;
int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
void Put(const ::arrow::Array& values) override;
void Put(const T* buffer, int num_values) override;
void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
int64_t valid_bits_offset) override;
void FlushBlock();
private:
const uint32_t values_per_block_;
const uint32_t mini_blocks_per_block_;
const uint32_t values_per_mini_block_;
uint32_t values_current_block_{0};
uint32_t total_value_count_{0};
T first_value_{0};
T current_value_{0};
ArrowPoolVector<T> deltas_;
std::shared_ptr<ResizableBuffer> bits_buffer_;
::arrow::BufferBuilder sink_;
::arrow::bit_util::BitWriter bit_writer_;
};
template <typename DType>
void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
if (num_values == 0) {
return;
}
int idx = 0;
if (total_value_count_ == 0) {
current_value_ = src[0];
first_value_ = current_value_;
idx = 1;
}
total_value_count_ += num_values;
while (idx < num_values) {
T value = src[idx];
// Calculate deltas. Possible overflow is handled by the use of unsigned
// integers, making subtraction well-defined and correct even in case of
// overflow. Encoded integers will wrap back around on decoding.
// See http://en.wikipedia.org/wiki/Modular_arithmetic#Integers_modulo_n
deltas_[values_current_block_] = SafeSignedSubtract(value, current_value_);
current_value_ = value;
idx++;
values_current_block_++;
if (values_current_block_ == values_per_block_) {
FlushBlock();
}
}
}
template <typename DType>
void DeltaBitPackEncoder<DType>::FlushBlock() {
if (values_current_block_ == 0) {
return;
}
// Calculate the frame of reference for this block. This value will be subtracted
// from all deltas to guarantee all adjusted deltas are non-negative for encoding.
const T min_delta =
*std::min_element(deltas_.begin(), deltas_.begin() + values_current_block_);
bit_writer_.PutZigZagVlqInt(min_delta);
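// ZigZag encoding maps small signed values to small unsigned ones:
// 0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3, ...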
// Call to GetNextBytePtr reserves mini_blocks_per_block_ bytes of space to write
// bit widths of miniblocks as they become known during the encoding.
uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_);
DCHECK(bit_width_data != nullptr);
const uint32_t num_miniblocks =
static_cast<uint32_t>(std::ceil(static_cast<double>(values_current_block_) /
static_cast<double>(values_per_mini_block_)));
for (uint32_t i = 0; i < num_miniblocks; i++) {
const uint32_t values_current_mini_block =
std::min(values_per_mini_block_, values_current_block_);
const uint32_t start = i * values_per_mini_block_;
const T max_delta = *std::max_element(
deltas_.begin() + start, deltas_.begin() + start + values_current_mini_block);
// The minimum number of bits required to write any of the adjusted deltas in
// this mini block. See overflow comment above.
const auto bit_width = bit_width_data[i] = bit_util::NumRequiredBits(
static_cast<UT>(max_delta) - static_cast<UT>(min_delta));
for (uint32_t j = start; j < start + values_current_mini_block; j++) {
// Convert delta to frame of reference. See overflow comment above.
const UT value = static_cast<UT>(deltas_[j]) - static_cast<UT>(min_delta);
bit_writer_.PutValue(value, bit_width);
}
// If there are not enough values to fill the last mini block, we pad the mini block
// with zeroes so that its length is the number of values in a full mini block
// multiplied by the bit width.
for (uint32_t j = values_current_mini_block; j < values_per_mini_block_; j++) {
bit_writer_.PutValue(0, bit_width);
}
values_current_block_ -= values_current_mini_block;
}
// If, in the last block, fewer than <number of miniblocks in a block> miniblocks are
// needed to store the values, the bytes storing the bit widths of the unneeded
// miniblocks are still present. Their value should be zero, but readers must accept
// arbitrary values as well.
for (uint32_t i = num_miniblocks; i < mini_blocks_per_block_; i++) {
bit_width_data[i] = 0;
}
DCHECK_EQ(values_current_block_, 0);
bit_writer_.Flush();
PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written()));
bit_writer_.Clear();
}
template <typename DType>
std::shared_ptr<Buffer> DeltaBitPackEncoder<DType>::FlushValues() {
if (values_current_block_ > 0) {
FlushBlock();
}
PARQUET_ASSIGN_OR_THROW(auto buffer, sink_.Finish(/*shrink_to_fit=*/true));
uint8_t header_buffer[kMaxPageHeaderWriterSize] = {};
bit_util::BitWriter header_writer(header_buffer, sizeof(header_buffer));
if (!header_writer.PutVlqInt(values_per_block_) ||
!header_writer.PutVlqInt(mini_blocks_per_block_) ||
!header_writer.PutVlqInt(total_value_count_) ||
!header_writer.PutZigZagVlqInt(static_cast<T>(first_value_))) {
throw ParquetException("header writing error");
}
header_writer.Flush();
// We reserved enough space at the beginning of the buffer for the largest possible
// header, and the block data was written immediately after it. We now write the
// header so that it ends exactly where the block data begins.
const size_t offset_bytes = kMaxPageHeaderWriterSize - header_writer.bytes_written();
std::memcpy(buffer->mutable_data() + offset_bytes, header_buffer,
header_writer.bytes_written());
// Reset counter of cached values
total_value_count_ = 0;
// Reserve enough space at the beginning of the buffer for the largest possible header.
PARQUET_THROW_NOT_OK(sink_.Advance(kMaxPageHeaderWriterSize));
// Excess bytes at the beginning are sliced off and ignored.
return SliceBuffer(std::move(buffer), offset_bytes);
}
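// Illustrative sketch of the flushed buffer layout (e.g. with the defaults
// values_per_block = 128 for INT32 and mini_blocks_per_block = 4):
//   [ULEB128 values_per_block][ULEB128 mini_blocks_per_block]
//   [ULEB128 total_value_count][zigzag ULEB128 first_value][block 1]...[block N]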
template <>
void DeltaBitPackEncoder<Int32Type>::Put(const ::arrow::Array& values) {
const ::arrow::ArrayData& data = *values.data();
if (values.type_id() != ::arrow::Type::INT32) {
throw ParquetException("Expected Int32TArray, got ", values.type()->ToString());
}
if (data.length > std::numeric_limits<int32_t>::max()) {
throw ParquetException("Array cannot be longer than ",
std::numeric_limits<int32_t>::max());
}
if (values.null_count() == 0) {
Put(data.GetValues<int32_t>(1), static_cast<int>(data.length));
} else {
PutSpaced(data.GetValues<int32_t>(1), static_cast<int>(data.length),
data.GetValues<uint8_t>(0, 0), data.offset);
}
}
template <>
void DeltaBitPackEncoder<Int64Type>::Put(const ::arrow::Array& values) {
const ::arrow::ArrayData& data = *values.data();
if (values.type_id() != ::arrow::Type::INT64) {
throw ParquetException("Expected Int64TArray, got ", values.type()->ToString());
}
if (data.length > std::numeric_limits<int32_t>::max()) {
throw ParquetException("Array cannot be longer than ",
std::numeric_limits<int32_t>::max());
}
if (values.null_count() == 0) {
Put(data.GetValues<int64_t>(1), static_cast<int>(data.length));
} else {
PutSpaced(data.GetValues<int64_t>(1), static_cast<int>(data.length),
data.GetValues<uint8_t>(0, 0), data.offset);
}
}
template <typename DType>
void DeltaBitPackEncoder<DType>::PutSpaced(const T* src, int num_values,
const uint8_t* valid_bits,
int64_t valid_bits_offset) {
if (valid_bits != NULLPTR) {
PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values * sizeof(T),
this->memory_pool()));
T* data = buffer->template mutable_data_as<T>();
int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
src, num_values, valid_bits, valid_bits_offset, data);
Put(data, num_valid_values);
} else {
Put(src, num_values);
}
}
// ----------------------------------------------------------------------
// DELTA_LENGTH_BYTE_ARRAY encoder
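// Illustrative sketch of the layout: encoding "Hello", "World", "Foobar" stores
// the lengths 5, 5, 6 with DELTA_BINARY_PACKED, followed by the concatenated
// bytes "HelloWorldFoobar".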
class DeltaLengthByteArrayEncoder : public EncoderImpl,
virtual public TypedEncoder<ByteArrayType> {
public:
explicit DeltaLengthByteArrayEncoder(const ColumnDescriptor* descr, MemoryPool* pool)
: EncoderImpl(descr, Encoding::DELTA_LENGTH_BYTE_ARRAY, pool),
sink_(pool),
length_encoder_(nullptr, pool) {}
std::shared_ptr<Buffer> FlushValues() override;
int64_t EstimatedDataEncodedSize() override {
return sink_.length() + length_encoder_.EstimatedDataEncodedSize();
}
using TypedEncoder<ByteArrayType>::Put;
void Put(const ::arrow::Array& values) override;
void Put(const T* buffer, int num_values) override;
void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
int64_t valid_bits_offset) override;
protected:
template <typename ArrayType>
void PutBinaryArray(const ArrayType& array) {
PARQUET_THROW_NOT_OK(::arrow::VisitArraySpanInline<typename ArrayType::TypeClass>(
*array.data(),
[&](::std::string_view view) {
if (ARROW_PREDICT_FALSE(view.size() > kMaxByteArraySize)) {
return Status::Invalid(
"Parquet cannot store strings with size 2GB or more, got: ", view.size());
}
if (ARROW_PREDICT_FALSE(
view.size() + sink_.length() >
static_cast<size_t>(std::numeric_limits<int32_t>::max()))) {
return Status::Invalid("excess expansion in DELTA_LENGTH_BYTE_ARRAY");
}
length_encoder_.Put({static_cast<int32_t>(view.length())}, 1);
PARQUET_THROW_NOT_OK(sink_.Append(view.data(), view.length()));
unencoded_byte_array_data_bytes_ += view.size();
return Status::OK();
},
[]() { return Status::OK(); }));
}
::arrow::BufferBuilder sink_;
DeltaBitPackEncoder<Int32Type> length_encoder_;
};
void DeltaLengthByteArrayEncoder::Put(const ::arrow::Array& values) {
AssertVarLengthBinary(values);
if (::arrow::is_binary_like(values.type_id())) {
PutBinaryArray(checked_cast<const ::arrow::BinaryArray&>(values));
} else if (::arrow::is_large_binary_like(values.type_id())) {
PutBinaryArray(checked_cast<const ::arrow::LargeBinaryArray&>(values));
} else if (::arrow::is_binary_view_like(values.type_id())) {
PutBinaryArray(checked_cast<const ::arrow::BinaryViewArray&>(values));
} else {
throw ParquetException("Only binary-like data supported, got " +
values.type()->ToString());
}
}
void DeltaLengthByteArrayEncoder::Put(const T* src, int num_values) {
if (num_values == 0) {
return;
}
constexpr int kBatchSize = 256;
std::array<int32_t, kBatchSize> lengths;
uint32_t total_increment_size = 0;
for (int idx = 0; idx < num_values; idx += kBatchSize) {
const int batch_size = std::min(kBatchSize, num_values - idx);
for (int j = 0; j < batch_size; ++j) {
const int32_t len = src[idx + j].len;
if (ARROW_PREDICT_FALSE(
AddWithOverflow(total_increment_size, len, &total_increment_size))) {
throw ParquetException("excess expansion in DELTA_LENGTH_BYTE_ARRAY");
}
lengths[j] = len;
}
length_encoder_.Put(lengths.data(), batch_size);
}
if (sink_.length() + total_increment_size > std::numeric_limits<int32_t>::max()) {
throw ParquetException("excess expansion in DELTA_LENGTH_BYTE_ARRAY");
}
PARQUET_THROW_NOT_OK(sink_.Reserve(total_increment_size));
for (int idx = 0; idx < num_values; idx++) {
sink_.UnsafeAppend(src[idx].ptr, src[idx].len);
}
unencoded_byte_array_data_bytes_ += total_increment_size;
}
void DeltaLengthByteArrayEncoder::PutSpaced(const T* src, int num_values,
const uint8_t* valid_bits,
int64_t valid_bits_offset) {
if (valid_bits != NULLPTR) {
PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values * sizeof(T),
this->memory_pool()));
T* data = buffer->template mutable_data_as<T>();
int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
src, num_values, valid_bits, valid_bits_offset, data);
Put(data, num_valid_values);
} else {
Put(src, num_values);
}
}
std::shared_ptr<Buffer> DeltaLengthByteArrayEncoder::FlushValues() {
std::shared_ptr<Buffer> encoded_lengths = length_encoder_.FlushValues();
std::shared_ptr<Buffer> data;
PARQUET_THROW_NOT_OK(sink_.Finish(&data));
sink_.Reset();
PARQUET_THROW_NOT_OK(sink_.Resize(encoded_lengths->size() + data->size()));
PARQUET_THROW_NOT_OK(sink_.Append(encoded_lengths->data(), encoded_lengths->size()));
PARQUET_THROW_NOT_OK(sink_.Append(data->data(), data->size()));
std::shared_ptr<Buffer> buffer;
PARQUET_THROW_NOT_OK(sink_.Finish(&buffer, true));
return buffer;
}
// ----------------------------------------------------------------------
// DELTA_BYTE_ARRAY encoder
/// Delta Byte Array encoding, also known as incremental encoding or front compression:
/// for each element in a sequence of strings, store the length of the prefix shared
/// with the previous entry, followed by the remaining suffix.
///
/// This is stored as a sequence of delta-encoded prefix lengths (DELTA_BINARY_PACKED),
/// followed by the suffixes encoded as delta length byte arrays
/// (DELTA_LENGTH_BYTE_ARRAY).
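///
/// Illustrative sketch: encoding "apple", "applet", "bear" stores the shared
/// prefix lengths 0, 5, 0 (DELTA_BINARY_PACKED) followed by the suffixes
/// "apple", "t", "bear" (DELTA_LENGTH_BYTE_ARRAY).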
template <typename DType>
class DeltaByteArrayEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
static constexpr std::string_view kEmpty = "";
public:
using T = typename DType::c_type;
explicit DeltaByteArrayEncoder(const ColumnDescriptor* descr,
MemoryPool* pool = ::arrow::default_memory_pool())
: EncoderImpl(descr, Encoding::DELTA_BYTE_ARRAY, pool),
sink_(pool),
prefix_length_encoder_(/*descr=*/nullptr, pool),
suffix_encoder_(descr, pool),
last_value_(""),
empty_(static_cast<uint32_t>(kEmpty.size()),
reinterpret_cast<const uint8_t*>(kEmpty.data())) {}
std::shared_ptr<Buffer> FlushValues() override;
int64_t EstimatedDataEncodedSize() override {
return prefix_length_encoder_.EstimatedDataEncodedSize() +
suffix_encoder_.EstimatedDataEncodedSize();
}
using TypedEncoder<DType>::Put;
void Put(const ::arrow::Array& values) override;
void Put(const T* buffer, int num_values) override;
void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
int64_t valid_bits_offset) override {
if (valid_bits != nullptr) {
if (buffer_ == nullptr) {
PARQUET_ASSIGN_OR_THROW(buffer_,
::arrow::AllocateResizableBuffer(num_values * sizeof(T),
this->memory_pool()));
} else {
PARQUET_THROW_NOT_OK(buffer_->Resize(num_values * sizeof(T), false));
}
T* data = buffer_->mutable_data_as<T>();
int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
src, num_values, valid_bits, valid_bits_offset, data);
Put(data, num_valid_values);
} else {
Put(src, num_values);
}
}
protected:
template <typename VisitorType>
void PutInternal(const T* src, int num_values, const VisitorType visitor) {
if (num_values == 0) {
return;
}
std::string_view last_value_view = last_value_;
constexpr int kBatchSize = 256;
std::array<int32_t, kBatchSize> prefix_lengths;
std::array<ByteArray, kBatchSize> suffixes;
for (int i = 0; i < num_values; i += kBatchSize) {
const int batch_size = std::min(kBatchSize, num_values - i);
for (int j = 0; j < batch_size; ++j) {
const int idx = i + j;
const auto view = visitor[idx];
const auto len = static_cast<uint32_t>(view.length());
uint32_t common_prefix_length = 0;
const uint32_t maximum_common_prefix_length =
std::min(len, static_cast<uint32_t>(last_value_view.length()));
while (common_prefix_length < maximum_common_prefix_length) {
if (last_value_view[common_prefix_length] != view[common_prefix_length]) {
break;
}
common_prefix_length++;
}
last_value_view = view;
prefix_lengths[j] = common_prefix_length;
const uint32_t suffix_length = len - common_prefix_length;
const uint8_t* suffix_ptr = src[idx].ptr + common_prefix_length;
// Convert to ByteArray, so it can be passed to the suffix_encoder_.
const ByteArray suffix(suffix_length, suffix_ptr);
suffixes[j] = suffix;
unencoded_byte_array_data_bytes_ += len;
}
suffix_encoder_.Put(suffixes.data(), batch_size);
prefix_length_encoder_.Put(prefix_lengths.data(), batch_size);
}
last_value_ = last_value_view;
}
template <typename ArrayType>
void PutBinaryArray(const ArrayType& array) {
auto previous_len = static_cast<uint32_t>(last_value_.length());
std::string_view last_value_view = last_value_;
PARQUET_THROW_NOT_OK(::arrow::VisitArraySpanInline<typename ArrayType::TypeClass>(
*array.data(),
[&](::std::string_view view) {
if (ARROW_PREDICT_FALSE(view.size() >= kMaxByteArraySize)) {
return Status::Invalid(
"Parquet cannot store strings with size 2GB or more, got: ", view.size());
}
const ByteArray src{view};
uint32_t common_prefix_length = 0;
const uint32_t len = src.len;
const uint32_t maximum_common_prefix_length = std::min(previous_len, len);
while (common_prefix_length < maximum_common_prefix_length) {
if (last_value_view[common_prefix_length] != view[common_prefix_length]) {
break;
}
common_prefix_length++;
}
previous_len = len;
prefix_length_encoder_.Put({static_cast<int32_t>(common_prefix_length)}, 1);
last_value_view = view;
const auto suffix_length = static_cast<uint32_t>(len - common_prefix_length);
if (suffix_length == 0) {
suffix_encoder_.Put(&empty_, 1);
return Status::OK();
}
const uint8_t* suffix_ptr = src.ptr + common_prefix_length;
// Convert to ByteArray, so it can be passed to the suffix_encoder_.
const ByteArray suffix(suffix_length, suffix_ptr);
suffix_encoder_.Put(&suffix, 1);
unencoded_byte_array_data_bytes_ += len;
return Status::OK();
},
[]() { return Status::OK(); }));
last_value_ = last_value_view;
}
::arrow::BufferBuilder sink_;
DeltaBitPackEncoder<Int32Type> prefix_length_encoder_;
DeltaLengthByteArrayEncoder suffix_encoder_;
std::string last_value_;
const ByteArray empty_;
std::unique_ptr<ResizableBuffer> buffer_;
};
struct ByteArrayVisitor {
const ByteArray* src;
std::string_view operator[](int i) const {
if (ARROW_PREDICT_FALSE(src[i].len >= kMaxByteArraySize)) {
throw ParquetException("Parquet cannot store strings with size 2GB or more, got: ",
src[i].len);
}
return std::string_view{src[i]};
}
uint32_t len(int i) const { return src[i].len; }
};
struct FLBAVisitor {
const FLBA* src;
const uint32_t type_length;
std::string_view operator[](int i) const {
return std::string_view{reinterpret_cast<const char*>(src[i].ptr), type_length};
}
uint32_t len(int i) const { return type_length; }
};
template <>
void DeltaByteArrayEncoder<ByteArrayType>::Put(const ByteArray* src, int num_values) {
auto visitor = ByteArrayVisitor{src};
PutInternal<ByteArrayVisitor>(src, num_values, visitor);
}
template <>
void DeltaByteArrayEncoder<FLBAType>::Put(const FLBA* src, int num_values) {
auto visitor = FLBAVisitor{src, static_cast<uint32_t>(descr_->type_length())};
PutInternal<FLBAVisitor>(src, num_values, visitor);
}
template <typename DType>
void DeltaByteArrayEncoder<DType>::Put(const ::arrow::Array& values) {
if (::arrow::is_binary_like(values.type_id())) {
PutBinaryArray(checked_cast<const ::arrow::BinaryArray&>(values));
} else if (::arrow::is_large_binary_like(values.type_id())) {
PutBinaryArray(checked_cast<const ::arrow::LargeBinaryArray&>(values));
} else if (::arrow::is_binary_view_like(values.type_id())) {
PutBinaryArray(checked_cast<const ::arrow::BinaryViewArray&>(values));
} else if (::arrow::is_fixed_size_binary(values.type_id())) {
PutBinaryArray(checked_cast<const ::arrow::FixedSizeBinaryArray&>(values));
} else {
throw ParquetException("Only binary-like data supported");
}
}
template <typename DType>
std::shared_ptr<Buffer> DeltaByteArrayEncoder<DType>::FlushValues() {
PARQUET_THROW_NOT_OK(sink_.Resize(EstimatedDataEncodedSize(), false));
std::shared_ptr<Buffer> prefix_lengths = prefix_length_encoder_.FlushValues();
PARQUET_THROW_NOT_OK(sink_.Append(prefix_lengths->data(), prefix_lengths->size()));
std::shared_ptr<Buffer> suffixes = suffix_encoder_.FlushValues();
PARQUET_THROW_NOT_OK(sink_.Append(suffixes->data(), suffixes->size()));
std::shared_ptr<Buffer> buffer;
PARQUET_THROW_NOT_OK(sink_.Finish(&buffer, true));
last_value_.clear();
return buffer;
}
// ----------------------------------------------------------------------
// RLE encoder for BOOLEAN
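// Illustrative sketch of the output layout: a 4-byte little-endian length
// prefix followed by the RLE/bit-packed hybrid runs. Conceptually, 200
// consecutive true values followed by 1000 false values become two RLE runs,
// (200 x 1) and (1000 x 0).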
class RleBooleanEncoder final : public EncoderImpl, virtual public BooleanEncoder {
public:
explicit RleBooleanEncoder(const ColumnDescriptor* descr, ::arrow::MemoryPool* pool)
: EncoderImpl(descr, Encoding::RLE, pool),
buffered_append_values_(::arrow::stl::allocator<T>(pool)) {}
int64_t EstimatedDataEncodedSize() override {
return kRleLengthInBytes + MaxRleBufferSize();
}
std::shared_ptr<Buffer> FlushValues() override;
void Put(const T* buffer, int num_values) override;
void Put(const ::arrow::Array& values) override {
if (values.type_id() != ::arrow::Type::BOOL) {
throw ParquetException("RleBooleanEncoder expects BooleanArray, got ",
values.type()->ToString());
}
const auto& boolean_array = checked_cast<const ::arrow::BooleanArray&>(values);
if (values.null_count() == 0) {
for (int64_t i = 0; i < boolean_array.length(); ++i) {
// null_count == 0, so calling Value() directly is ok.
buffered_append_values_.push_back(boolean_array.Value(i));
}
} else {
PARQUET_THROW_NOT_OK(::arrow::VisitArraySpanInline<::arrow::BooleanType>(
*boolean_array.data(),
[&](bool value) {
buffered_append_values_.push_back(value);
return Status::OK();
},
[]() { return Status::OK(); }));
}
}
void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
int64_t valid_bits_offset) override {
if (valid_bits != NULLPTR) {
PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values * sizeof(T),
this->memory_pool()));
T* data = buffer->mutable_data_as<T>();
int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
src, num_values, valid_bits, valid_bits_offset, data);
Put(data, num_valid_values);
} else {
Put(src, num_values);
}
}
void Put(const std::vector<bool>& src, int num_values) override;
protected:
template <typename SequenceType>
void PutImpl(const SequenceType& src, int num_values);
int64_t MaxRleBufferSize() const noexcept {
return RlePreserveBufferSize(static_cast<int64_t>(buffered_append_values_.size()),
kBitWidth);
}
constexpr static int32_t kBitWidth = 1;
/// The RLE data is prefixed with its length, stored as 4 bytes in little-endian.
constexpr static int32_t kRleLengthInBytes = 4;
// std::vector<bool> in C++ is tricky because it is specialized as a bitmap.
// RleBooleanEncoder only appends values to it and then dumps them into a
// Buffer, so using it here is ok.
ArrowPoolVector<bool> buffered_append_values_;
};
void RleBooleanEncoder::Put(const bool* src, int num_values) { PutImpl(src, num_values); }
void RleBooleanEncoder::Put(const std::vector<bool>& src, int num_values) {
PutImpl(src, num_values);
}
template <typename SequenceType>
void RleBooleanEncoder::PutImpl(const SequenceType& src, int num_values) {
for (int i = 0; i < num_values; ++i) {
buffered_append_values_.push_back(src[i]);
}
}
std::shared_ptr<Buffer> RleBooleanEncoder::FlushValues() {
int64_t rle_buffer_size_max = MaxRleBufferSize();
if (rle_buffer_size_max > std::numeric_limits<int>::max()) {
std::stringstream ss;
ss << "Buffer size for RleBooleanEncoder (" << rle_buffer_size_max
<< ") exceeds maximum int value";
throw ParquetException(ss.str());
}
std::shared_ptr<ResizableBuffer> buffer =
AllocateBuffer(this->pool_, rle_buffer_size_max + kRleLengthInBytes);
::arrow::util::RleBitPackedEncoder encoder(buffer->mutable_data() + kRleLengthInBytes,
static_cast<int>(rle_buffer_size_max),
/*bit_width*/ kBitWidth);
for (bool value : buffered_append_values_) {
encoder.Put(value ? 1 : 0);
}
encoder.Flush();
::arrow::util::SafeStore(buffer->mutable_data(),
::arrow::bit_util::ToLittleEndian(encoder.len()));
PARQUET_THROW_NOT_OK(buffer->Resize(kRleLengthInBytes + encoder.len()));
buffered_append_values_.clear();
return buffer;
}
} // namespace
// ----------------------------------------------------------------------
// Factory function
std::unique_ptr<Encoder> MakeEncoder(Type::type type_num, Encoding::type encoding,
bool use_dictionary, const ColumnDescriptor* descr,
MemoryPool* pool) {
if (use_dictionary) {
switch (type_num) {
case Type::INT32:
return std::make_unique<DictEncoderImpl<Int32Type>>(descr, pool);
case Type::INT64:
return std::make_unique<DictEncoderImpl<Int64Type>>(descr, pool);
case Type::INT96:
return std::make_unique<DictEncoderImpl<Int96Type>>(descr, pool);
case Type::FLOAT:
return std::make_unique<DictEncoderImpl<FloatType>>(descr, pool);
case Type::DOUBLE:
return std::make_unique<DictEncoderImpl<DoubleType>>(descr, pool);
case Type::BYTE_ARRAY:
return std::make_unique<DictEncoderImpl<ByteArrayType>>(descr, pool);
case Type::FIXED_LEN_BYTE_ARRAY:
return std::make_unique<DictEncoderImpl<FLBAType>>(descr, pool);
default:
DCHECK(false) << "Encoder not implemented";
break;
}
} else if (encoding == Encoding::PLAIN) {
switch (type_num) {
case Type::BOOLEAN:
return std::make_unique<PlainEncoder<BooleanType>>(descr, pool);
case Type::INT32:
return std::make_unique<PlainEncoder<Int32Type>>(descr, pool);
case Type::INT64:
return std::make_unique<PlainEncoder<Int64Type>>(descr, pool);
case Type::INT96:
return std::make_unique<PlainEncoder<Int96Type>>(descr, pool);
case Type::FLOAT:
return std::make_unique<PlainEncoder<FloatType>>(descr, pool);
case Type::DOUBLE:
return std::make_unique<PlainEncoder<DoubleType>>(descr, pool);
case Type::BYTE_ARRAY:
return std::make_unique<PlainEncoder<ByteArrayType>>(descr, pool);
case Type::FIXED_LEN_BYTE_ARRAY:
return std::make_unique<PlainEncoder<FLBAType>>(descr, pool);
default:
DCHECK(false) << "Encoder not implemented";
break;
}
} else if (encoding == Encoding::BYTE_STREAM_SPLIT) {
switch (type_num) {
case Type::INT32:
return std::make_unique<ByteStreamSplitEncoder<Int32Type>>(descr, pool);
case Type::INT64:
return std::make_unique<ByteStreamSplitEncoder<Int64Type>>(descr, pool);
case Type::FLOAT:
return std::make_unique<ByteStreamSplitEncoder<FloatType>>(descr, pool);
case Type::DOUBLE:
return std::make_unique<ByteStreamSplitEncoder<DoubleType>>(descr, pool);
case Type::FIXED_LEN_BYTE_ARRAY:
return std::make_unique<ByteStreamSplitEncoder<FLBAType>>(descr, pool);
default:
throw ParquetException(
"BYTE_STREAM_SPLIT only supports FLOAT, DOUBLE, INT32, INT64 "
"and FIXED_LEN_BYTE_ARRAY");
}
} else if (encoding == Encoding::DELTA_BINARY_PACKED) {
switch (type_num) {
case Type::INT32:
return std::make_unique<DeltaBitPackEncoder<Int32Type>>(descr, pool);
case Type::INT64:
return std::make_unique<DeltaBitPackEncoder<Int64Type>>(descr, pool);
default:
throw ParquetException(
"DELTA_BINARY_PACKED encoder only supports INT32 and INT64");
}
} else if (encoding == Encoding::DELTA_LENGTH_BYTE_ARRAY) {
switch (type_num) {
case Type::BYTE_ARRAY:
return std::make_unique<DeltaLengthByteArrayEncoder>(descr, pool);
default:
throw ParquetException("DELTA_LENGTH_BYTE_ARRAY only supports BYTE_ARRAY");
}
} else if (encoding == Encoding::RLE) {
switch (type_num) {
case Type::BOOLEAN:
return std::make_unique<RleBooleanEncoder>(descr, pool);
default:
throw ParquetException("RLE only supports BOOLEAN");
}
} else if (encoding == Encoding::DELTA_BYTE_ARRAY) {
switch (type_num) {
case Type::BYTE_ARRAY:
return std::make_unique<DeltaByteArrayEncoder<ByteArrayType>>(descr, pool);
case Type::FIXED_LEN_BYTE_ARRAY:
return std::make_unique<DeltaByteArrayEncoder<FLBAType>>(descr, pool);
default:
throw ParquetException(
"DELTA_BYTE_ARRAY only supports BYTE_ARRAY and FIXED_LEN_BYTE_ARRAY");
}
} else {
ParquetException::NYI("Selected encoding is not supported");
}
DCHECK(false) << "Should not be able to reach this code";
return nullptr;
}
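// Illustrative usage sketch (descr, pool, values, and num_values are
// hypothetical stand-ins): callers typically go through the typed helper,
// which wraps this factory:
//
//   auto encoder = MakeTypedEncoder<DoubleType>(Encoding::BYTE_STREAM_SPLIT,
//                                               /*use_dictionary=*/false, descr, pool);
//   encoder->Put(values, num_values);
//   std::shared_ptr<Buffer> page = encoder->FlushValues();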
} // namespace parquet