// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "parquet/column_writer.h"
#include <algorithm>
#include <cstdint>
#include <cstring>
#include <map>
#include <memory>
#include <utility>
#include <vector>
#include "arrow/array.h"
#include "arrow/buffer_builder.h"
#include "arrow/compute/api.h"
#include "arrow/io/memory.h"
#include "arrow/status.h"
#include "arrow/type.h"
#include "arrow/type_traits.h"
#include "arrow/util/bit_stream_utils_internal.h"
#include "arrow/util/bit_util.h"
#include "arrow/util/bitmap_ops.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/compression.h"
#include "arrow/util/crc32.h"
#include "arrow/util/endian.h"
#include "arrow/util/float16.h"
#include "arrow/util/key_value_metadata.h"
#include "arrow/util/logging_internal.h"
#include "arrow/util/rle_encoding_internal.h"
#include "arrow/util/type_traits.h"
#include "arrow/util/unreachable.h"
#include "arrow/visit_array_inline.h"
#include "arrow/visit_data_inline.h"
#include "parquet/bloom_filter_writer.h"
#include "parquet/chunker_internal.h"
#include "parquet/column_page.h"
#include "parquet/encoding.h"
#include "parquet/encryption/encryption_internal.h"
#include "parquet/encryption/internal_file_encryptor.h"
#include "parquet/level_conversion.h"
#include "parquet/metadata.h"
#include "parquet/page_index.h"
#include "parquet/platform.h"
#include "parquet/properties.h"
#include "parquet/schema.h"
#include "parquet/size_statistics.h"
#include "parquet/statistics.h"
#include "parquet/thrift_internal.h"
#include "parquet/types.h"
using arrow::Array;
using arrow::ArrayData;
using arrow::Datum;
using arrow::Result;
using arrow::Status;
using arrow::bit_util::BitWriter;
using arrow::internal::checked_cast;
using arrow::internal::checked_pointer_cast;
using arrow::util::Float16;
using arrow::util::RleBitPackedEncoder;
namespace bit_util = arrow::bit_util;
namespace parquet {
namespace {
// Visitor that extracts the value buffer from a FlatArray at a given offset.
struct ValueBufferSlicer {
template <typename T>
::arrow::enable_if_base_binary<typename T::TypeClass, Status> Visit(
const T& array, std::shared_ptr<Buffer>* buffer) {
auto data = array.data();
*buffer =
SliceBuffer(data->buffers[1], data->offset * sizeof(typename T::offset_type),
data->length * sizeof(typename T::offset_type));
return Status::OK();
}
template <typename T>
::arrow::enable_if_fixed_size_binary<typename T::TypeClass, Status> Visit(
const T& array, std::shared_ptr<Buffer>* buffer) {
auto data = array.data();
*buffer = SliceBuffer(data->buffers[1], data->offset * array.byte_width(),
data->length * array.byte_width());
return Status::OK();
}
template <typename T>
::arrow::enable_if_t<::arrow::has_c_type<typename T::TypeClass>::value &&
!std::is_same<BooleanType, typename T::TypeClass>::value,
Status>
Visit(const T& array, std::shared_ptr<Buffer>* buffer) {
auto data = array.data();
*buffer = SliceBuffer(
data->buffers[1],
::arrow::TypeTraits<typename T::TypeClass>::bytes_required(data->offset),
::arrow::TypeTraits<typename T::TypeClass>::bytes_required(data->length));
return Status::OK();
}
Status Visit(const ::arrow::BooleanArray& array, std::shared_ptr<Buffer>* buffer) {
auto data = array.data();
if (bit_util::IsMultipleOf8(data->offset)) {
*buffer = SliceBuffer(data->buffers[1], bit_util::BytesForBits(data->offset),
bit_util::BytesForBits(data->length));
return Status::OK();
}
PARQUET_ASSIGN_OR_THROW(*buffer,
::arrow::internal::CopyBitmap(pool_, data->buffers[1]->data(),
data->offset, data->length));
return Status::OK();
}
#define NOT_IMPLEMENTED_VISIT(ArrowTypePrefix) \
Status Visit(const ::arrow::ArrowTypePrefix##Array& array, \
std::shared_ptr<Buffer>* buffer) { \
return Status::NotImplemented("Slicing not implemented for " #ArrowTypePrefix); \
}
NOT_IMPLEMENTED_VISIT(Null);
NOT_IMPLEMENTED_VISIT(Union);
NOT_IMPLEMENTED_VISIT(List);
NOT_IMPLEMENTED_VISIT(LargeList);
NOT_IMPLEMENTED_VISIT(ListView);
NOT_IMPLEMENTED_VISIT(LargeListView);
NOT_IMPLEMENTED_VISIT(Struct);
NOT_IMPLEMENTED_VISIT(FixedSizeList);
NOT_IMPLEMENTED_VISIT(Dictionary);
NOT_IMPLEMENTED_VISIT(RunEndEncoded);
NOT_IMPLEMENTED_VISIT(Extension);
NOT_IMPLEMENTED_VISIT(BinaryView);
NOT_IMPLEMENTED_VISIT(StringView);
#undef NOT_IMPLEMENTED_VISIT
MemoryPool* pool_;
};
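// Returns base + offset, or nullptr when base is null. Used to advance
// optional level/value pointers that may not be provided by the caller.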
template <class T>
inline const T* AddIfNotNull(const T* base, int64_t offset) {
if (base != nullptr) {
return base + offset;
}
return nullptr;
}
} // namespace
LevelEncoder::LevelEncoder() {}
LevelEncoder::~LevelEncoder() {}
void LevelEncoder::Init(Encoding::type encoding, int16_t max_level,
int num_buffered_values, uint8_t* data, int data_size) {
bit_width_ = bit_util::Log2(max_level + 1);
encoding_ = encoding;
switch (encoding) {
case Encoding::RLE: {
rle_encoder_ = std::make_unique<RleBitPackedEncoder>(data, data_size, bit_width_);
break;
}
case Encoding::BIT_PACKED: {
int num_bytes =
static_cast<int>(bit_util::BytesForBits(num_buffered_values * bit_width_));
bit_packed_encoder_ = std::make_unique<BitWriter>(data, num_bytes);
break;
}
default:
throw ParquetException("Unknown encoding type for levels.");
}
}
int LevelEncoder::MaxBufferSize(Encoding::type encoding, int16_t max_level,
int num_buffered_values) {
int bit_width = bit_util::Log2(max_level + 1);
int64_t num_bytes = 0;
switch (encoding) {
case Encoding::RLE: {
// TODO: Due to the way we currently check if the buffer is full enough,
// we need to have MinBufferSize as headroom.
num_bytes = RleBitPackedEncoder::MaxBufferSize(bit_width, num_buffered_values) +
RleBitPackedEncoder::MinBufferSize(bit_width);
break;
}
case Encoding::BIT_PACKED: {
num_bytes = bit_util::BytesForBits(num_buffered_values * bit_width);
break;
}
default:
throw ParquetException("Unknown encoding type for levels.");
}
if (num_bytes > std::numeric_limits<int>::max()) {
std::stringstream ss;
ss << "Maximum buffer size for LevelEncoder (" << num_bytes
<< ") is greater than the maximum int32 value";
throw ParquetException(ss.str());
}
return static_cast<int>(num_bytes);
}
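// Encodes up to batch_size levels into the buffer supplied to Init() and returns
// the number of levels actually encoded, which may be smaller if the buffer
// runs out of space.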
int LevelEncoder::Encode(int batch_size, const int16_t* levels) {
int num_encoded = 0;
if (!rle_encoder_ && !bit_packed_encoder_) {
throw ParquetException("Level encoders are not initialized.");
}
if (encoding_ == Encoding::RLE) {
for (int i = 0; i < batch_size; ++i) {
if (!rle_encoder_->Put(*(levels + i))) {
break;
}
++num_encoded;
}
rle_encoder_->Flush();
rle_length_ = rle_encoder_->len();
} else {
for (int i = 0; i < batch_size; ++i) {
if (!bit_packed_encoder_->PutValue(*(levels + i), bit_width_)) {
break;
}
++num_encoded;
}
bit_packed_encoder_->Flush();
}
return num_encoded;
}
// ----------------------------------------------------------------------
// PageWriter implementation
// This subclass delimits pages appearing in a serialized stream, each preceded
// by a serialized Thrift format::PageHeader indicating the type of each page
// and the page metadata.
class SerializedPageWriter : public PageWriter {
public:
SerializedPageWriter(std::shared_ptr<ArrowOutputStream> sink, Compression::type codec,
ColumnChunkMetaDataBuilder* metadata, int16_t row_group_ordinal,
int16_t column_chunk_ordinal, bool use_page_checksum_verification,
MemoryPool* pool = ::arrow::default_memory_pool(),
std::shared_ptr<Encryptor> meta_encryptor = nullptr,
std::shared_ptr<Encryptor> data_encryptor = nullptr,
ColumnIndexBuilder* column_index_builder = nullptr,
OffsetIndexBuilder* offset_index_builder = nullptr,
const CodecOptions& codec_options = CodecOptions{})
: sink_(std::move(sink)),
metadata_(metadata),
pool_(pool),
num_values_(0),
dictionary_page_offset_(0),
data_page_offset_(0),
total_uncompressed_size_(0),
total_compressed_size_(0),
page_ordinal_(0),
row_group_ordinal_(row_group_ordinal),
column_ordinal_(column_chunk_ordinal),
page_checksum_verification_(use_page_checksum_verification),
meta_encryptor_(std::move(meta_encryptor)),
data_encryptor_(std::move(data_encryptor)),
encryption_buffer_(AllocateBuffer(pool, 0)),
column_index_builder_(column_index_builder),
offset_index_builder_(offset_index_builder) {
if (data_encryptor_ != nullptr || meta_encryptor_ != nullptr) {
InitEncryption();
}
compressor_ = GetCodec(codec, codec_options);
thrift_serializer_ = std::make_unique<ThriftSerializer>();
}
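// Compresses the dictionary page if a codec is configured, optionally encrypts
// the payload, then serializes the Thrift page header followed by the payload.
// Returns the uncompressed page size plus the header size.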
int64_t WriteDictionaryPage(const DictionaryPage& page) override {
int64_t uncompressed_size = page.buffer()->size();
if (uncompressed_size > std::numeric_limits<int32_t>::max()) {
throw ParquetException(
"Uncompressed dictionary page size overflows INT32_MAX. Size:",
uncompressed_size);
}
std::shared_ptr<Buffer> compressed_data;
if (has_compressor()) {
auto buffer = std::static_pointer_cast<ResizableBuffer>(
AllocateBuffer(pool_, uncompressed_size));
Compress(*(page.buffer().get()), buffer.get());
compressed_data = std::static_pointer_cast<Buffer>(buffer);
} else {
compressed_data = page.buffer();
}
format::DictionaryPageHeader dict_page_header;
dict_page_header.__set_num_values(page.num_values());
dict_page_header.__set_encoding(ToThrift(page.encoding()));
dict_page_header.__set_is_sorted(page.is_sorted());
const uint8_t* output_data_buffer = compressed_data->data();
if (compressed_data->size() > std::numeric_limits<int32_t>::max()) {
throw ParquetException(
"Compressed dictionary page size overflows INT32_MAX. Size: ",
uncompressed_size);
}
int32_t output_data_len = static_cast<int32_t>(compressed_data->size());
if (data_encryptor_.get()) {
UpdateEncryption(encryption::kDictionaryPage);
PARQUET_THROW_NOT_OK(encryption_buffer_->Resize(
data_encryptor_->CiphertextLength(output_data_len), false));
output_data_len =
data_encryptor_->Encrypt(compressed_data->span_as<uint8_t>(),
encryption_buffer_->mutable_span_as<uint8_t>());
output_data_buffer = encryption_buffer_->data();
}
format::PageHeader page_header;
page_header.__set_type(format::PageType::DICTIONARY_PAGE);
page_header.__set_uncompressed_page_size(static_cast<int32_t>(uncompressed_size));
page_header.__set_compressed_page_size(static_cast<int32_t>(output_data_len));
page_header.__set_dictionary_page_header(dict_page_header);
if (page_checksum_verification_) {
uint32_t crc32 =
::arrow::internal::crc32(/* prev */ 0, output_data_buffer, output_data_len);
page_header.__set_crc(static_cast<int32_t>(crc32));
}
PARQUET_ASSIGN_OR_THROW(int64_t start_pos, sink_->Tell());
if (dictionary_page_offset_ == 0) {
dictionary_page_offset_ = start_pos;
}
if (meta_encryptor_) {
UpdateEncryption(encryption::kDictionaryPageHeader);
}
const int64_t header_size =
thrift_serializer_->Serialize(&page_header, sink_.get(), meta_encryptor_.get());
PARQUET_THROW_NOT_OK(sink_->Write(output_data_buffer, output_data_len));
total_uncompressed_size_ += uncompressed_size + header_size;
total_compressed_size_ += output_data_len + header_size;
++dict_encoding_stats_[page.encoding()];
return uncompressed_size + header_size;
}
void Close(bool has_dictionary, bool fallback) override {
if (meta_encryptor_ != nullptr) {
UpdateEncryption(encryption::kColumnMetaData);
}
// Serialized page writer does not need to adjust page offsets.
FinishPageIndexes(/*final_position=*/0);
// index_page_offset = -1 since they are not supported
metadata_->Finish(num_values_, dictionary_page_offset_, -1, data_page_offset_,
total_compressed_size_, total_uncompressed_size_, has_dictionary,
fallback, dict_encoding_stats_, data_encoding_stats_,
meta_encryptor_);
}
/// \brief Compress a buffer.
void Compress(const Buffer& src_buffer, ResizableBuffer* dest_buffer) override {
DCHECK(compressor_ != nullptr);
// Compress the data
int64_t max_compressed_size =
compressor_->MaxCompressedLen(src_buffer.size(), src_buffer.data());
// Pass shrink_to_fit = false to Resize: the underlying buffer only keeps
// growing, and resizing to a smaller size does not reallocate.
PARQUET_THROW_NOT_OK(dest_buffer->Resize(max_compressed_size, false));
PARQUET_ASSIGN_OR_THROW(
int64_t compressed_size,
compressor_->Compress(src_buffer.size(), src_buffer.data(), max_compressed_size,
dest_buffer->mutable_data()));
PARQUET_THROW_NOT_OK(dest_buffer->Resize(compressed_size, false));
}
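// Writes an already-compressed data page: optionally encrypts the payload,
// computes the optional CRC, serializes the Thrift page header, writes the
// payload, and records the corresponding page index entries.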
int64_t WriteDataPage(const DataPage& page) override {
const int64_t uncompressed_size = page.uncompressed_size();
if (uncompressed_size > std::numeric_limits<int32_t>::max()) {
throw ParquetException("Uncompressed data page size overflows INT32_MAX. Size:",
uncompressed_size);
}
std::shared_ptr<Buffer> compressed_data = page.buffer();
const uint8_t* output_data_buffer = compressed_data->data();
int64_t output_data_len = compressed_data->size();
if (output_data_len > std::numeric_limits<int32_t>::max()) {
throw ParquetException("Compressed data page size overflows INT32_MAX. Size:",
output_data_len);
}
if (data_encryptor_.get()) {
PARQUET_THROW_NOT_OK(encryption_buffer_->Resize(
data_encryptor_->CiphertextLength(output_data_len), false));
UpdateEncryption(encryption::kDataPage);
output_data_len =
data_encryptor_->Encrypt(compressed_data->span_as<uint8_t>(),
encryption_buffer_->mutable_span_as<uint8_t>());
output_data_buffer = encryption_buffer_->data();
}
format::PageHeader page_header;
page_header.__set_uncompressed_page_size(static_cast<int32_t>(uncompressed_size));
page_header.__set_compressed_page_size(static_cast<int32_t>(output_data_len));
if (page_checksum_verification_) {
uint32_t crc32 =
::arrow::internal::crc32(/* prev */ 0, output_data_buffer, output_data_len);
page_header.__set_crc(static_cast<int32_t>(crc32));
}
if (page.type() == PageType::DATA_PAGE) {
const DataPageV1& v1_page = checked_cast<const DataPageV1&>(page);
SetDataPageHeader(page_header, v1_page);
} else if (page.type() == PageType::DATA_PAGE_V2) {
const DataPageV2& v2_page = checked_cast<const DataPageV2&>(page);
SetDataPageV2Header(page_header, v2_page);
} else {
throw ParquetException("Unexpected page type");
}
PARQUET_ASSIGN_OR_THROW(int64_t start_pos, sink_->Tell());
if (page_ordinal_ == 0) {
data_page_offset_ = start_pos;
}
if (meta_encryptor_) {
UpdateEncryption(encryption::kDataPageHeader);
}
const int64_t header_size =
thrift_serializer_->Serialize(&page_header, sink_.get(), meta_encryptor_.get());
PARQUET_THROW_NOT_OK(sink_->Write(output_data_buffer, output_data_len));
/// Collect page index
if (column_index_builder_ != nullptr) {
column_index_builder_->AddPage(page.statistics(), page.size_statistics());
}
if (offset_index_builder_ != nullptr) {
const int64_t compressed_size = output_data_len + header_size;
if (compressed_size > std::numeric_limits<int32_t>::max()) {
throw ParquetException("Compressed page size ", compressed_size,
" overflows INT32_MAX.");
}
if (!page.first_row_index().has_value()) {
throw ParquetException("First row index is not set in data page.");
}
/// start_pos is a relative offset in the buffered mode. It should be
/// adjusted via OffsetIndexBuilder::Finish() after BufferedPageWriter
/// has flushed all data pages.
offset_index_builder_->AddPage(
start_pos, static_cast<int32_t>(compressed_size), *page.first_row_index(),
page.size_statistics().unencoded_byte_array_data_bytes);
}
total_uncompressed_size_ += uncompressed_size + header_size;
total_compressed_size_ += output_data_len + header_size;
num_values_ += page.num_values();
++data_encoding_stats_[page.encoding()];
++page_ordinal_;
return uncompressed_size + header_size;
}
void SetDataPageHeader(format::PageHeader& page_header, const DataPageV1& page) {
format::DataPageHeader data_page_header;
data_page_header.__set_num_values(page.num_values());
data_page_header.__set_encoding(ToThrift(page.encoding()));
data_page_header.__set_definition_level_encoding(
ToThrift(page.definition_level_encoding()));
data_page_header.__set_repetition_level_encoding(
ToThrift(page.repetition_level_encoding()));
// Write page statistics only when page index is not enabled.
if (column_index_builder_ == nullptr) {
data_page_header.__set_statistics(ToThrift(page.statistics()));
}
page_header.__set_type(format::PageType::DATA_PAGE);
page_header.__set_data_page_header(data_page_header);
}
void SetDataPageV2Header(format::PageHeader& page_header, const DataPageV2& page) {
format::DataPageHeaderV2 data_page_header;
data_page_header.__set_num_values(page.num_values());
data_page_header.__set_num_nulls(page.num_nulls());
data_page_header.__set_num_rows(page.num_rows());
data_page_header.__set_encoding(ToThrift(page.encoding()));
data_page_header.__set_definition_levels_byte_length(
page.definition_levels_byte_length());
data_page_header.__set_repetition_levels_byte_length(
page.repetition_levels_byte_length());
data_page_header.__set_is_compressed(page.is_compressed());
// Write page statistics only when page index is not enabled.
if (column_index_builder_ == nullptr) {
data_page_header.__set_statistics(ToThrift(page.statistics()));
}
page_header.__set_type(format::PageType::DATA_PAGE_V2);
page_header.__set_data_page_header_v2(data_page_header);
}
/// \brief Finish page index builders and update the stream offset to adjust
/// page offsets.
void FinishPageIndexes(int64_t final_position) {
if (column_index_builder_ != nullptr) {
column_index_builder_->Finish();
}
if (offset_index_builder_ != nullptr) {
offset_index_builder_->Finish(final_position);
}
}
bool has_compressor() override { return (compressor_ != nullptr); }
int64_t num_values() { return num_values_; }
int64_t dictionary_page_offset() { return dictionary_page_offset_; }
int64_t data_page_offset() { return data_page_offset_; }
int64_t total_compressed_size() { return total_compressed_size_; }
int64_t total_uncompressed_size() { return total_uncompressed_size_; }
int64_t total_compressed_bytes_written() const override {
return total_compressed_size_;
}
bool page_checksum_verification() { return page_checksum_verification_; }
private:
// To allow UpdateEncryption on Close
friend class BufferedPageWriter;
void InitEncryption() {
// Prepare the AAD for quick update later.
if (data_encryptor_ != nullptr) {
data_page_aad_ = encryption::CreateModuleAad(
data_encryptor_->file_aad(), encryption::kDataPage, row_group_ordinal_,
column_ordinal_, kNonPageOrdinal);
}
if (meta_encryptor_ != nullptr) {
data_page_header_aad_ = encryption::CreateModuleAad(
meta_encryptor_->file_aad(), encryption::kDataPageHeader, row_group_ordinal_,
column_ordinal_, kNonPageOrdinal);
}
}
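// Updates the relevant encryptor's AAD for the given module type. Data page and
// data page header AADs are quick-updated in place with the current page
// ordinal; the other module AADs are recreated from scratch.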
void UpdateEncryption(int8_t module_type) {
switch (module_type) {
case encryption::kColumnMetaData: {
meta_encryptor_->UpdateAad(encryption::CreateModuleAad(
meta_encryptor_->file_aad(), module_type, row_group_ordinal_, column_ordinal_,
kNonPageOrdinal));
break;
}
case encryption::kDataPage: {
encryption::QuickUpdatePageAad(page_ordinal_, &data_page_aad_);
data_encryptor_->UpdateAad(data_page_aad_);
break;
}
case encryption::kDataPageHeader: {
encryption::QuickUpdatePageAad(page_ordinal_, &data_page_header_aad_);
meta_encryptor_->UpdateAad(data_page_header_aad_);
break;
}
case encryption::kDictionaryPageHeader: {
meta_encryptor_->UpdateAad(encryption::CreateModuleAad(
meta_encryptor_->file_aad(), module_type, row_group_ordinal_, column_ordinal_,
kNonPageOrdinal));
break;
}
case encryption::kDictionaryPage: {
data_encryptor_->UpdateAad(encryption::CreateModuleAad(
data_encryptor_->file_aad(), module_type, row_group_ordinal_, column_ordinal_,
kNonPageOrdinal));
break;
}
default:
throw ParquetException("Unknown module type in UpdateEncryption");
}
}
std::shared_ptr<ArrowOutputStream> sink_;
ColumnChunkMetaDataBuilder* metadata_;
MemoryPool* pool_;
int64_t num_values_;
int64_t dictionary_page_offset_;
int64_t data_page_offset_;
// The total uncompressed size of the pages the page writer has already
// written.
int64_t total_uncompressed_size_;
// The total compressed size of the pages the page writer has already
// written.
// If the column is UNCOMPRESSED, this is equal to
// `total_uncompressed_size_`.
int64_t total_compressed_size_;
int32_t page_ordinal_;
int16_t row_group_ordinal_;
int16_t column_ordinal_;
bool page_checksum_verification_;
std::unique_ptr<ThriftSerializer> thrift_serializer_;
// Compression codec to use.
std::unique_ptr<::arrow::util::Codec> compressor_;
std::string data_page_aad_;
std::string data_page_header_aad_;
std::shared_ptr<Encryptor> meta_encryptor_;
std::shared_ptr<Encryptor> data_encryptor_;
std::shared_ptr<ResizableBuffer> encryption_buffer_;
std::map<Encoding::type, int32_t> dict_encoding_stats_;
std::map<Encoding::type, int32_t> data_encoding_stats_;
ColumnIndexBuilder* column_index_builder_;
OffsetIndexBuilder* offset_index_builder_;
};
// This implementation of the PageWriter buffers pages in memory and writes them
// to the final sink on Close().
class BufferedPageWriter : public PageWriter {
public:
BufferedPageWriter(std::shared_ptr<ArrowOutputStream> sink, Compression::type codec,
ColumnChunkMetaDataBuilder* metadata, int16_t row_group_ordinal,
int16_t current_column_ordinal, bool use_page_checksum_verification,
MemoryPool* pool = ::arrow::default_memory_pool(),
std::shared_ptr<Encryptor> meta_encryptor = nullptr,
std::shared_ptr<Encryptor> data_encryptor = nullptr,
ColumnIndexBuilder* column_index_builder = nullptr,
OffsetIndexBuilder* offset_index_builder = nullptr,
const CodecOptions& codec_options = CodecOptions{})
: final_sink_(std::move(sink)), metadata_(metadata), has_dictionary_pages_(false) {
in_memory_sink_ = CreateOutputStream(pool);
pager_ = std::make_unique<SerializedPageWriter>(
in_memory_sink_, codec, metadata, row_group_ordinal, current_column_ordinal,
use_page_checksum_verification, pool, std::move(meta_encryptor),
std::move(data_encryptor), column_index_builder, offset_index_builder,
codec_options);
}
int64_t WriteDictionaryPage(const DictionaryPage& page) override {
has_dictionary_pages_ = true;
return pager_->WriteDictionaryPage(page);
}
void Close(bool has_dictionary, bool fallback) override {
if (pager_->meta_encryptor_ != nullptr) {
pager_->UpdateEncryption(encryption::kColumnMetaData);
}
// index_page_offset = -1 since they are not supported
PARQUET_ASSIGN_OR_THROW(int64_t final_position, final_sink_->Tell());
// dictionary page offset should be 0 iff there are no dictionary pages
auto dictionary_page_offset =
has_dictionary_pages_ ? pager_->dictionary_page_offset() + final_position : 0;
metadata_->Finish(pager_->num_values(), dictionary_page_offset, -1,
pager_->data_page_offset() + final_position,
pager_->total_compressed_size(), pager_->total_uncompressed_size(),
has_dictionary, fallback, pager_->dict_encoding_stats_,
pager_->data_encoding_stats_, pager_->meta_encryptor_);
// Buffered page writer needs to adjust page offsets.
pager_->FinishPageIndexes(final_position);
// flush everything to the serialized sink
PARQUET_ASSIGN_OR_THROW(auto buffer, in_memory_sink_->Finish());
PARQUET_THROW_NOT_OK(final_sink_->Write(buffer));
}
int64_t WriteDataPage(const DataPage& page) override {
return pager_->WriteDataPage(page);
}
void Compress(const Buffer& src_buffer, ResizableBuffer* dest_buffer) override {
pager_->Compress(src_buffer, dest_buffer);
}
bool has_compressor() override { return pager_->has_compressor(); }
int64_t total_compressed_bytes_written() const override {
return pager_->total_compressed_bytes_written();
}
private:
std::shared_ptr<ArrowOutputStream> final_sink_;
ColumnChunkMetaDataBuilder* metadata_;
std::shared_ptr<::arrow::io::BufferOutputStream> in_memory_sink_;
std::unique_ptr<SerializedPageWriter> pager_;
bool has_dictionary_pages_;
};
std::unique_ptr<PageWriter> PageWriter::Open(
std::shared_ptr<ArrowOutputStream> sink, Compression::type codec,
ColumnChunkMetaDataBuilder* metadata, int16_t row_group_ordinal,
int16_t column_chunk_ordinal, MemoryPool* pool, bool buffered_row_group,
std::shared_ptr<Encryptor> meta_encryptor, std::shared_ptr<Encryptor> data_encryptor,
bool page_write_checksum_enabled, ColumnIndexBuilder* column_index_builder,
OffsetIndexBuilder* offset_index_builder, const CodecOptions& codec_options) {
if (buffered_row_group) {
return std::unique_ptr<PageWriter>(new BufferedPageWriter(
std::move(sink), codec, metadata, row_group_ordinal, column_chunk_ordinal,
page_write_checksum_enabled, pool, std::move(meta_encryptor),
std::move(data_encryptor), column_index_builder, offset_index_builder,
codec_options));
} else {
return std::unique_ptr<PageWriter>(new SerializedPageWriter(
std::move(sink), codec, metadata, row_group_ordinal, column_chunk_ordinal,
page_write_checksum_enabled, pool, std::move(meta_encryptor),
std::move(data_encryptor), column_index_builder, offset_index_builder,
codec_options));
}
}
// ----------------------------------------------------------------------
// ColumnWriter
const std::shared_ptr<WriterProperties>& default_writer_properties() {
static std::shared_ptr<WriterProperties> default_writer_properties =
WriterProperties::Builder().build();
return default_writer_properties;
}
class ColumnWriterImpl {
public:
ColumnWriterImpl(ColumnChunkMetaDataBuilder* metadata,
std::unique_ptr<PageWriter> pager, const bool use_dictionary,
Encoding::type encoding, const WriterProperties* properties)
: metadata_(metadata),
descr_(metadata->descr()),
level_info_(internal::LevelInfo::ComputeLevelInfo(metadata->descr())),
pager_(std::move(pager)),
has_dictionary_(use_dictionary),
encoding_(encoding),
properties_(properties),
allocator_(properties->memory_pool()),
num_buffered_values_(0),
num_buffered_encoded_values_(0),
num_buffered_nulls_(0),
num_buffered_rows_(0),
rows_written_(0),
total_bytes_written_(0),
total_compressed_bytes_(0),
closed_(false),
fallback_(false),
definition_levels_sink_(allocator_),
repetition_levels_sink_(allocator_) {
definition_levels_rle_ =
std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0));
repetition_levels_rle_ =
std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0));
uncompressed_data_ =
std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0));
if (pager_->has_compressor()) {
compressor_temp_buffer_ =
std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0));
}
if (properties_->content_defined_chunking_enabled()) {
auto cdc_options = properties_->content_defined_chunking_options();
content_defined_chunker_.emplace(level_info_, cdc_options.min_chunk_size,
cdc_options.max_chunk_size,
cdc_options.norm_level);
}
}
virtual ~ColumnWriterImpl() = default;
int64_t Close();
protected:
virtual std::shared_ptr<Buffer> GetValuesBuffer() = 0;
// Serializes Dictionary Page if enabled
virtual void WriteDictionaryPage() = 0;
// A convenience struct to combine the encoded statistics and size statistics
struct StatisticsPair {
EncodedStatistics encoded_stats;
SizeStatistics size_stats;
};
// Plain-encoded statistics of the current page
virtual StatisticsPair GetPageStatistics() = 0;
// Plain-encoded statistics of the whole chunk
virtual StatisticsPair GetChunkStatistics() = 0;
// Geospatial statistics of the whole chunk
virtual std::optional<geospatial::EncodedGeoStatistics> GetChunkGeoStatistics() = 0;
// Merges page statistics into chunk statistics, then resets the values
virtual void ResetPageStatistics() = 0;
// Adds Data Pages to an in-memory buffer in dictionary encoding mode;
// serializes the Data Pages in other encoding modes.
void AddDataPage();
void BuildDataPageV1(int64_t definition_levels_rle_size,
int64_t repetition_levels_rle_size, int64_t uncompressed_size,
const std::shared_ptr<Buffer>& values);
void BuildDataPageV2(int64_t definition_levels_rle_size,
int64_t repetition_levels_rle_size, int64_t uncompressed_size,
const std::shared_ptr<Buffer>& values);
// Serializes Data Pages
void WriteDataPage(const DataPage& page) {
total_bytes_written_ += pager_->WriteDataPage(page);
}
// Write multiple definition levels
void WriteDefinitionLevels(int64_t num_levels, const int16_t* levels) {
DCHECK(!closed_);
PARQUET_THROW_NOT_OK(
definition_levels_sink_.Append(levels, sizeof(int16_t) * num_levels));
}
// Write multiple repetition levels
void WriteRepetitionLevels(int64_t num_levels, const int16_t* levels) {
DCHECK(!closed_);
PARQUET_THROW_NOT_OK(
repetition_levels_sink_.Append(levels, sizeof(int16_t) * num_levels));
}
// RLE encode the src_buffer into dest_buffer and return the encoded size
int64_t RleEncodeLevels(const void* src_buffer, ResizableBuffer* dest_buffer,
int16_t max_level, bool include_length_prefix = true);
// Serialize the buffered Data Pages
void FlushBufferedDataPages();
ColumnChunkMetaDataBuilder* metadata_;
// key_value_metadata_ for the column chunk
// It would be nullptr if there is no KeyValueMetadata set.
std::shared_ptr<const KeyValueMetadata> key_value_metadata_;
const ColumnDescriptor* descr_;
// scratch buffer if validity bits need to be recalculated.
std::shared_ptr<ResizableBuffer> bits_buffer_;
const internal::LevelInfo level_info_;
std::unique_ptr<PageWriter> pager_;
bool has_dictionary_;
Encoding::type encoding_;
const WriterProperties* properties_;
LevelEncoder level_encoder_;
MemoryPool* allocator_;
// The total number of values stored in the data page. This is the maximum of
// the number of encoded definition levels and the number of encoded values. For
// non-repeated, required columns, this is equal to the number of encoded
// values. For repeated or optional values, there may be fewer data values
// than levels, and this tells you how many encoded levels there are in that
// case.
int64_t num_buffered_values_;
// The total number of stored values in the data page. For repeated or optional
// values, this number may be lower than num_buffered_values_.
int64_t num_buffered_encoded_values_;
// The total number of nulls stored in the data page.
int64_t num_buffered_nulls_;
// Total number of rows buffered in the data page.
int64_t num_buffered_rows_;
// Total number of rows written with this ColumnWriter
int64_t rows_written_;
// Records the total number of uncompressed bytes written by the serializer
int64_t total_bytes_written_;
// Records the current number of compressed bytes buffered in the column.
// These bytes have not been written to `pager_` yet.
int64_t total_compressed_bytes_;
// Flag to check if the Writer has been closed
bool closed_;
// Flag to infer if dictionary encoding has fallen back to PLAIN
bool fallback_;
::arrow::BufferBuilder definition_levels_sink_;
::arrow::BufferBuilder repetition_levels_sink_;
std::shared_ptr<ResizableBuffer> definition_levels_rle_;
std::shared_ptr<ResizableBuffer> repetition_levels_rle_;
std::shared_ptr<ResizableBuffer> uncompressed_data_;
std::shared_ptr<ResizableBuffer> compressor_temp_buffer_;
std::vector<std::unique_ptr<DataPage>> data_pages_;
std::optional<internal::ContentDefinedChunker> content_defined_chunker_;
private:
void InitSinks() {
definition_levels_sink_.Rewind(0);
repetition_levels_sink_.Rewind(0);
}
// Concatenate the encoded levels and values into one buffer
void ConcatenateBuffers(int64_t definition_levels_rle_size,
int64_t repetition_levels_rle_size,
const std::shared_ptr<Buffer>& values, uint8_t* combined) {
memcpy(combined, repetition_levels_rle_->data(),
static_cast<size_t>(repetition_levels_rle_size));
combined += repetition_levels_rle_size;
memcpy(combined, definition_levels_rle_->data(),
static_cast<size_t>(definition_levels_rle_size));
combined += definition_levels_rle_size;
memcpy(combined, values->data(), static_cast<size_t>(values->size()));
}
};
// return the size of the encoded buffer
int64_t ColumnWriterImpl::RleEncodeLevels(const void* src_buffer,
ResizableBuffer* dest_buffer, int16_t max_level,
bool include_length_prefix) {
// V1 DataPage includes the length of the RLE level as a prefix.
int32_t prefix_size = include_length_prefix ? sizeof(int32_t) : 0;
// TODO: This sizing only works due to some RLE specifics.
int64_t rle_size = LevelEncoder::MaxBufferSize(Encoding::RLE, max_level,
static_cast<int>(num_buffered_values_)) +
prefix_size;
// Pass shrink_to_fit = false to Resize: the underlying buffer only keeps
// growing, and resizing to a smaller size does not reallocate.
PARQUET_THROW_NOT_OK(dest_buffer->Resize(rle_size, false));
level_encoder_.Init(Encoding::RLE, max_level, static_cast<int>(num_buffered_values_),
dest_buffer->mutable_data() + prefix_size,
static_cast<int>(dest_buffer->size() - prefix_size));
int encoded = level_encoder_.Encode(static_cast<int>(num_buffered_values_),
reinterpret_cast<const int16_t*>(src_buffer));
DCHECK_EQ(encoded, num_buffered_values_);
if (include_length_prefix) {
reinterpret_cast<int32_t*>(dest_buffer->mutable_data())[0] = level_encoder_.len();
}
return level_encoder_.len() + prefix_size;
}
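// Materializes the buffered levels and values into a data page: RLE-encodes the
// definition/repetition levels, fetches the values buffer from the encoder,
// builds a V1 or V2 data page depending on the writer properties, and resets
// the per-page buffers and counters.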
void ColumnWriterImpl::AddDataPage() {
int64_t definition_levels_rle_size = 0;
int64_t repetition_levels_rle_size = 0;
std::shared_ptr<Buffer> values = GetValuesBuffer();
bool is_v1_data_page = properties_->data_page_version() == ParquetDataPageVersion::V1;
if (descr_->max_definition_level() > 0) {
definition_levels_rle_size = RleEncodeLevels(
definition_levels_sink_.data(), definition_levels_rle_.get(),
descr_->max_definition_level(), /*include_length_prefix=*/is_v1_data_page);
}
if (descr_->max_repetition_level() > 0) {
repetition_levels_rle_size = RleEncodeLevels(
repetition_levels_sink_.data(), repetition_levels_rle_.get(),
descr_->max_repetition_level(), /*include_length_prefix=*/is_v1_data_page);
}
int64_t uncompressed_size =
definition_levels_rle_size + repetition_levels_rle_size + values->size();
if (is_v1_data_page) {
BuildDataPageV1(definition_levels_rle_size, repetition_levels_rle_size,
uncompressed_size, values);
} else {
BuildDataPageV2(definition_levels_rle_size, repetition_levels_rle_size,
uncompressed_size, values);
}
// Re-initialize the sinks for next Page.
InitSinks();
num_buffered_values_ = 0;
num_buffered_encoded_values_ = 0;
num_buffered_rows_ = 0;
num_buffered_nulls_ = 0;
}
void ColumnWriterImpl::BuildDataPageV1(int64_t definition_levels_rle_size,
int64_t repetition_levels_rle_size,
int64_t uncompressed_size,
const std::shared_ptr<Buffer>& values) {
// Pass shrink_to_fit = false to Resize: the underlying buffer only keeps
// growing, and resizing to a smaller size does not reallocate.
PARQUET_THROW_NOT_OK(uncompressed_data_->Resize(uncompressed_size, false));
ConcatenateBuffers(definition_levels_rle_size, repetition_levels_rle_size, values,
uncompressed_data_->mutable_data());
auto [page_stats, page_size_stats] = GetPageStatistics();
page_stats.ApplyStatSizeLimits(properties_->max_statistics_size(descr_->path()));
page_stats.set_is_signed(SortOrder::SIGNED == descr_->sort_order());
ResetPageStatistics();
std::shared_ptr<Buffer> compressed_data;
if (pager_->has_compressor()) {
pager_->Compress(*(uncompressed_data_.get()), compressor_temp_buffer_.get());
compressed_data = compressor_temp_buffer_;
} else {
compressed_data = uncompressed_data_;
}
int32_t num_values = static_cast<int32_t>(num_buffered_values_);
int64_t first_row_index = rows_written_ - num_buffered_rows_;
// Write the page to OutputStream eagerly if there is no dictionary or
// if dictionary encoding has fallen back to PLAIN
if (has_dictionary_ && !fallback_) { // Save pages until end of dictionary encoding
PARQUET_ASSIGN_OR_THROW(
auto compressed_data_copy,
compressed_data->CopySlice(0, compressed_data->size(), allocator_));
std::unique_ptr<DataPage> page_ptr = std::make_unique<DataPageV1>(
compressed_data_copy, num_values, encoding_, Encoding::RLE, Encoding::RLE,
uncompressed_size, std::move(page_stats), first_row_index,
std::move(page_size_stats));
total_compressed_bytes_ += page_ptr->size() + sizeof(format::PageHeader);
data_pages_.push_back(std::move(page_ptr));
} else { // Eagerly write pages
DataPageV1 page(compressed_data, num_values, encoding_, Encoding::RLE, Encoding::RLE,
uncompressed_size, std::move(page_stats), first_row_index,
std::move(page_size_stats));
WriteDataPage(page);
}
}
void ColumnWriterImpl::BuildDataPageV2(int64_t definition_levels_rle_size,
int64_t repetition_levels_rle_size,
int64_t uncompressed_size,
const std::shared_ptr<Buffer>& values) {
// Compress the values if needed. Repetition and definition levels are uncompressed in
// V2.
bool page_is_compressed = false;
if (pager_->has_compressor() && values->size() > 0) {
pager_->Compress(*values, compressor_temp_buffer_.get());
if (compressor_temp_buffer_->size() < values->size()) {
page_is_compressed = true;
}
}
std::shared_ptr<Buffer> compressed_values =
(page_is_compressed ? compressor_temp_buffer_ : values);
// Concatenate uncompressed levels and the possibly compressed values
int64_t combined_size =
definition_levels_rle_size + repetition_levels_rle_size + compressed_values->size();
std::shared_ptr<ResizableBuffer> combined = AllocateBuffer(allocator_, combined_size);
ConcatenateBuffers(definition_levels_rle_size, repetition_levels_rle_size,
compressed_values, combined->mutable_data());
auto [page_stats, page_size_stats] = GetPageStatistics();
page_stats.ApplyStatSizeLimits(properties_->max_statistics_size(descr_->path()));
page_stats.set_is_signed(SortOrder::SIGNED == descr_->sort_order());
ResetPageStatistics();
int32_t num_values = static_cast<int32_t>(num_buffered_values_);
int32_t null_count = static_cast<int32_t>(num_buffered_nulls_);
int32_t num_rows = static_cast<int32_t>(num_buffered_rows_);
int32_t def_levels_byte_length = static_cast<int32_t>(definition_levels_rle_size);
int32_t rep_levels_byte_length = static_cast<int32_t>(repetition_levels_rle_size);
int64_t first_row_index = rows_written_ - num_buffered_rows_;
// page_stats.null_count is not set when page_statistics_ is nullptr. It is only
// used here as a safety check.
DCHECK(!page_stats.has_null_count || page_stats.null_count == null_count);
// Write the page to OutputStream eagerly if there is no dictionary or
// if dictionary encoding has fallen back to PLAIN
if (has_dictionary_ && !fallback_) { // Save pages until end of dictionary encoding
PARQUET_ASSIGN_OR_THROW(auto data_copy,
combined->CopySlice(0, combined->size(), allocator_));
std::unique_ptr<DataPage> page_ptr = std::make_unique<DataPageV2>(
combined, num_values, null_count, num_rows, encoding_, def_levels_byte_length,
rep_levels_byte_length, uncompressed_size, page_is_compressed,
std::move(page_stats), first_row_index, std::move(page_size_stats));
total_compressed_bytes_ += page_ptr->size() + sizeof(format::PageHeader);
data_pages_.push_back(std::move(page_ptr));
} else {
DataPageV2 page(combined, num_values, null_count, num_rows, encoding_,
def_levels_byte_length, rep_levels_byte_length, uncompressed_size,
page_is_compressed, std::move(page_stats), first_row_index,
std::move(page_size_stats));
WriteDataPage(page);
}
}
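// Writes the dictionary page (if dictionary encoding did not fall back), flushes
// all buffered data pages, records chunk-level statistics and key-value
// metadata, and closes the page writer. Returns the total number of
// uncompressed bytes written, including page headers.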
int64_t ColumnWriterImpl::Close() {
if (!closed_) {
closed_ = true;
if (has_dictionary_ && !fallback_) {
WriteDictionaryPage();
}
FlushBufferedDataPages();
auto [chunk_statistics, chunk_size_statistics] = GetChunkStatistics();
chunk_statistics.ApplyStatSizeLimits(
properties_->max_statistics_size(descr_->path()));
chunk_statistics.set_is_signed(SortOrder::SIGNED == descr_->sort_order());
// Write stats only if the column has at least one row written
if (rows_written_ > 0 && chunk_statistics.is_set()) {
metadata_->SetStatistics(chunk_statistics);
}
if (rows_written_ > 0 && chunk_size_statistics.is_set()) {
metadata_->SetSizeStatistics(chunk_size_statistics);
}
if (descr_->logical_type() != nullptr && descr_->logical_type()->is_geometry()) {
std::optional<geospatial::EncodedGeoStatistics> geo_stats = GetChunkGeoStatistics();
if (geo_stats) {
metadata_->SetGeoStatistics(std::move(*geo_stats));
}
}
metadata_->SetKeyValueMetadata(key_value_metadata_);
pager_->Close(has_dictionary_, fallback_);
}
return total_bytes_written_;
}
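// Adds a final page for any values still buffered, then writes out all data
// pages that were held back while dictionary encoding was in effect.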
void ColumnWriterImpl::FlushBufferedDataPages() {
// Write all outstanding data to a new page
if (num_buffered_values_ > 0) {
AddDataPage();
}
for (const auto& page_ptr : data_pages_) {
WriteDataPage(*page_ptr);
}
data_pages_.clear();
total_compressed_bytes_ = 0;
}
// ----------------------------------------------------------------------
// TypedColumnWriter
// DoInBatches for non-repeated columns
template <typename Action, typename GetBufferedRows>
inline void DoInBatchesNonRepeated(int64_t num_levels, int64_t batch_size,
int64_t max_rows_per_page, Action&& action,
GetBufferedRows&& curr_page_buffered_rows) {
int64_t offset = 0;
while (offset < num_levels) {
int64_t page_buffered_rows = curr_page_buffered_rows();
ARROW_DCHECK_LE(page_buffered_rows, max_rows_per_page);
// Every record contains only one level.
int64_t max_batch_size = std::min(batch_size, num_levels - offset);
max_batch_size = std::min(max_batch_size, max_rows_per_page - page_buffered_rows);
int64_t end_offset = offset + max_batch_size;
ARROW_DCHECK_LE(offset, end_offset);
ARROW_DCHECK_LE(end_offset, num_levels);
// Always check page limit for non-repeated columns.
action(offset, end_offset - offset, /*check_page_limit=*/true);
offset = end_offset;
}
}
// DoInBatches for repeated columns
template <typename Action, typename GetBufferedRows>
inline void DoInBatchesRepeated(const int16_t* def_levels, const int16_t* rep_levels,
int64_t num_levels, int64_t batch_size,
int64_t max_rows_per_page,
bool pages_change_on_record_boundaries, Action&& action,
GetBufferedRows&& curr_page_buffered_rows) {
int64_t offset = 0;
while (offset < num_levels) {
int64_t max_batch_size = std::min(batch_size, num_levels - offset);
int64_t end_offset = num_levels; // end offset of the current batch
int64_t check_page_limit_end_offset = -1; // offset to check page limit (if not -1)
int64_t page_buffered_rows = curr_page_buffered_rows();
ARROW_DCHECK_LE(page_buffered_rows, max_rows_per_page);
// Iterate rep_levels to find the shortest sequence that ends before a record
// boundary (i.e. rep_levels == 0) with a size no less than max_batch_size
for (int64_t i = offset; i < num_levels; ++i) {
if (rep_levels[i] == 0) {
// Use the beginning of last record to check page limit.
check_page_limit_end_offset = i;
if (i - offset >= max_batch_size || page_buffered_rows >= max_rows_per_page) {
end_offset = i;
break;
}
page_buffered_rows += 1;
}
}
ARROW_DCHECK_LE(offset, end_offset);
ARROW_DCHECK_LE(check_page_limit_end_offset, end_offset);
if (check_page_limit_end_offset >= 0) {
// At least one record boundary is included in this batch.
// It is a good chance to check the page limit.
action(offset, check_page_limit_end_offset - offset, /*check_page_limit=*/true);
offset = check_page_limit_end_offset;
}
if (end_offset > offset) {
// This is the last chunk of the batch, and we do not know whether end_offset is
// a record boundary, so we cannot check the page limit if pages are only
// allowed to change on record boundaries.
ARROW_DCHECK_EQ(end_offset, num_levels);
action(offset, end_offset - offset,
/*check_page_limit=*/!pages_change_on_record_boundaries);
}
offset = end_offset;
}
}
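// Dispatches to the repeated or non-repeated batching strategy depending on
// whether repetition levels are present.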
template <typename Action, typename GetBufferedRows>
inline void DoInBatches(const int16_t* def_levels, const int16_t* rep_levels,
int64_t num_levels, int64_t batch_size, int64_t max_rows_per_page,
bool pages_change_on_record_boundaries, Action&& action,
GetBufferedRows&& curr_page_buffered_rows) {
if (!rep_levels) {
DoInBatchesNonRepeated(num_levels, batch_size, max_rows_per_page,
std::forward<Action>(action),
std::forward<GetBufferedRows>(curr_page_buffered_rows));
} else {
DoInBatchesRepeated(def_levels, rep_levels, num_levels, batch_size, max_rows_per_page,
pages_change_on_record_boundaries, std::forward<Action>(action),
std::forward<GetBufferedRows>(curr_page_buffered_rows));
}
}
namespace {
bool DictionaryDirectWriteSupported(const ::arrow::Array& array) {
DCHECK_EQ(array.type_id(), ::arrow::Type::DICTIONARY);
const ::arrow::DictionaryType& dict_type =
static_cast<const ::arrow::DictionaryType&>(*array.type());
return ::arrow::is_base_binary_like(dict_type.value_type()->id());
}
Status ConvertDictionaryToDense(const ::arrow::Array& array, MemoryPool* pool,
std::shared_ptr<::arrow::Array>* out) {
const ::arrow::DictionaryType& dict_type =
static_cast<const ::arrow::DictionaryType&>(*array.type());
::arrow::compute::ExecContext ctx(pool);
ARROW_ASSIGN_OR_RAISE(Datum cast_output,
::arrow::compute::Cast(array.data(), dict_type.value_type(),
::arrow::compute::CastOptions(), &ctx));
*out = cast_output.make_array();
return Status::OK();
}
} // namespace
template <typename ParquetType>
class TypedColumnWriterImpl : public ColumnWriterImpl,
public TypedColumnWriter<ParquetType> {
public:
using T = typename ParquetType::c_type;
TypedColumnWriterImpl(ColumnChunkMetaDataBuilder* metadata,
std::unique_ptr<PageWriter> pager, const bool use_dictionary,
Encoding::type encoding, const WriterProperties* properties,
BloomFilter* bloom_filter)
: ColumnWriterImpl(metadata, std::move(pager), use_dictionary, encoding,
properties) {
current_encoder_ = MakeEncoder(ParquetType::type_num, encoding, use_dictionary,
descr_, properties->memory_pool());
// We have to dynamic_cast because static_cast cannot be used across
// virtual inheritance.
current_value_encoder_ =
dynamic_cast<TypedEncoder<ParquetType>*>(current_encoder_.get());
// Will be null if not using dictionary, but that's ok
current_dict_encoder_ =
dynamic_cast<DictEncoder<ParquetType>*>(current_encoder_.get());
if (bloom_filter != nullptr) {
bloom_filter_writer_ = std::make_unique<BloomFilterWriter>(descr_, bloom_filter);
}
// GH-46205: Geometry/Geography are the first non-nested logical types to have a
// SortOrder::UNKNOWN. Currently, the presence of statistics is tied to
// having a known sort order and so null counts will be missing.
if (properties->statistics_enabled(descr_->path())) {
if (SortOrder::UNKNOWN != descr_->sort_order()) {
page_statistics_ = MakeStatistics<ParquetType>(descr_, allocator_);
chunk_statistics_ = MakeStatistics<ParquetType>(descr_, allocator_);
}
if (descr_->logical_type() != nullptr && descr_->logical_type()->is_geometry()) {
chunk_geospatial_statistics_ = std::make_shared<geospatial::GeoStatistics>();
}
}
if (properties->size_statistics_level() == SizeStatisticsLevel::ColumnChunk ||
properties->size_statistics_level() == SizeStatisticsLevel::PageAndColumnChunk) {
page_size_statistics_ = SizeStatistics::Make(descr_);
chunk_size_statistics_ = SizeStatistics::Make(descr_);
}
pages_change_on_record_boundaries_ =
properties->data_page_version() == ParquetDataPageVersion::V2 ||
properties->page_index_enabled(descr_->path());
}
int64_t Close() override { return ColumnWriterImpl::Close(); }
int64_t WriteBatch(int64_t num_values, const int16_t* def_levels,
const int16_t* rep_levels, const T* values) override {
if (ARROW_PREDICT_FALSE(properties_->content_defined_chunking_enabled())) {
throw ParquetException(
"Content-defined chunking is not supported in WriteBatch() or "
"WriteBatchSpaced(), use WriteArrow() instead.");
}
return WriteBatchInternal(num_values, def_levels, rep_levels, values);
}
int64_t WriteBatchInternal(int64_t num_values, const int16_t* def_levels,
const int16_t* rep_levels, const T* values) {
// We check for DataPage limits only after we have inserted the values. If a user
// writes a large number of values, the DataPage size can be much above the limit.
// The purpose of this chunking is to bound this. Even if a user writes a large
// number of values, the chunking ensures that AddDataPage() is called at a
// reasonable page size limit.
int64_t value_offset = 0;
auto WriteChunk = [&](int64_t offset, int64_t batch_size, bool check_page) {
int64_t values_to_write = WriteLevels(batch_size, AddIfNotNull(def_levels, offset),
AddIfNotNull(rep_levels, offset));
// PARQUET-780
if (values_to_write > 0) {
DCHECK_NE(nullptr, values);
}
const int64_t num_nulls = batch_size - values_to_write;
WriteValues(AddIfNotNull(values, value_offset), values_to_write, num_nulls);
CommitWriteAndCheckPageLimit(batch_size, values_to_write, num_nulls, check_page);
value_offset += values_to_write;
// Dictionary size checked separately from data page size since we
// circumvent this check when writing ::arrow::DictionaryArray directly
CheckDictionarySizeLimit();
};
DoInBatches(def_levels, rep_levels, num_values, properties_->write_batch_size(),
properties_->max_rows_per_page(), pages_change_on_record_boundaries(),
WriteChunk, [this]() { return num_buffered_rows_; });
return value_offset;
}
void WriteBatchSpaced(int64_t num_values, const int16_t* def_levels,
const int16_t* rep_levels, const uint8_t* valid_bits,
int64_t valid_bits_offset, const T* values) override {
if (ARROW_PREDICT_FALSE(properties_->content_defined_chunking_enabled())) {
throw ParquetException(
"Content-defined chunking is not supported in WriteBatch() or "
"WriteBatchSpaced(), use WriteArrow() instead.");
}
return WriteBatchSpacedInternal(num_values, def_levels, rep_levels, valid_bits,
valid_bits_offset, values);
}
void WriteBatchSpacedInternal(int64_t num_values, const int16_t* def_levels,
const int16_t* rep_levels, const uint8_t* valid_bits,
int64_t valid_bits_offset, const T* values) {
// Like WriteBatch, but for spaced values
int64_t value_offset = 0;
auto WriteChunk = [&](int64_t offset, int64_t batch_size, bool check_page) {
int64_t batch_num_values = 0;
int64_t batch_num_spaced_values = 0;
int64_t null_count;
MaybeCalculateValidityBits(AddIfNotNull(def_levels, offset), batch_size,
&batch_num_values, &batch_num_spaced_values,
&null_count);
WriteLevelsSpaced(batch_size, AddIfNotNull(def_levels, offset),
AddIfNotNull(rep_levels, offset));
if (bits_buffer_ != nullptr) {
WriteValuesSpaced(AddIfNotNull(values, value_offset), batch_num_values,
batch_num_spaced_values, bits_buffer_->data(), /*offset=*/0,
/*num_levels=*/batch_size, null_count);
} else {
WriteValuesSpaced(AddIfNotNull(values, value_offset), batch_num_values,
batch_num_spaced_values, valid_bits,
valid_bits_offset + value_offset, /*num_levels=*/batch_size,
null_count);
}
CommitWriteAndCheckPageLimit(batch_size, batch_num_spaced_values, null_count,
check_page);
value_offset += batch_num_spaced_values;
// Dictionary size checked separately from data page size since we
// circumvent this check when writing ::arrow::DictionaryArray directly
CheckDictionarySizeLimit();
};
DoInBatches(def_levels, rep_levels, num_values, properties_->write_batch_size(),
properties_->max_rows_per_page(), pages_change_on_record_boundaries(),
WriteChunk, [this]() { return num_buffered_rows_; });
}
Status WriteArrow(const int16_t* def_levels, const int16_t* rep_levels,
int64_t num_levels, const ::arrow::Array& leaf_array,
ArrowWriteContext* ctx, bool leaf_field_nullable) override {
BEGIN_PARQUET_CATCH_EXCEPTIONS
// Leaf nulls are canonical when there is only a single null element after a list
// and it is at the leaf.
bool single_nullable_element =
(level_info_.def_level == level_info_.repeated_ancestor_def_level + 1) &&
leaf_field_nullable;
if (!leaf_field_nullable && leaf_array.null_count() != 0) {
return Status::Invalid("Column '", descr_->name(),
"' is declared non-nullable but contains nulls");
}
bool maybe_parent_nulls = level_info_.HasNullableValues() && !single_nullable_element;
if (maybe_parent_nulls) {
ARROW_ASSIGN_OR_RAISE(
bits_buffer_,
::arrow::AllocateResizableBuffer(
bit_util::BytesForBits(properties_->write_batch_size()), ctx->memory_pool));
bits_buffer_->ZeroPadding();
}
if (ARROW_PREDICT_FALSE(properties_->content_defined_chunking_enabled())) {
DCHECK(content_defined_chunker_.has_value());
auto chunks = content_defined_chunker_->GetChunks(def_levels, rep_levels,
num_levels, leaf_array);
for (size_t i = 0; i < chunks.size(); i++) {
auto chunk = chunks[i];
auto chunk_array = leaf_array.Slice(chunk.value_offset);
auto chunk_def_levels = AddIfNotNull(def_levels, chunk.level_offset);
auto chunk_rep_levels = AddIfNotNull(rep_levels, chunk.level_offset);
if (leaf_array.type()->id() == ::arrow::Type::DICTIONARY) {
ARROW_CHECK_OK(WriteArrowDictionary(chunk_def_levels, chunk_rep_levels,
chunk.levels_to_write, *chunk_array, ctx,
maybe_parent_nulls));
} else {
ARROW_CHECK_OK(WriteArrowDense(chunk_def_levels, chunk_rep_levels,
chunk.levels_to_write, *chunk_array, ctx,
maybe_parent_nulls));
}
bool is_last_chunk = i == (chunks.size() - 1);
if (num_buffered_values_ > 0 && !is_last_chunk) {
// Explicitly add a new data page according to the content-defined chunk
// boundaries. This way the same chunks will have the same byte sequence
// in the resulting file, which can be identified by content-addressable
// storage.
// Note that the last chunk doesn't trigger a new data page, so that
// subsequent WriteArrow() calls can continue writing to the same data
// page; the chunker's state is not reset after the last chunk.
AddDataPage();
}
}
return Status::OK();
} else {
if (leaf_array.type()->id() == ::arrow::Type::DICTIONARY) {
return WriteArrowDictionary(def_levels, rep_levels, num_levels, leaf_array, ctx,
maybe_parent_nulls);
} else {
return WriteArrowDense(def_levels, rep_levels, num_levels, leaf_array, ctx,
maybe_parent_nulls);
}
}
END_PARQUET_CATCH_EXCEPTIONS
}
int64_t estimated_buffered_value_bytes() const override {
return current_encoder_->EstimatedDataEncodedSize();
}
protected:
std::shared_ptr<Buffer> GetValuesBuffer() override {
return current_encoder_->FlushValues();
}
// Internal function to handle direct writing of ::arrow::DictionaryArray,
// since the standard logic concerning dictionary size limits and fallback to
// plain encoding is circumvented
Status WriteArrowDictionary(const int16_t* def_levels, const int16_t* rep_levels,
int64_t num_levels, const ::arrow::Array& array,
ArrowWriteContext* context, bool maybe_parent_nulls);
Status WriteArrowDense(const int16_t* def_levels, const int16_t* rep_levels,
int64_t num_levels, const ::arrow::Array& array,
ArrowWriteContext* context, bool maybe_parent_nulls);
template <typename ArrowType>
Status WriteArrowSerialize(const int16_t* def_levels, const int16_t* rep_levels,
int64_t num_levels, const ::arrow::Array& array,
ArrowWriteContext* ctx, bool maybe_parent_nulls);
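// Writes a primitive array whose values buffer can be handed to the encoder
// without conversion; uses the spaced write path when nulls (including parent
// nulls) may be present.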
Status WriteArrowZeroCopy(const int16_t* def_levels, const int16_t* rep_levels,
int64_t num_levels, const ::arrow::Array& array,
ArrowWriteContext* ctx, bool maybe_parent_nulls) {
const auto& data = checked_cast<const ::arrow::PrimitiveArray&>(array);
const T* values = data.data()->GetValues<T>(1);
bool no_nulls =
this->descr()->schema_node()->is_required() || (array.null_count() == 0);
if (!maybe_parent_nulls && no_nulls) {
PARQUET_CATCH_NOT_OK(
WriteBatchInternal(num_levels, def_levels, rep_levels, values));
} else {
PARQUET_CATCH_NOT_OK(WriteBatchSpacedInternal(num_levels, def_levels, rep_levels,
data.null_bitmap_data(),
data.offset(), values));
}
return Status::OK();
}
Status WriteArrowTimestamps(const int16_t* def_levels, const int16_t* rep_levels,
int64_t num_levels, const ::arrow::Array& values,
ArrowWriteContext* ctx, bool maybe_parent_nulls) {
return Status::NotImplemented("Timestamps writing is only implemented for Int64Type");
}
void WriteDictionaryPage() override {
DCHECK(current_dict_encoder_);
std::shared_ptr<ResizableBuffer> buffer = AllocateBuffer(
properties_->memory_pool(), current_dict_encoder_->dict_encoded_size());
current_dict_encoder_->WriteDict(buffer->mutable_data());
DictionaryPage page(buffer, current_dict_encoder_->num_entries(),
properties_->dictionary_page_encoding());
total_bytes_written_ += pager_->WriteDictionaryPage(page);
}
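// Returns the encoded statistics of the current page, together with the
// per-page size statistics when SizeStatisticsLevel::PageAndColumnChunk is
// enabled.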
StatisticsPair GetPageStatistics() override {
StatisticsPair result;
if (page_statistics_) {
result.encoded_stats = page_statistics_->Encode();
}
if (properties_->size_statistics_level() == SizeStatisticsLevel::PageAndColumnChunk) {
ARROW_DCHECK(page_size_statistics_ != nullptr);
result.size_stats = *page_size_statistics_;
}
return result;
}
StatisticsPair GetChunkStatistics() override {
StatisticsPair result;
if (chunk_statistics_) {
result.encoded_stats = chunk_statistics_->Encode();
}
if (chunk_size_statistics_) {
result.size_stats = *chunk_size_statistics_;
}
return result;
}
std::optional<geospatial::EncodedGeoStatistics> GetChunkGeoStatistics() override {
if (chunk_geospatial_statistics_) {
return chunk_geospatial_statistics_->Encode();
} else {
return std::nullopt;
}
}
void ResetPageStatistics() override {
if (chunk_statistics_ != nullptr) {
chunk_statistics_->Merge(*page_statistics_);
page_statistics_->Reset();
}
if (page_size_statistics_ != nullptr) {
chunk_size_statistics_->Merge(*page_size_statistics_);
page_size_statistics_->Reset();
}
}
Type::type type() const override { return descr_->physical_type(); }
const ColumnDescriptor* descr() const override { return descr_; }
int64_t rows_written() const override { return rows_written_; }
int64_t total_compressed_bytes() const override { return total_compressed_bytes_; }
int64_t total_bytes_written() const override { return total_bytes_written_; }
int64_t total_compressed_bytes_written() const override {
return pager_->total_compressed_bytes_written();
}
const WriterProperties* properties() override { return properties_; }
bool pages_change_on_record_boundaries() const {
return pages_change_on_record_boundaries_;
}
void AddKeyValueMetadata(
const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) override {
if (closed_) {
throw ParquetException("Cannot add key-value metadata to closed column");
}
if (key_value_metadata_ == nullptr) {
key_value_metadata_ = key_value_metadata;
} else if (key_value_metadata != nullptr) {
key_value_metadata_ = key_value_metadata_->Merge(*key_value_metadata);
}
}
void ResetKeyValueMetadata() override {
if (closed_) {
throw ParquetException("Cannot add key-value metadata to closed column");
}
key_value_metadata_ = nullptr;
}
private:
using ValueEncoderType = typename EncodingTraits<ParquetType>::Encoder;
using TypedStats = TypedStatistics<ParquetType>;
using BloomFilterWriter = TypedBloomFilterWriter<ParquetType>;
std::unique_ptr<Encoder> current_encoder_;
// Downcasted observers of current_encoder_.
// The downcast is performed once as opposed to at every use since
// dynamic_cast is so expensive, and static_cast is not available due
// to virtual inheritance.
ValueEncoderType* current_value_encoder_;
DictEncoder<ParquetType>* current_dict_encoder_;
std::shared_ptr<TypedStats> page_statistics_;
std::shared_ptr<TypedStats> chunk_statistics_;
std::unique_ptr<SizeStatistics> page_size_statistics_;
std::shared_ptr<SizeStatistics> chunk_size_statistics_;
std::shared_ptr<geospatial::GeoStatistics> chunk_geospatial_statistics_;
std::unique_ptr<BloomFilterWriter> bloom_filter_writer_;
bool pages_change_on_record_boundaries_;
// If writing a sequence of ::arrow::DictionaryArray to the writer, we keep the
// dictionary passed to DictEncoder<T>::PutDictionary so we can check
// subsequent array chunks to see whether materialization is required (in
// which case we fall back to the dense write path)
std::shared_ptr<::arrow::Array> preserved_dictionary_;
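// Write definition and repetition levels for a dense batch, update the row
// bookkeeping, and return the number of leaf values (levels equal to the max
// definition level) that must be written.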
int64_t WriteLevels(int64_t num_levels, const int16_t* def_levels,
const int16_t* rep_levels) {
// Update histograms now, to maximize cache efficiency.
UpdateLevelHistogram(num_levels, def_levels, rep_levels);
int64_t values_to_write = 0;
// If the field is required and non-repeated, there are no definition levels
if (descr_->max_definition_level() > 0) {
for (int64_t i = 0; i < num_levels; ++i) {
if (def_levels[i] == descr_->max_definition_level()) {
++values_to_write;
}
}
WriteDefinitionLevels(num_levels, def_levels);
} else {
// Required field, write all values
values_to_write = num_levels;
}
// Not present for non-repeated fields
if (descr_->max_repetition_level() > 0) {
// A row could include more than one value
// Count the occasions where we start a new row
for (int64_t i = 0; i < num_levels; ++i) {
if (rep_levels[i] == 0) {
rows_written_++;
num_buffered_rows_++;
}
}
WriteRepetitionLevels(num_levels, rep_levels);
} else {
// Each value is exactly one row
rows_written_ += num_levels;
num_buffered_rows_ += num_levels;
}
return values_to_write;
}
// This method will always update the three output parameters,
// out_values_to_write, out_spaced_values_to_write and null_count. Additionally
// it will update the validity bitmap if required (i.e. if at least one level
// of nullable structs directly precedes the leaf node).
void MaybeCalculateValidityBits(const int16_t* def_levels, int64_t batch_size,
int64_t* out_values_to_write,
int64_t* out_spaced_values_to_write,
int64_t* null_count) {
if (bits_buffer_ == nullptr) {
if (level_info_.def_level == 0) {
// In this case def levels should be null and we only
// need to output counts which will always be equal to
// the batch size passed in (max def_level == 0 indicates
// there cannot be repeated or null fields).
DCHECK_EQ(def_levels, nullptr);
*out_values_to_write = batch_size;
*out_spaced_values_to_write = batch_size;
*null_count = 0;
} else {
for (int x = 0; x < batch_size; x++) {
*out_values_to_write += def_levels[x] == level_info_.def_level ? 1 : 0;
*out_spaced_values_to_write +=
def_levels[x] >= level_info_.repeated_ancestor_def_level ? 1 : 0;
}
*null_count = batch_size - *out_values_to_write;
}
return;
}
// Shrinking to fit could cause another allocation, and would only be necessary
// on the last batch.
int64_t new_bitmap_size = bit_util::BytesForBits(batch_size);
if (new_bitmap_size != bits_buffer_->size()) {
PARQUET_THROW_NOT_OK(
bits_buffer_->Resize(new_bitmap_size, /*shrink_to_fit=*/false));
bits_buffer_->ZeroPadding();
}
internal::ValidityBitmapInputOutput io;
io.valid_bits = bits_buffer_->mutable_data();
io.values_read_upper_bound = batch_size;
internal::DefLevelsToBitmap(def_levels, batch_size, level_info_, &io);
*out_values_to_write = io.values_read - io.null_count;
*out_spaced_values_to_write = io.values_read;
*null_count = io.null_count;
}
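// If a validity bitmap was recomputed from def levels (bits_buffer_), rebuild
// the leaf array with that bitmap so the encoder and statistics see the
// correct nulls. Value buffers are re-sliced when the array has a non-zero
// offset, since the recomputed bitmap starts at offset zero.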
Result<std::shared_ptr<Array>> MaybeReplaceValidity(std::shared_ptr<Array> array,
int64_t new_null_count,
::arrow::MemoryPool* memory_pool) {
if (bits_buffer_ == nullptr) {
return array;
}
std::vector<std::shared_ptr<Buffer>> buffers = array->data()->buffers;
if (buffers.empty()) {
return array;
}
buffers[0] = bits_buffer_;
// Should be a leaf array.
DCHECK_GT(buffers.size(), 1);
ValueBufferSlicer slicer{memory_pool};
if (array->data()->offset > 0) {
RETURN_NOT_OK(::arrow::VisitArrayInline(*array, &slicer, &buffers[1]));
}
return ::arrow::MakeArray(std::make_shared<ArrayData>(
array->type(), array->length(), std::move(buffers), new_null_count));
}
void WriteLevelsSpaced(int64_t num_levels, const int16_t* def_levels,
const int16_t* rep_levels) {
// Update histograms now, to maximize cache efficiency.
UpdateLevelHistogram(num_levels, def_levels, rep_levels);
// If the field is required and non-repeated, there are no definition levels
if (descr_->max_definition_level() > 0) {
WriteDefinitionLevels(num_levels, def_levels);
}
// Not present for non-repeated fields
if (descr_->max_repetition_level() > 0) {
// A row could include more than one value
// Count the occasions where we start a new row
for (int64_t i = 0; i < num_levels; ++i) {
if (rep_levels[i] == 0) {
rows_written_++;
num_buffered_rows_++;
}
}
WriteRepetitionLevels(num_levels, rep_levels);
} else {
// Each value is exactly one row
rows_written_ += num_levels;
num_buffered_rows_ += num_levels;
}
}
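// Accumulate definition/repetition levels into the page-level histograms;
// no-op unless page size statistics are being collected.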
void UpdateLevelHistogram(int64_t num_levels, const int16_t* def_levels,
const int16_t* rep_levels) const {
if (page_size_statistics_ == nullptr) {
return;
}
auto add_levels = [](std::vector<int64_t>& level_histogram,
::arrow::util::span<const int16_t> levels, int16_t max_level) {
if (max_level == 0) {
return;
}
ARROW_DCHECK_EQ(static_cast<size_t>(max_level) + 1, level_histogram.size());
::parquet::UpdateLevelHistogram(levels, level_histogram);
};
add_levels(page_size_statistics_->definition_level_histogram,
{def_levels, static_cast<size_t>(num_levels)},
descr_->max_definition_level());
add_levels(page_size_statistics_->repetition_level_histogram,
{rep_levels, static_cast<size_t>(num_levels)},
descr_->max_repetition_level());
}
// Update the unencoded data bytes (tracked for ByteArray only, per the specification).
void UpdateUnencodedDataBytes() const {
if constexpr (std::is_same_v<T, ByteArray>) {
if (page_size_statistics_ != nullptr) {
page_size_statistics_->IncrementUnencodedByteArrayDataBytes(
current_encoder_->ReportUnencodedDataBytes());
}
}
}
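// Account for the levels, values and nulls just written, and start a new
// data page if the encoder buffer or buffered row count exceeds the
// configured page limits.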
void CommitWriteAndCheckPageLimit(int64_t num_levels, int64_t num_values,
int64_t num_nulls, bool check_page_limit) {
num_buffered_values_ += num_levels;
num_buffered_encoded_values_ += num_values;
num_buffered_nulls_ += num_nulls;
if (check_page_limit &&
(current_encoder_->EstimatedDataEncodedSize() >= properties_->data_pagesize() ||
num_buffered_rows_ >= properties_->max_rows_per_page())) {
AddDataPage();
}
}
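// If currently dictionary-encoding, write the dictionary page, flush the
// buffered data pages and switch the value encoder to PLAIN.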
void FallbackToPlainEncoding() {
if (IsDictionaryIndexEncoding(current_encoder_->encoding())) {
WriteDictionaryPage();
// Serialize the buffered Dictionary Indices
FlushBufferedDataPages();
fallback_ = true;
// Only PLAIN encoding is supported for fallback
current_encoder_ = MakeEncoder(ParquetType::type_num, Encoding::PLAIN, false,
descr_, properties_->memory_pool());
current_value_encoder_ = dynamic_cast<ValueEncoderType*>(current_encoder_.get());
current_dict_encoder_ = nullptr; // not using dict
encoding_ = Encoding::PLAIN;
}
}
// Checks if the dictionary page size limit is reached.
// If so, the dictionary page and the buffered data pages are serialized and
// the encoding falls back to PLAIN.
//
// Only one dictionary page is ever written per column chunk.
void CheckDictionarySizeLimit() {
if (!has_dictionary_ || fallback_) {
// Either not using dictionary encoding, or we have already fallen back
// to PLAIN encoding because the size threshold was reached
return;
}
if (current_dict_encoder_->dict_encoded_size() >=
properties_->dictionary_pagesize_limit()) {
FallbackToPlainEncoding();
}
}
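// Encode a dense (non-spaced) run of values and feed the page statistics,
// geospatial statistics and bloom filter.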
void WriteValues(const T* values, int64_t num_values, int64_t num_nulls) {
current_value_encoder_->Put(values, static_cast<int>(num_values));
if (page_statistics_ != nullptr) {
page_statistics_->Update(values, num_values, num_nulls);
}
UpdateUnencodedDataBytes();
if constexpr (std::is_same<T, ByteArray>::value) {
if (chunk_geospatial_statistics_ != nullptr) {
chunk_geospatial_statistics_->Update(values, num_values);
}
}
if (bloom_filter_writer_ != nullptr) {
bloom_filter_writer_->Update(values, num_values);
}
}
/// \brief Write values with spaces and update page statistics accordingly.
///
/// \param values input buffer of values to write, including spaces.
/// \param num_values number of non-null values in the values buffer.
/// \param num_spaced_values length of the values buffer, including spaces; does not
/// count some nulls from ancestors (e.g. empty lists).
/// \param valid_bits validity bitmap of the values buffer, which does not include
/// some nulls from ancestors (e.g. empty lists).
/// \param valid_bits_offset offset to valid_bits bitmap.
/// \param num_levels number of levels to write, including nulls from values buffer
/// and nulls from ancestor (e.g. empty lists).
/// \param num_nulls number of nulls in the values buffer as well as nulls from the
/// ancestor (e.g. empty lists).
void WriteValuesSpaced(const T* values, int64_t num_values, int64_t num_spaced_values,
const uint8_t* valid_bits, int64_t valid_bits_offset,
int64_t num_levels, int64_t num_nulls) {
if (num_values != num_spaced_values) {
current_value_encoder_->PutSpaced(values, static_cast<int>(num_spaced_values),
valid_bits, valid_bits_offset);
if (bloom_filter_writer_ != nullptr) {
bloom_filter_writer_->UpdateSpaced(values, num_spaced_values, valid_bits,
valid_bits_offset);
}
} else {
current_value_encoder_->Put(values, static_cast<int>(num_values));
if (bloom_filter_writer_ != nullptr) {
bloom_filter_writer_->Update(values, num_values);
}
}
if (page_statistics_ != nullptr) {
page_statistics_->UpdateSpaced(values, valid_bits, valid_bits_offset,
num_spaced_values, num_values, num_nulls);
}
UpdateUnencodedDataBytes();
if constexpr (std::is_same<T, ByteArray>::value) {
if (chunk_geospatial_statistics_ != nullptr) {
chunk_geospatial_statistics_->UpdateSpaced(values, valid_bits, valid_bits_offset,
num_spaced_values, num_values);
}
}
}
};
template <typename ParquetType>
Status TypedColumnWriterImpl<ParquetType>::WriteArrowDictionary(
const int16_t* def_levels, const int16_t* rep_levels, int64_t num_levels,
const ::arrow::Array& array, ArrowWriteContext* ctx, bool maybe_parent_nulls) {
// If this is the first time writing a DictionaryArray, then there are
// a few possible paths to take:
//
// - If dictionary encoding is not enabled, convert to densely
// encoded and call WriteArrow
// - Dictionary encoding enabled
// - If this is the first time this is called, then we call
// PutDictionary into the encoder and then PutIndices on each
// chunk. We store the dictionary that was written in
// preserved_dictionary_ so that subsequent calls to this method
// can make sure the dictionary has not changed
// - On subsequent calls, we have to check whether the dictionary
// has changed. If it has, then we trigger the varying
// dictionary path and materialize each chunk and then call
// WriteArrow with that
auto WriteDense = [&] {
std::shared_ptr<::arrow::Array> dense_array;
RETURN_NOT_OK(
ConvertDictionaryToDense(array, properties_->memory_pool(), &dense_array));
return WriteArrowDense(def_levels, rep_levels, num_levels, *dense_array, ctx,
maybe_parent_nulls);
};
if (!IsDictionaryIndexEncoding(current_encoder_->encoding()) ||
!DictionaryDirectWriteSupported(array)) {
// No longer dictionary-encoding for whatever reason, maybe we never were
// or we decided to stop. Note that WriteArrow can be invoked multiple
// times with both dense and dictionary-encoded versions of the same data
// without a problem. Any dense data will be hashed to indices until the
// dictionary page limit is reached, at which point everything (dictionary and
// dense) will fall back to plain encoding
return WriteDense();
}
auto dict_encoder = dynamic_cast<DictEncoder<ParquetType>*>(current_encoder_.get());
const auto& data = checked_cast<const ::arrow::DictionaryArray&>(array);
std::shared_ptr<::arrow::Array> dictionary = data.dictionary();
std::shared_ptr<::arrow::Array> indices = data.indices();
auto update_stats = [&](int64_t num_chunk_levels,
const std::shared_ptr<Array>& chunk_indices) {
// TODO(PARQUET-2068) This approach may make two copies. First, a copy of the
// indices array to a (hopefully smaller) referenced indices array. Second, a copy
// of the values array to a (probably not smaller) referenced values array.
//
// Once the MinMax kernel supports all data types we should use that kernel instead
// as it does not make any copies.
::arrow::compute::ExecContext exec_ctx(ctx->memory_pool);
exec_ctx.set_use_threads(false);
std::shared_ptr<::arrow::Array> referenced_dictionary;
PARQUET_ASSIGN_OR_THROW(::arrow::Datum referenced_indices,
::arrow::compute::Unique(*chunk_indices, &exec_ctx));
// On first run, we might be able to re-use the existing dictionary
if (referenced_indices.length() == dictionary->length()) {
referenced_dictionary = dictionary;
} else {
PARQUET_ASSIGN_OR_THROW(
::arrow::Datum referenced_dictionary_datum,
::arrow::compute::Take(dictionary, referenced_indices,
::arrow::compute::TakeOptions(/*boundscheck=*/false),
&exec_ctx));
referenced_dictionary = referenced_dictionary_datum.make_array();
}
if (page_statistics_ != nullptr) {
int64_t non_null_count = chunk_indices->length() - chunk_indices->null_count();
page_statistics_->IncrementNullCount(num_chunk_levels - non_null_count);
page_statistics_->IncrementNumValues(non_null_count);
page_statistics_->Update(*referenced_dictionary, /*update_counts=*/false);
}
if (chunk_geospatial_statistics_ != nullptr) {
throw ParquetException(
"Writing dictionary-encoded GEOMETRY or GEOGRAPHY with statistics is not "
"supported");
}
if (bloom_filter_writer_ != nullptr) {
bloom_filter_writer_->Update(*referenced_dictionary);
}
};
int64_t value_offset = 0;
auto WriteIndicesChunk = [&](int64_t offset, int64_t batch_size, bool check_page) {
int64_t batch_num_values = 0;
int64_t batch_num_spaced_values = 0;
int64_t null_count = ::arrow::kUnknownNullCount;
// Bits is not null for nullable values. At this point in the code we can't
// determine if the leaf array has the same null values as any parents it might
// have had, so we need to recompute the validity bitmap from the def levels.
MaybeCalculateValidityBits(AddIfNotNull(def_levels, offset), batch_size,
&batch_num_values, &batch_num_spaced_values, &null_count);
WriteLevelsSpaced(batch_size, AddIfNotNull(def_levels, offset),
AddIfNotNull(rep_levels, offset));
std::shared_ptr<Array> writeable_indices =
indices->Slice(value_offset, batch_num_spaced_values);
if (page_statistics_ || bloom_filter_writer_) {
update_stats(/*num_chunk_levels=*/batch_size, writeable_indices);
}
PARQUET_ASSIGN_OR_THROW(
writeable_indices,
MaybeReplaceValidity(writeable_indices, null_count, ctx->memory_pool));
dict_encoder->PutIndices(*writeable_indices);
// Update the unencoded byte array data size in the size statistics
UpdateUnencodedDataBytes();
CommitWriteAndCheckPageLimit(batch_size, batch_num_values, null_count, check_page);
value_offset += batch_num_spaced_values;
};
// Handle seeing dictionary for the first time
if (!preserved_dictionary_) {
// It's a new dictionary. Call PutDictionary and keep track of it
PARQUET_CATCH_NOT_OK(dict_encoder->PutDictionary(*dictionary));
// If there were duplicate values in the dictionary, the encoder's memo table
// will be out of sync with the indices in the Arrow array.
// The easiest solution for this uncommon case is to fall back to plain encoding.
if (dict_encoder->num_entries() != dictionary->length()) {
PARQUET_CATCH_NOT_OK(FallbackToPlainEncoding());
return WriteDense();
}
preserved_dictionary_ = dictionary;
} else if (!dictionary->Equals(*preserved_dictionary_)) {
// Dictionary has changed
PARQUET_CATCH_NOT_OK(FallbackToPlainEncoding());
return WriteDense();
}
PARQUET_CATCH_NOT_OK(
DoInBatches(def_levels, rep_levels, num_levels, properties_->write_batch_size(),
properties_->max_rows_per_page(), pages_change_on_record_boundaries(),
WriteIndicesChunk, [this]() { return num_buffered_rows_; }));
return Status::OK();
}
// ----------------------------------------------------------------------
// Direct Arrow write path
template <typename ParquetType, typename ArrowType, typename Enable = void>
struct SerializeFunctor {
using ArrowCType = typename ArrowType::c_type;
using ArrayType = typename ::arrow::TypeTraits<ArrowType>::ArrayType;
using ParquetCType = typename ParquetType::c_type;
Status Serialize(const ArrayType& array, ArrowWriteContext*, ParquetCType* out) {
const ArrowCType* input = array.raw_values();
if (array.null_count() > 0) {
for (int64_t i = 0; i < array.length(); i++) {
out[i] = static_cast<ParquetCType>(input[i]);
}
} else {
std::copy(input, input + array.length(), out);
}
return Status::OK();
}
};
template <typename ParquetType>
template <typename ArrowType>
Status TypedColumnWriterImpl<ParquetType>::WriteArrowSerialize(
const int16_t* def_levels, const int16_t* rep_levels, int64_t num_levels,
const ::arrow::Array& array, ArrowWriteContext* ctx, bool maybe_parent_nulls) {
using ArrayType = typename ::arrow::TypeTraits<ArrowType>::ArrayType;
using ParquetCType = typename ParquetType::c_type;
ParquetCType* buffer = nullptr;
PARQUET_THROW_NOT_OK(ctx->GetScratchData<ParquetCType>(array.length(), &buffer));
SerializeFunctor<ParquetType, ArrowType> functor;
// The value buffer could be empty if all values are null.
// The output buffer will then remain uninitialized, but that's ok since
// null value slots are not written in Parquet.
if (array.null_count() != array.length()) {
RETURN_NOT_OK(functor.Serialize(checked_cast<const ArrayType&>(array), ctx, buffer));
}
bool no_nulls =
this->descr()->schema_node()->is_required() || (array.null_count() == 0);
if (!maybe_parent_nulls && no_nulls) {
PARQUET_CATCH_NOT_OK(WriteBatchInternal(num_levels, def_levels, rep_levels, buffer));
} else {
PARQUET_CATCH_NOT_OK(WriteBatchSpacedInternal(num_levels, def_levels, rep_levels,
array.null_bitmap_data(),
array.offset(), buffer));
}
return Status::OK();
}
#define WRITE_SERIALIZE_CASE(ArrowEnum) \
case ::arrow::Type::ArrowEnum: { \
using ArrowType = typename ::arrow::TypeIdTraits<::arrow::Type::ArrowEnum>::Type; \
return WriteArrowSerialize<ArrowType>(def_levels, rep_levels, num_levels, array, \
ctx, maybe_parent_nulls); \
}
#define WRITE_ZERO_COPY_CASE(ArrowEnum) \
case ::arrow::Type::ArrowEnum: \
return WriteArrowZeroCopy(def_levels, rep_levels, num_levels, array, ctx, \
maybe_parent_nulls);
#define ARROW_UNSUPPORTED() \
std::stringstream ss; \
ss << "Arrow type " << array.type()->ToString() \
<< " cannot be written to Parquet type " << descr_->ToString(); \
return Status::Invalid(ss.str());
// ----------------------------------------------------------------------
// Write Arrow to BooleanType
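// Arrow stores booleans bit-packed, so they are unpacked into a contiguous
// bool buffer before being handed to the Parquet encoder.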
template <>
struct SerializeFunctor<BooleanType, ::arrow::BooleanType> {
Status Serialize(const ::arrow::BooleanArray& data, ArrowWriteContext*, bool* out) {
for (int64_t i = 0; i < data.length(); i++) {
*out++ = data.Value(i);
}
return Status::OK();
}
};
template <>
Status TypedColumnWriterImpl<BooleanType>::WriteArrowDense(
const int16_t* def_levels, const int16_t* rep_levels, int64_t num_levels,
const ::arrow::Array& array, ArrowWriteContext* ctx, bool maybe_parent_nulls) {
if (array.type_id() != ::arrow::Type::BOOL) {
ARROW_UNSUPPORTED();
}
return WriteArrowSerialize<::arrow::BooleanType>(def_levels, rep_levels, num_levels,
array, ctx, maybe_parent_nulls);
}
// ----------------------------------------------------------------------
// Write Arrow types to INT32
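// Arrow Date64 holds milliseconds since the UNIX epoch; Parquet DATE (INT32)
// holds days, hence the division by 86400000 ms per day.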
template <>
struct SerializeFunctor<Int32Type, ::arrow::Date64Type> {
Status Serialize(const ::arrow::Date64Array& array, ArrowWriteContext*, int32_t* out) {
const int64_t* input = array.raw_values();
for (int64_t i = 0; i < array.length(); i++) {
*out++ = static_cast<int32_t>(*input++ / 86400000);
}
return Status::OK();
}
};
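// Decimals of small enough precision may be stored on the INT32/INT64
// physical types; extract the integer value from each decimal.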
template <typename ParquetType, typename ArrowType>
struct SerializeFunctor<
ParquetType, ArrowType,
::arrow::enable_if_t<::arrow::is_decimal_type<ArrowType>::value&& ::arrow::internal::
IsOneOf<ParquetType, Int32Type, Int64Type>::value>> {
using value_type = typename ParquetType::c_type;
Status Serialize(const typename ::arrow::TypeTraits<ArrowType>::ArrayType& array,
ArrowWriteContext* ctx, value_type* out) {
if (array.null_count() == 0) {
for (int64_t i = 0; i < array.length(); i++) {
out[i] = TransferValue(array.Value(i));
}
} else {
for (int64_t i = 0; i < array.length(); i++) {
out[i] = array.IsValid(i) ? TransferValue(array.Value(i)) : 0;
}
}
return Status::OK();
}
private:
value_type TransferValue(const uint8_t* in) const {
using DecimalValue = typename ::arrow::TypeTraits<ArrowType>::CType;
DecimalValue decimal_value(in);
if constexpr (std::is_same_v<ArrowType, ::arrow::Decimal256Type>) {
// Decimal256 does not provide ToInteger, but we are sure it fits in the target
// integer type.
return static_cast<value_type>(decimal_value.low_bits());
} else {
value_type value = 0;
PARQUET_THROW_NOT_OK(decimal_value.ToInteger(&value));
return value;
}
}
};
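// Parquet TIME stored as INT32 uses millisecond precision, so second-resolution
// Arrow Time32 values are scaled up by 1000.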
template <>
struct SerializeFunctor<Int32Type, ::arrow::Time32Type> {
Status Serialize(const ::arrow::Time32Array& array, ArrowWriteContext*, int32_t* out) {
const int32_t* input = array.raw_values();
const auto& type = static_cast<const ::arrow::Time32Type&>(*array.type());
if (type.unit() == ::arrow::TimeUnit::SECOND) {
for (int64_t i = 0; i < array.length(); i++) {
out[i] = input[i] * 1000;
}
} else {
std::copy(input, input + array.length(), out);
}
return Status::OK();
}
};
template <>
Status TypedColumnWriterImpl<Int32Type>::WriteArrowDense(
const int16_t* def_levels, const int16_t* rep_levels, int64_t num_levels,
const ::arrow::Array& array, ArrowWriteContext* ctx, bool maybe_parent_nulls) {
switch (array.type()->id()) {
case ::arrow::Type::NA: {
PARQUET_CATCH_NOT_OK(
WriteBatchInternal(num_levels, def_levels, rep_levels, nullptr));
} break;
WRITE_SERIALIZE_CASE(INT8)
WRITE_SERIALIZE_CASE(UINT8)
WRITE_SERIALIZE_CASE(INT16)
WRITE_SERIALIZE_CASE(UINT16)
WRITE_SERIALIZE_CASE(UINT32)
WRITE_ZERO_COPY_CASE(INT32)
WRITE_ZERO_COPY_CASE(DATE32)
WRITE_SERIALIZE_CASE(DATE64)
WRITE_SERIALIZE_CASE(TIME32)
WRITE_SERIALIZE_CASE(DECIMAL32)
WRITE_SERIALIZE_CASE(DECIMAL64)
WRITE_SERIALIZE_CASE(DECIMAL128)
WRITE_SERIALIZE_CASE(DECIMAL256)
default:
ARROW_UNSUPPORTED()
}
return Status::OK();
}
// ----------------------------------------------------------------------
// Write Arrow to Int64 and Int96
#define INT96_CONVERT_LOOP(ConversionFunction) \
for (int64_t i = 0; i < array.length(); i++) ConversionFunction(input[i], &out[i]);
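// Convert Arrow timestamps of any unit into Impala-style INT96 timestamps.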
template <>
struct SerializeFunctor<Int96Type, ::arrow::TimestampType> {
Status Serialize(const ::arrow::TimestampArray& array, ArrowWriteContext*, Int96* out) {
const int64_t* input = array.raw_values();
const auto& type = static_cast<const ::arrow::TimestampType&>(*array.type());
switch (type.unit()) {
case ::arrow::TimeUnit::NANO:
INT96_CONVERT_LOOP(internal::NanosecondsToImpalaTimestamp);
break;
case ::arrow::TimeUnit::MICRO:
INT96_CONVERT_LOOP(internal::MicrosecondsToImpalaTimestamp);
break;
case ::arrow::TimeUnit::MILLI:
INT96_CONVERT_LOOP(internal::MillisecondsToImpalaTimestamp);
break;
case ::arrow::TimeUnit::SECOND:
INT96_CONVERT_LOOP(internal::SecondsToImpalaTimestamp);
break;
}
return Status::OK();
}
};
#define COERCE_DIVIDE -1
#define COERCE_INVALID 0
#define COERCE_MULTIPLY +1
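// Conversion table indexed by [source unit][target unit]: the operation
// (multiply, divide, or invalid) and the scale factor needed to coerce
// between timestamp units.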
static std::pair<int, int64_t> kTimestampCoercionFactors[4][4] = {
// from seconds ...
{{COERCE_INVALID, 0}, // ... to seconds
{COERCE_MULTIPLY, 1000}, // ... to millis
{COERCE_MULTIPLY, 1000000}, // ... to micros
{COERCE_MULTIPLY, INT64_C(1000000000)}}, // ... to nanos
// from millis ...
{{COERCE_INVALID, 0},
{COERCE_MULTIPLY, 1},
{COERCE_MULTIPLY, 1000},
{COERCE_MULTIPLY, 1000000}},
// from micros ...
{{COERCE_INVALID, 0},
{COERCE_DIVIDE, 1000},
{COERCE_MULTIPLY, 1},
{COERCE_MULTIPLY, 1000}},
// from nanos ...
{{COERCE_INVALID, 0},
{COERCE_DIVIDE, 1000000},
{COERCE_DIVIDE, 1000},
{COERCE_MULTIPLY, 1}}};
template <>
struct SerializeFunctor<Int64Type, ::arrow::TimestampType> {
Status Serialize(const ::arrow::TimestampArray& array, ArrowWriteContext* ctx,
int64_t* out) {
const auto& source_type = static_cast<const ::arrow::TimestampType&>(*array.type());
auto source_unit = source_type.unit();
const int64_t* values = array.raw_values();
::arrow::TimeUnit::type target_unit = ctx->properties->coerce_timestamps_unit();
auto target_type = ::arrow::timestamp(target_unit);
bool truncation_allowed = ctx->properties->truncated_timestamps_allowed();
auto DivideBy = [&](const int64_t factor) {
for (int64_t i = 0; i < array.length(); i++) {
if (!truncation_allowed && array.IsValid(i) && (values[i] % factor != 0)) {
return Status::Invalid("Casting from ", source_type.ToString(), " to ",
target_type->ToString(),
" would lose data: ", values[i]);
}
out[i] = values[i] / factor;
}
return Status::OK();
};
auto MultiplyBy = [&](const int64_t factor) {
for (int64_t i = 0; i < array.length(); i++) {
out[i] = values[i] * factor;
}
return Status::OK();
};
const auto& coercion = kTimestampCoercionFactors[static_cast<int>(source_unit)]
[static_cast<int>(target_unit)];
// .first -> coercion operation; .second -> scale factor
DCHECK_NE(coercion.first, COERCE_INVALID);
return coercion.first == COERCE_DIVIDE ? DivideBy(coercion.second)
: MultiplyBy(coercion.second);
}
};
#undef COERCE_DIVIDE
#undef COERCE_INVALID
#undef COERCE_MULTIPLY
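// Decide how Arrow timestamps map onto INT64: honor an explicit coercion
// request, coerce nanoseconds to microseconds for Parquet <= 2.4 files,
// coerce seconds to milliseconds, and otherwise write zero-copy.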
template <>
Status TypedColumnWriterImpl<Int64Type>::WriteArrowTimestamps(
const int16_t* def_levels, const int16_t* rep_levels, int64_t num_levels,
const ::arrow::Array& values, ArrowWriteContext* ctx, bool maybe_parent_nulls) {
const auto& source_type = static_cast<const ::arrow::TimestampType&>(*values.type());
auto WriteCoerce = [&](const ArrowWriterProperties* properties) {
ArrowWriteContext temp_ctx = *ctx;
temp_ctx.properties = properties;
return WriteArrowSerialize<::arrow::TimestampType>(
def_levels, rep_levels, num_levels, values, &temp_ctx, maybe_parent_nulls);
};
const ParquetVersion::type version = this->properties()->version();
if (ctx->properties->coerce_timestamps_enabled()) {
// User explicitly requested coercion to specific unit
if (source_type.unit() == ctx->properties->coerce_timestamps_unit()) {
// No data conversion necessary
return WriteArrowZeroCopy(def_levels, rep_levels, num_levels, values, ctx,
maybe_parent_nulls);
} else {
return WriteCoerce(ctx->properties);
}
} else if ((version == ParquetVersion::PARQUET_1_0 ||
version == ParquetVersion::PARQUET_2_4) &&
source_type.unit() == ::arrow::TimeUnit::NANO) {
// Absent superseding user instructions, when writing Parquet version <= 2.4 files,
// timestamps in nanoseconds are coerced to microseconds
std::shared_ptr<ArrowWriterProperties> properties =
(ArrowWriterProperties::Builder())
.coerce_timestamps(::arrow::TimeUnit::MICRO)
->disallow_truncated_timestamps()
->build();
return WriteCoerce(properties.get());
} else if (source_type.unit() == ::arrow::TimeUnit::SECOND) {
// Absent superseding user instructions, timestamps in seconds are coerced to
// milliseconds
std::shared_ptr<ArrowWriterProperties> properties =
(ArrowWriterProperties::Builder())
.coerce_timestamps(::arrow::TimeUnit::MILLI)
->build();
return WriteCoerce(properties.get());
} else {
// No data conversion necessary
return WriteArrowZeroCopy(def_levels, rep_levels, num_levels, values, ctx,
maybe_parent_nulls);
}
}
template <>
Status TypedColumnWriterImpl<Int64Type>::WriteArrowDense(
const int16_t* def_levels, const int16_t* rep_levels, int64_t num_levels,
const ::arrow::Array& array, ArrowWriteContext* ctx, bool maybe_parent_nulls) {
switch (array.type()->id()) {
case ::arrow::Type::TIMESTAMP:
return WriteArrowTimestamps(def_levels, rep_levels, num_levels, array, ctx,
maybe_parent_nulls);
WRITE_ZERO_COPY_CASE(INT64)
WRITE_SERIALIZE_CASE(UINT32)
WRITE_SERIALIZE_CASE(UINT64)
WRITE_ZERO_COPY_CASE(TIME64)
WRITE_ZERO_COPY_CASE(DURATION)
WRITE_SERIALIZE_CASE(DECIMAL32)
WRITE_SERIALIZE_CASE(DECIMAL64)
WRITE_SERIALIZE_CASE(DECIMAL128)
WRITE_SERIALIZE_CASE(DECIMAL256)
default:
ARROW_UNSUPPORTED();
}
}
template <>
Status TypedColumnWriterImpl<Int96Type>::WriteArrowDense(
const int16_t* def_levels, const int16_t* rep_levels, int64_t num_levels,
const ::arrow::Array& array, ArrowWriteContext* ctx, bool maybe_parent_nulls) {
if (array.type_id() != ::arrow::Type::TIMESTAMP) {
ARROW_UNSUPPORTED();
}
return WriteArrowSerialize<::arrow::TimestampType>(def_levels, rep_levels, num_levels,
array, ctx, maybe_parent_nulls);
}
// ----------------------------------------------------------------------
// Floating point types
template <>
Status TypedColumnWriterImpl<FloatType>::WriteArrowDense(
const int16_t* def_levels, const int16_t* rep_levels, int64_t num_levels,
const ::arrow::Array& array, ArrowWriteContext* ctx, bool maybe_parent_nulls) {
if (array.type_id() != ::arrow::Type::FLOAT) {
ARROW_UNSUPPORTED();
}
return WriteArrowZeroCopy(def_levels, rep_levels, num_levels, array, ctx,
maybe_parent_nulls);
}
template <>
Status TypedColumnWriterImpl<DoubleType>::WriteArrowDense(
const int16_t* def_levels, const int16_t* rep_levels, int64_t num_levels,
const ::arrow::Array& array, ArrowWriteContext* ctx, bool maybe_parent_nulls) {
if (array.type_id() != ::arrow::Type::DOUBLE) {
ARROW_UNSUPPORTED();
}
return WriteArrowZeroCopy(def_levels, rep_levels, num_levels, array, ctx,
maybe_parent_nulls);
}
// ----------------------------------------------------------------------
// Write Arrow to BYTE_ARRAY
template <>
Status TypedColumnWriterImpl<ByteArrayType>::WriteArrowDense(
const int16_t* def_levels, const int16_t* rep_levels, int64_t num_levels,
const ::arrow::Array& array, ArrowWriteContext* ctx, bool maybe_parent_nulls) {
if (!::arrow::is_base_binary_like(array.type()->id()) &&
!::arrow::is_binary_view_like(array.type()->id())) {
ARROW_UNSUPPORTED();
}
int64_t value_offset = 0;
auto WriteChunk = [&](int64_t offset, int64_t batch_size, bool check_page) {
int64_t batch_num_values = 0;
int64_t batch_num_spaced_values = 0;
int64_t null_count = 0;
MaybeCalculateValidityBits(AddIfNotNull(def_levels, offset), batch_size,
&batch_num_values, &batch_num_spaced_values, &null_count);
WriteLevelsSpaced(batch_size, AddIfNotNull(def_levels, offset),
AddIfNotNull(rep_levels, offset));
std::shared_ptr<Array> data_slice =
array.Slice(value_offset, batch_num_spaced_values);
PARQUET_ASSIGN_OR_THROW(
data_slice, MaybeReplaceValidity(data_slice, null_count, ctx->memory_pool));
current_encoder_->Put(*data_slice);
// Null values in ancestors count as nulls.
const int64_t non_null = data_slice->length() - data_slice->null_count();
if (page_statistics_ != nullptr) {
page_statistics_->Update(*data_slice, /*update_counts=*/false);
page_statistics_->IncrementNullCount(batch_size - non_null);
page_statistics_->IncrementNumValues(non_null);
}
UpdateUnencodedDataBytes();
if (chunk_geospatial_statistics_ != nullptr) {
chunk_geospatial_statistics_->Update(*data_slice);
}
if (bloom_filter_writer_ != nullptr) {
bloom_filter_writer_->Update(*data_slice);
}
CommitWriteAndCheckPageLimit(batch_size, batch_num_values, batch_size - non_null,
check_page);
CheckDictionarySizeLimit();
value_offset += batch_num_spaced_values;
};
PARQUET_CATCH_NOT_OK(
DoInBatches(def_levels, rep_levels, num_levels, properties_->write_batch_size(),
properties_->max_rows_per_page(), pages_change_on_record_boundaries(),
WriteChunk, [this]() { return num_buffered_rows_; }));
return Status::OK();
}
// ----------------------------------------------------------------------
// Write Arrow to FIXED_LEN_BYTE_ARRAY
template <typename ParquetType, typename ArrowType>
struct SerializeFunctor<
ParquetType, ArrowType,
::arrow::enable_if_t<::arrow::is_fixed_size_binary_type<ArrowType>::value &&
!::arrow::is_decimal_type<ArrowType>::value>> {
Status Serialize(const ::arrow::FixedSizeBinaryArray& array, ArrowWriteContext*,
FLBA* out) {
if (array.null_count() == 0) {
// No nulls, just dump the data
// TODO(advancedxy): use a writeBatch to avoid this step
for (int64_t i = 0; i < array.length(); i++) {
out[i] = FixedLenByteArray(array.GetValue(i));
}
} else {
for (int64_t i = 0; i < array.length(); i++) {
if (array.IsValid(i)) {
out[i] = FixedLenByteArray(array.GetValue(i));
}
}
}
return Status::OK();
}
};
// ----------------------------------------------------------------------
// Write Arrow decimals to FIXED_LEN_BYTE_ARRAY
// Requires a custom serializer because decimals in Parquet are stored in
// big-endian format. Thus, a temporary local buffer is required.
template <typename ParquetType, typename ArrowType>
struct SerializeFunctor<
ParquetType, ArrowType,
::arrow::enable_if_t<
::arrow::is_decimal_type<ArrowType>::value &&
!::arrow::internal::IsOneOf<ParquetType, Int32Type, Int64Type>::value>> {
Status Serialize(const typename ::arrow::TypeTraits<ArrowType>::ArrayType& array,
ArrowWriteContext* ctx, FLBA* out) {
AllocateScratch(array, ctx);
auto offset = Offset(array);
if (array.null_count() == 0) {
for (int64_t i = 0; i < array.length(); i++) {
out[i] = FixDecimalEndianness(array.GetValue(i), offset);
}
} else {
for (int64_t i = 0; i < array.length(); i++) {
out[i] = array.IsValid(i) ? FixDecimalEndianness(array.GetValue(i), offset)
: FixedLenByteArray();
}
}
return Status::OK();
}
private:
// Parquet's decimals are stored as fixed-length values whose length is
// proportional to the precision. Arrow's decimals are always stored in 4/8/16/32
// bytes. Thus the internal FLBA pointer must be adjusted by the offset calculated
// here.
int32_t Offset(const Array& array) {
auto decimal_type = checked_pointer_cast<::arrow::DecimalType>(array.type());
return decimal_type->byte_width() -
::arrow::DecimalType::DecimalSize(decimal_type->precision());
}
void AllocateScratch(const typename ::arrow::TypeTraits<ArrowType>::ArrayType& array,
ArrowWriteContext* ctx) {
int64_t non_null_count = array.length() - array.null_count();
int64_t size = non_null_count * ArrowType::kByteWidth;
scratch_buffer = AllocateBuffer(ctx->memory_pool, size);
scratch = scratch_buffer->mutable_data();
}
FixedLenByteArray FixDecimalEndianness(const uint8_t* in, int64_t offset) {
auto out = reinterpret_cast<const uint8_t*>(scratch) + offset;
if constexpr (std::is_same_v<ArrowType, ::arrow::Decimal32Type>) {
const auto* u32_in = reinterpret_cast<const uint32_t*>(in);
auto p = reinterpret_cast<uint32_t*>(scratch);
*p++ = ::arrow::bit_util::ToBigEndian(u32_in[0]);
scratch = reinterpret_cast<uint8_t*>(p);
} else {
const auto* u64_in = reinterpret_cast<const uint64_t*>(in);
auto p = reinterpret_cast<uint64_t*>(scratch);
if constexpr (std::is_same_v<ArrowType, ::arrow::Decimal64Type>) {
*p++ = ::arrow::bit_util::ToBigEndian(u64_in[0]);
} else if constexpr (std::is_same_v<ArrowType, ::arrow::Decimal128Type>) {
*p++ = ::arrow::bit_util::ToBigEndian(u64_in[1]);
*p++ = ::arrow::bit_util::ToBigEndian(u64_in[0]);
} else if constexpr (std::is_same_v<ArrowType, ::arrow::Decimal256Type>) {
*p++ = ::arrow::bit_util::ToBigEndian(u64_in[3]);
*p++ = ::arrow::bit_util::ToBigEndian(u64_in[2]);
*p++ = ::arrow::bit_util::ToBigEndian(u64_in[1]);
*p++ = ::arrow::bit_util::ToBigEndian(u64_in[0]);
}
scratch = reinterpret_cast<uint8_t*>(p);
}
return FixedLenByteArray(out);
}
std::shared_ptr<ResizableBuffer> scratch_buffer;
uint8_t* scratch;
};
// ----------------------------------------------------------------------
// Write Arrow to Float16
// Requires a custom serializer because Float16s in Parquet are stored as a 2-byte
// (little-endian) FLBA, whereas in Arrow they're a native `uint16_t`.
template <>
struct SerializeFunctor<::parquet::FLBAType, ::arrow::HalfFloatType> {
Status Serialize(const ::arrow::HalfFloatArray& array, ArrowWriteContext*, FLBA* out) {
const uint16_t* values = array.raw_values();
if (array.null_count() == 0) {
for (int64_t i = 0; i < array.length(); ++i) {
out[i] = ToFLBA(&values[i]);
}
} else {
for (int64_t i = 0; i < array.length(); ++i) {
out[i] = array.IsValid(i) ? ToFLBA(&values[i]) : FLBA{};
}
}
return Status::OK();
}
private:
FLBA ToFLBA(const uint16_t* value_ptr) const {
return FLBA{reinterpret_cast<const uint8_t*>(value_ptr)};
}
};
template <>
Status TypedColumnWriterImpl<FLBAType>::WriteArrowDense(
const int16_t* def_levels, const int16_t* rep_levels, int64_t num_levels,
const ::arrow::Array& array, ArrowWriteContext* ctx, bool maybe_parent_nulls) {
switch (array.type()->id()) {
WRITE_SERIALIZE_CASE(FIXED_SIZE_BINARY)
WRITE_SERIALIZE_CASE(DECIMAL32)
WRITE_SERIALIZE_CASE(DECIMAL64)
WRITE_SERIALIZE_CASE(DECIMAL128)
WRITE_SERIALIZE_CASE(DECIMAL256)
WRITE_SERIALIZE_CASE(HALF_FLOAT)
default:
break;
}
return Status::OK();
}
// ----------------------------------------------------------------------
// Dynamic column writer constructor
std::shared_ptr<ColumnWriter> ColumnWriter::Make(ColumnChunkMetaDataBuilder* metadata,
std::unique_ptr<PageWriter> pager,
const WriterProperties* properties,
BloomFilter* bloom_filter) {
const ColumnDescriptor* descr = metadata->descr();
const bool use_dictionary = properties->dictionary_enabled(descr->path()) &&
descr->physical_type() != Type::BOOLEAN;
Encoding::type encoding = properties->encoding(descr->path());
if (encoding == Encoding::UNKNOWN) {
encoding = (descr->physical_type() == Type::BOOLEAN &&
properties->version() != ParquetVersion::PARQUET_1_0 &&
properties->data_page_version() == ParquetDataPageVersion::V2)
? Encoding::RLE
: Encoding::PLAIN;
}
if (use_dictionary) {
encoding = properties->dictionary_index_encoding();
}
switch (descr->physical_type()) {
case Type::BOOLEAN: {
if (bloom_filter != nullptr) {
throw ParquetException("Bloom filter is not supported for boolean type");
}
return std::make_shared<TypedColumnWriterImpl<BooleanType>>(
metadata, std::move(pager), use_dictionary, encoding, properties,
/*bloom_filter=*/nullptr);
}
case Type::INT32:
return std::make_shared<TypedColumnWriterImpl<Int32Type>>(
metadata, std::move(pager), use_dictionary, encoding, properties, bloom_filter);
case Type::INT64:
return std::make_shared<TypedColumnWriterImpl<Int64Type>>(
metadata, std::move(pager), use_dictionary, encoding, properties, bloom_filter);
case Type::INT96:
return std::make_shared<TypedColumnWriterImpl<Int96Type>>(
metadata, std::move(pager), use_dictionary, encoding, properties, bloom_filter);
case Type::FLOAT:
return std::make_shared<TypedColumnWriterImpl<FloatType>>(
metadata, std::move(pager), use_dictionary, encoding, properties, bloom_filter);
case Type::DOUBLE:
return std::make_shared<TypedColumnWriterImpl<DoubleType>>(
metadata, std::move(pager), use_dictionary, encoding, properties, bloom_filter);
case Type::BYTE_ARRAY:
return std::make_shared<TypedColumnWriterImpl<ByteArrayType>>(
metadata, std::move(pager), use_dictionary, encoding, properties, bloom_filter);
case Type::FIXED_LEN_BYTE_ARRAY:
return std::make_shared<TypedColumnWriterImpl<FLBAType>>(
metadata, std::move(pager), use_dictionary, encoding, properties, bloom_filter);
default:
ParquetException::NYI("Column writer not implemented for type: " +
TypeToString(descr->physical_type()));
}
// Unreachable code, but suppress compiler warning
return std::shared_ptr<ColumnWriter>(nullptr);
}
} // namespace parquet