| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
#include <cmath>
#include <cstdint>
#include <cstring>
#include <limits>
#include <memory>
#include <set>
#include <utility>
#include <vector>
| |
| #include <gmock/gmock.h> |
| #include <gtest/gtest.h> |
| |
| #include "arrow/io/buffered.h" |
| #include "arrow/io/file.h" |
| #include "arrow/testing/gtest_util.h" |
| #include "arrow/util/bit_util.h" |
| #include "arrow/util/bitmap_builders.h" |
| #include "arrow/util/config.h" |
| #include "arrow/util/key_value_metadata.h" |
| |
| #include "parquet/bloom_filter.h" |
| #include "parquet/bloom_filter_writer.h" |
| #include "parquet/column_page.h" |
| #include "parquet/column_reader.h" |
| #include "parquet/column_writer.h" |
| #include "parquet/file_reader.h" |
| #include "parquet/file_writer.h" |
| #include "parquet/geospatial/statistics.h" |
| #include "parquet/metadata.h" |
| #include "parquet/page_index.h" |
| #include "parquet/platform.h" |
| #include "parquet/properties.h" |
| #include "parquet/statistics.h" |
| #include "parquet/test_util.h" |
| #include "parquet/types.h" |
| |
| namespace bit_util = arrow::bit_util; |
| |
| namespace parquet { |
| |
| using schema::GroupNode; |
| using schema::NodePtr; |
| using schema::PrimitiveNode; |
| |
| namespace test { |
| |
| using ::testing::IsNull; |
| using ::testing::NotNull; |
| |
| // The default size used in most tests. |
| const int SMALL_SIZE = 100; |
| #ifdef PARQUET_VALGRIND |
| // Larger size to test some corner cases, only used in some specific cases. |
| const int LARGE_SIZE = 10000; |
| // Very large size to test dictionary fallback. |
| const int VERY_LARGE_SIZE = 40000; |
| // Reduced dictionary page size to use for testing dictionary fallback with valgrind |
| const int64_t DICTIONARY_PAGE_SIZE = 1024; |
| #else |
| // Larger size to test some corner cases, only used in some specific cases. |
| const int LARGE_SIZE = 100000; |
| // Very large size to test dictionary fallback. |
| const int VERY_LARGE_SIZE = 400000; |
| // Dictionary page size to use for testing dictionary fallback |
| const int64_t DICTIONARY_PAGE_SIZE = 1024 * 1024; |
| #endif |
| |
| template <typename TestType> |
| class TestPrimitiveWriter : public PrimitiveTypedTest<TestType> { |
| public: |
| void SetUp() { |
| this->SetupValuesOut(SMALL_SIZE); |
| writer_properties_ = default_writer_properties(); |
| definition_levels_out_.resize(SMALL_SIZE); |
| repetition_levels_out_.resize(SMALL_SIZE); |
| |
| this->SetUpSchema(Repetition::REQUIRED); |
| |
| descr_ = this->schema_.Column(0); |
| } |
| |
| Type::type type_num() { return TestType::type_num; } |
| |
| void BuildReader(int64_t num_rows, |
| Compression::type compression = Compression::UNCOMPRESSED, |
| bool page_checksum_verify = false) { |
| ASSERT_OK_AND_ASSIGN(auto buffer, sink_->Finish()); |
| auto source = std::make_shared<::arrow::io::BufferReader>(buffer); |
    ReaderProperties reader_properties;
    reader_properties.set_page_checksum_verification(page_checksum_verify);
    std::unique_ptr<PageReader> page_reader =
        PageReader::Open(std::move(source), num_rows, compression, reader_properties);
| reader_ = std::static_pointer_cast<TypedColumnReader<TestType>>( |
| ColumnReader::Make(this->descr_, std::move(page_reader))); |
| } |
| |
| std::shared_ptr<TypedColumnWriter<TestType>> BuildWriter( |
| int64_t output_size = SMALL_SIZE, |
| const ColumnProperties& column_properties = ColumnProperties(), |
| const ParquetVersion::type version = ParquetVersion::PARQUET_1_0, |
| const ParquetDataPageVersion data_page_version = ParquetDataPageVersion::V1, |
| bool enable_checksum = false, int64_t page_size = kDefaultDataPageSize, |
| int64_t max_rows_per_page = kDefaultMaxRowsPerPage) { |
| sink_ = CreateOutputStream(); |
| WriterProperties::Builder wp_builder; |
| wp_builder.version(version)->data_page_version(data_page_version); |
| if (column_properties.encoding() == Encoding::PLAIN_DICTIONARY || |
| column_properties.encoding() == Encoding::RLE_DICTIONARY) { |
| wp_builder.enable_dictionary(); |
| wp_builder.dictionary_pagesize_limit(DICTIONARY_PAGE_SIZE); |
| } else { |
| wp_builder.disable_dictionary(); |
| wp_builder.encoding(column_properties.encoding()); |
| } |
| if (enable_checksum) { |
| wp_builder.enable_page_checksum(); |
| } |
| wp_builder.max_statistics_size(column_properties.max_statistics_size()); |
| wp_builder.data_pagesize(page_size); |
| wp_builder.max_rows_per_page(max_rows_per_page); |
| writer_properties_ = wp_builder.build(); |
| |
| metadata_ = ColumnChunkMetaDataBuilder::Make(writer_properties_, this->descr_); |
| std::unique_ptr<PageWriter> pager = PageWriter::Open( |
| sink_, column_properties.compression(), metadata_.get(), |
| /* row_group_ordinal */ -1, /* column_chunk_ordinal*/ -1, |
| ::arrow::default_memory_pool(), /* buffered_row_group */ false, |
| /* header_encryptor */ NULLPTR, /* data_encryptor */ NULLPTR, enable_checksum); |
| std::shared_ptr<ColumnWriter> writer = |
| ColumnWriter::Make(metadata_.get(), std::move(pager), writer_properties_.get()); |
| return std::dynamic_pointer_cast<TypedColumnWriter<TestType>>(writer); |
| } |
| |
| void ReadColumn(Compression::type compression = Compression::UNCOMPRESSED, |
| bool page_checksum_verify = false) { |
| BuildReader(static_cast<int64_t>(this->values_out_.size()), compression, |
| page_checksum_verify); |
| reader_->ReadBatch(static_cast<int>(this->values_out_.size()), |
| definition_levels_out_.data(), repetition_levels_out_.data(), |
| this->values_out_ptr_, &values_read_); |
| this->SyncValuesOut(); |
| } |
| |
| void ReadColumnFully(Compression::type compression = Compression::UNCOMPRESSED, |
| bool page_checksum_verify = false); |
| |
| void TestRequiredWithEncoding(Encoding::type encoding) { |
| return TestRequiredWithSettings(encoding, Compression::UNCOMPRESSED, false, false); |
| } |
| |
| void TestRequiredWithSettings( |
| Encoding::type encoding, Compression::type compression, bool enable_dictionary, |
| bool enable_statistics, int64_t num_rows = SMALL_SIZE, |
| int compression_level = Codec::UseDefaultCompressionLevel(), |
| bool enable_checksum = false) { |
| this->GenerateData(num_rows); |
| |
| this->WriteRequiredWithSettings(encoding, compression, enable_dictionary, |
| enable_statistics, compression_level, num_rows, |
| enable_checksum); |
| ASSERT_NO_FATAL_FAILURE(this->ReadAndCompare(compression, num_rows, enable_checksum)); |
| |
| this->WriteRequiredWithSettingsSpaced(encoding, compression, enable_dictionary, |
| enable_statistics, num_rows, compression_level, |
| enable_checksum); |
| ASSERT_NO_FATAL_FAILURE(this->ReadAndCompare(compression, num_rows, enable_checksum)); |
| } |
| |
| void TestRequiredWithCodecOptions(Encoding::type encoding, |
| Compression::type compression, bool enable_dictionary, |
| bool enable_statistics, int64_t num_rows = SMALL_SIZE, |
| const std::shared_ptr<CodecOptions>& codec_options = |
| std::make_shared<CodecOptions>(), |
| bool enable_checksum = false) { |
| this->GenerateData(num_rows); |
| |
| this->WriteRequiredWithCodecOptions(encoding, compression, enable_dictionary, |
| enable_statistics, codec_options, num_rows, |
| enable_checksum); |
| ASSERT_NO_FATAL_FAILURE(this->ReadAndCompare(compression, num_rows, enable_checksum)); |
| } |
| |
| void TestDictionaryFallbackEncoding(ParquetVersion::type version, |
| ParquetDataPageVersion data_page_version) { |
| this->GenerateData(VERY_LARGE_SIZE); |
| ColumnProperties column_properties; |
| column_properties.set_dictionary_enabled(true); |
| |
| if (version == ParquetVersion::PARQUET_1_0) { |
| column_properties.set_encoding(Encoding::PLAIN_DICTIONARY); |
| } else { |
| column_properties.set_encoding(Encoding::RLE_DICTIONARY); |
| } |
| |
| auto writer = |
| this->BuildWriter(VERY_LARGE_SIZE, column_properties, version, data_page_version); |
| |
| writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_); |
| writer->Close(); |
| |
    // Read all rows back so we are sure that the non-dictionary (fallback) pages are
    // also read correctly
| this->SetupValuesOut(VERY_LARGE_SIZE); |
| this->ReadColumnFully(); |
| ASSERT_EQ(VERY_LARGE_SIZE, this->values_read_); |
| this->values_.resize(VERY_LARGE_SIZE); |
| ASSERT_EQ(this->values_, this->values_out_); |
| std::vector<Encoding::type> encodings_vector = this->metadata_encodings(); |
| std::set<Encoding::type> encodings(encodings_vector.begin(), encodings_vector.end()); |
| |
| if (this->type_num() == Type::BOOLEAN) { |
| // Dictionary encoding is not allowed for boolean type |
| std::set<Encoding::type> expected; |
| if (version != ParquetVersion::PARQUET_1_0 && |
| data_page_version == ParquetDataPageVersion::V2) { |
        // Only one encoding (RLE) is used in the fallback case when format version 2.0
        // and data page v2 are both enabled.
| expected = {Encoding::RLE}; |
| } else { |
        // Two encodings (PLAIN, RLE) are used in the non-dictionary case for format
        // version 1.0 or data page v1. Note that RLE is used for definition/repetition
        // levels.
| expected = {Encoding::PLAIN, Encoding::RLE}; |
| } |
| ASSERT_EQ(encodings, expected); |
| } else if (version == ParquetVersion::PARQUET_1_0) { |
| // There are 3 encodings (PLAIN_DICTIONARY, PLAIN, RLE) in a fallback case |
| // for version 1.0 |
| std::set<Encoding::type> expected( |
| {Encoding::PLAIN_DICTIONARY, Encoding::PLAIN, Encoding::RLE}); |
| ASSERT_EQ(encodings, expected); |
| } else { |
| // There are 3 encodings (RLE_DICTIONARY, PLAIN, RLE) in a fallback case for |
| // version 2.0 |
| std::set<Encoding::type> expected( |
| {Encoding::RLE_DICTIONARY, Encoding::PLAIN, Encoding::RLE}); |
| ASSERT_EQ(encodings, expected); |
| } |
| |
| std::vector<parquet::PageEncodingStats> encoding_stats = |
| this->metadata_encoding_stats(); |
| if (this->type_num() == Type::BOOLEAN) { |
| ASSERT_EQ(encoding_stats[0].encoding, |
| version != ParquetVersion::PARQUET_1_0 && |
| data_page_version == ParquetDataPageVersion::V2 |
| ? Encoding::RLE |
| : Encoding::PLAIN); |
| ASSERT_EQ(encoding_stats[0].page_type, PageType::DATA_PAGE); |
| } else if (version == ParquetVersion::PARQUET_1_0) { |
| std::vector<Encoding::type> expected( |
| {Encoding::PLAIN_DICTIONARY, Encoding::PLAIN, Encoding::PLAIN_DICTIONARY}); |
| ASSERT_EQ(encoding_stats[0].encoding, expected[0]); |
| ASSERT_EQ(encoding_stats[0].page_type, PageType::DICTIONARY_PAGE); |
| for (size_t i = 1; i < encoding_stats.size(); i++) { |
| ASSERT_EQ(encoding_stats[i].encoding, expected[i]); |
| ASSERT_EQ(encoding_stats[i].page_type, PageType::DATA_PAGE); |
| } |
| } else { |
| std::vector<Encoding::type> expected( |
| {Encoding::PLAIN, Encoding::PLAIN, Encoding::RLE_DICTIONARY}); |
| ASSERT_EQ(encoding_stats[0].encoding, expected[0]); |
| ASSERT_EQ(encoding_stats[0].page_type, PageType::DICTIONARY_PAGE); |
| for (size_t i = 1; i < encoding_stats.size(); i++) { |
| ASSERT_EQ(encoding_stats[i].encoding, expected[i]); |
| ASSERT_EQ(encoding_stats[i].page_type, PageType::DATA_PAGE); |
| } |
| } |
| } |
| |
| void WriteRequiredWithSettings(Encoding::type encoding, Compression::type compression, |
| bool enable_dictionary, bool enable_statistics, |
| int compression_level, int64_t num_rows, |
| bool enable_checksum) { |
| ColumnProperties column_properties(encoding, compression, enable_dictionary, |
| enable_statistics); |
| column_properties.set_codec_options( |
| std::make_shared<CodecOptions>(compression_level)); |
| std::shared_ptr<TypedColumnWriter<TestType>> writer = |
| this->BuildWriter(num_rows, column_properties, ParquetVersion::PARQUET_1_0, |
| ParquetDataPageVersion::V1, enable_checksum); |
| writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_); |
    // The behaviour should be independent of the number of Close() calls
| writer->Close(); |
| writer->Close(); |
| } |
| |
| void WriteRequiredWithSettingsSpaced(Encoding::type encoding, |
| Compression::type compression, |
| bool enable_dictionary, bool enable_statistics, |
| int64_t num_rows, int compression_level, |
| bool enable_checksum) { |
| std::vector<uint8_t> valid_bits( |
| bit_util::BytesForBits(static_cast<uint32_t>(this->values_.size())) + 1, 255); |
| ColumnProperties column_properties(encoding, compression, enable_dictionary, |
| enable_statistics); |
| column_properties.set_codec_options( |
| std::make_shared<CodecOptions>(compression_level)); |
| std::shared_ptr<TypedColumnWriter<TestType>> writer = |
| this->BuildWriter(num_rows, column_properties, ParquetVersion::PARQUET_1_0, |
| ParquetDataPageVersion::V1, enable_checksum); |
| writer->WriteBatchSpaced(this->values_.size(), nullptr, nullptr, valid_bits.data(), 0, |
| this->values_ptr_); |
    // The behaviour should be independent of the number of Close() calls
| writer->Close(); |
| writer->Close(); |
| } |
| |
| void WriteRequiredWithCodecOptions(Encoding::type encoding, |
| Compression::type compression, |
| bool enable_dictionary, bool enable_statistics, |
| const std::shared_ptr<CodecOptions>& codec_options, |
| int64_t num_rows, bool enable_checksum) { |
| ColumnProperties column_properties(encoding, compression, enable_dictionary, |
| enable_statistics); |
| column_properties.set_codec_options(codec_options); |
| std::shared_ptr<TypedColumnWriter<TestType>> writer = |
| this->BuildWriter(num_rows, column_properties, ParquetVersion::PARQUET_1_0, |
| ParquetDataPageVersion::V1, enable_checksum); |
| writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_); |
    // The behaviour should be independent of the number of Close() calls
| writer->Close(); |
| writer->Close(); |
| } |
| |
| void ReadAndCompare(Compression::type compression, int64_t num_rows, |
| bool page_checksum_verify) { |
| this->SetupValuesOut(num_rows); |
| this->ReadColumnFully(compression, page_checksum_verify); |
| auto comparator = MakeComparator<TestType>(this->descr_); |
| for (size_t i = 0; i < this->values_.size(); i++) { |
| if (comparator->Compare(this->values_[i], this->values_out_[i]) || |
| comparator->Compare(this->values_out_[i], this->values_[i])) { |
| ARROW_SCOPED_TRACE("i = ", i); |
| } |
| ASSERT_FALSE(comparator->Compare(this->values_[i], this->values_out_[i])); |
| ASSERT_FALSE(comparator->Compare(this->values_out_[i], this->values_[i])); |
| } |
| ASSERT_EQ(this->values_, this->values_out_); |
| } |
| |
| int64_t metadata_num_values() { |
| // Metadata accessor must be created lazily. |
| // This is because the ColumnChunkMetaData semantics dictate the metadata object is |
| // complete (no changes to the metadata buffer can be made after instantiation) |
| auto metadata_accessor = |
| ColumnChunkMetaData::Make(metadata_->contents(), this->descr_); |
| return metadata_accessor->num_values(); |
| } |
| |
| bool metadata_is_stats_set() { |
| // Metadata accessor must be created lazily. |
| // This is because the ColumnChunkMetaData semantics dictate the metadata object is |
| // complete (no changes to the metadata buffer can be made after instantiation) |
| ApplicationVersion app_version(this->writer_properties_->created_by()); |
| auto metadata_accessor = ColumnChunkMetaData::Make( |
| metadata_->contents(), this->descr_, default_reader_properties(), &app_version); |
| return metadata_accessor->is_stats_set(); |
| } |
| |
| std::pair<bool, bool> metadata_stats_has_min_max() { |
| // Metadata accessor must be created lazily. |
| // This is because the ColumnChunkMetaData semantics dictate the metadata object is |
| // complete (no changes to the metadata buffer can be made after instantiation) |
| ApplicationVersion app_version(this->writer_properties_->created_by()); |
| auto metadata_accessor = ColumnChunkMetaData::Make( |
| metadata_->contents(), this->descr_, default_reader_properties(), &app_version); |
| auto encoded_stats = metadata_accessor->statistics()->Encode(); |
| return {encoded_stats.has_min, encoded_stats.has_max}; |
| } |
| |
| std::vector<Encoding::type> metadata_encodings() { |
| // Metadata accessor must be created lazily. |
| // This is because the ColumnChunkMetaData semantics dictate the metadata object is |
| // complete (no changes to the metadata buffer can be made after instantiation) |
| auto metadata_accessor = |
| ColumnChunkMetaData::Make(metadata_->contents(), this->descr_); |
| return metadata_accessor->encodings(); |
| } |
| |
| std::vector<parquet::PageEncodingStats> metadata_encoding_stats() { |
| // Metadata accessor must be created lazily. |
| // This is because the ColumnChunkMetaData semantics dictate the metadata object is |
| // complete (no changes to the metadata buffer can be made after instantiation) |
| auto metadata_accessor = |
| ColumnChunkMetaData::Make(metadata_->contents(), this->descr_); |
| return metadata_accessor->encoding_stats(); |
| } |
| |
| std::shared_ptr<const KeyValueMetadata> metadata_key_value_metadata() { |
| // Metadata accessor must be created lazily. |
| // This is because the ColumnChunkMetaData semantics dictate the metadata object is |
| // complete (no changes to the metadata buffer can be made after instantiation) |
| auto metadata_accessor = |
| ColumnChunkMetaData::Make(metadata_->contents(), this->descr_); |
| return metadata_accessor->key_value_metadata(); |
| } |
| |
| std::unique_ptr<ColumnChunkMetaData> metadata_accessor() { |
| // Metadata accessor must be created lazily. |
| // This is because the ColumnChunkMetaData semantics dictate the metadata object is |
| // complete (no changes to the metadata buffer can be made after instantiation) |
| ApplicationVersion app_version(this->writer_properties_->created_by()); |
| return ColumnChunkMetaData::Make(metadata_->contents(), this->descr_, |
| default_reader_properties(), &app_version); |
| } |
| |
| EncodedStatistics metadata_encoded_stats() { return metadata_stats()->Encode(); } |
| |
| std::shared_ptr<Statistics> metadata_stats() { |
| return metadata_accessor()->statistics(); |
| } |
| |
| std::shared_ptr<geospatial::GeoStatistics> metadata_geo_stats() { |
| return metadata_accessor()->geo_statistics(); |
| } |
| |
| protected: |
| int64_t values_read_; |
  // Keep the reader alive because, for ByteArray, the lifetime of the ByteArray
  // contents is bound to the reader.
| std::shared_ptr<TypedColumnReader<TestType>> reader_; |
| |
| std::vector<int16_t> definition_levels_out_; |
| std::vector<int16_t> repetition_levels_out_; |
| |
| const ColumnDescriptor* descr_; |
| |
| std::unique_ptr<ColumnChunkMetaDataBuilder> metadata_; |
| std::shared_ptr<::arrow::io::BufferOutputStream> sink_; |
| std::shared_ptr<WriterProperties> writer_properties_; |
| std::vector<std::vector<uint8_t>> data_buffer_; |
| }; |
| |
| template <typename TestType> |
| void TestPrimitiveWriter<TestType>::ReadColumnFully(Compression::type compression, |
| bool page_checksum_verify) { |
| int64_t total_values = static_cast<int64_t>(this->values_out_.size()); |
| BuildReader(total_values, compression, page_checksum_verify); |
| values_read_ = 0; |
| while (values_read_ < total_values) { |
| int64_t values_read_recently = 0; |
| reader_->ReadBatch( |
| static_cast<int>(this->values_out_.size()) - static_cast<int>(values_read_), |
| definition_levels_out_.data() + values_read_, |
| repetition_levels_out_.data() + values_read_, |
| this->values_out_ptr_ + values_read_, &values_read_recently); |
| values_read_ += values_read_recently; |
| } |
| this->SyncValuesOut(); |
| } |
| |
| template <> |
| void TestPrimitiveWriter<Int96Type>::ReadAndCompare(Compression::type compression, |
| int64_t num_rows, |
| bool page_checksum_verify) { |
| this->SetupValuesOut(num_rows); |
| this->ReadColumnFully(compression, page_checksum_verify); |
| |
| auto comparator = MakeComparator<Int96Type>(Type::INT96, SortOrder::SIGNED); |
| for (size_t i = 0; i < this->values_.size(); i++) { |
| if (comparator->Compare(this->values_[i], this->values_out_[i]) || |
| comparator->Compare(this->values_out_[i], this->values_[i])) { |
| ARROW_SCOPED_TRACE("i = ", i); |
| } |
| ASSERT_FALSE(comparator->Compare(this->values_[i], this->values_out_[i])); |
| ASSERT_FALSE(comparator->Compare(this->values_out_[i], this->values_[i])); |
| } |
| ASSERT_EQ(this->values_, this->values_out_); |
| } |
| |
| template <> |
| void TestPrimitiveWriter<FLBAType>::ReadColumnFully(Compression::type compression, |
| bool page_checksum_verify) { |
| int64_t total_values = static_cast<int64_t>(this->values_out_.size()); |
| BuildReader(total_values, compression, page_checksum_verify); |
| this->data_buffer_.clear(); |
| |
| values_read_ = 0; |
| while (values_read_ < total_values) { |
| int64_t values_read_recently = 0; |
| reader_->ReadBatch( |
| static_cast<int>(this->values_out_.size()) - static_cast<int>(values_read_), |
| definition_levels_out_.data() + values_read_, |
| repetition_levels_out_.data() + values_read_, |
| this->values_out_ptr_ + values_read_, &values_read_recently); |
| |
| // Copy contents of the pointers |
| std::vector<uint8_t> data(values_read_recently * this->descr_->type_length()); |
| uint8_t* data_ptr = data.data(); |
| for (int64_t i = 0; i < values_read_recently; i++) { |
| memcpy(data_ptr + this->descr_->type_length() * i, |
| this->values_out_[i + values_read_].ptr, this->descr_->type_length()); |
| this->values_out_[i + values_read_].ptr = |
| data_ptr + this->descr_->type_length() * i; |
| } |
| data_buffer_.emplace_back(std::move(data)); |
| |
| values_read_ += values_read_recently; |
| } |
| this->SyncValuesOut(); |
| } |
| |
| template <> |
| void TestPrimitiveWriter<ByteArrayType>::ReadColumnFully(Compression::type compression, |
| bool page_checksum_verify) { |
| int64_t total_values = static_cast<int64_t>(this->values_out_.size()); |
| BuildReader(total_values, compression, page_checksum_verify); |
| this->data_buffer_.clear(); |
| |
| values_read_ = 0; |
| while (values_read_ < total_values) { |
| int64_t values_read_recently = 0; |
| reader_->ReadBatch( |
| static_cast<int>(this->values_out_.size()) - static_cast<int>(values_read_), |
| definition_levels_out_.data() + values_read_, |
| repetition_levels_out_.data() + values_read_, |
| this->values_out_ptr_ + values_read_, &values_read_recently); |
| |
| // Compute the total length of the data |
| int64_t total_length = 0; |
| for (int64_t i = 0; i < values_read_recently; i++) { |
| total_length += this->values_out_[i + values_read_].len; |
| } |
| |
| // Copy contents of the pointers |
| std::vector<uint8_t> data(total_length); |
| uint8_t* data_ptr = data.data(); |
| for (int64_t i = 0; i < values_read_recently; i++) { |
| const ByteArray& value = this->values_out_ptr_[i + values_read_]; |
| memcpy(data_ptr, value.ptr, value.len); |
| this->values_out_[i + values_read_].ptr = data_ptr; |
| data_ptr += value.len; |
| } |
| data_buffer_.emplace_back(std::move(data)); |
| |
| values_read_ += values_read_recently; |
| } |
| this->SyncValuesOut(); |
| } |
| |
| typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType, |
| BooleanType, ByteArrayType, FLBAType> |
| TestTypes; |
| |
| TYPED_TEST_SUITE(TestPrimitiveWriter, TestTypes); |
| |
| using TestValuesWriterInt32Type = TestPrimitiveWriter<Int32Type>; |
| using TestValuesWriterInt64Type = TestPrimitiveWriter<Int64Type>; |
| using TestByteArrayValuesWriter = TestPrimitiveWriter<ByteArrayType>; |
| using TestFixedLengthByteArrayValuesWriter = TestPrimitiveWriter<FLBAType>; |
| |
| using ::testing::HasSubstr; |
| |
| TYPED_TEST(TestPrimitiveWriter, RequiredPlain) { |
| this->TestRequiredWithEncoding(Encoding::PLAIN); |
| } |
| |
| TYPED_TEST(TestPrimitiveWriter, RequiredDictionary) { |
| this->TestRequiredWithEncoding(Encoding::PLAIN_DICTIONARY); |
| } |
| |
| /* |
| TYPED_TEST(TestPrimitiveWriter, RequiredRLE) { |
| this->TestRequiredWithEncoding(Encoding::RLE); |
| } |
| |
| TYPED_TEST(TestPrimitiveWriter, RequiredBitPacked) { |
| this->TestRequiredWithEncoding(Encoding::BIT_PACKED); |
| } |
| */ |
| |
| TEST_F(TestValuesWriterInt32Type, RequiredDeltaBinaryPacked) { |
| this->TestRequiredWithEncoding(Encoding::DELTA_BINARY_PACKED); |
| } |
| |
| TEST_F(TestValuesWriterInt32Type, RequiredByteStreamSplit) { |
| this->TestRequiredWithEncoding(Encoding::BYTE_STREAM_SPLIT); |
| } |
| |
| TEST_F(TestValuesWriterInt64Type, RequiredDeltaBinaryPacked) { |
| this->TestRequiredWithEncoding(Encoding::DELTA_BINARY_PACKED); |
| } |
| |
| TEST_F(TestValuesWriterInt64Type, RequiredByteStreamSplit) { |
| this->TestRequiredWithEncoding(Encoding::BYTE_STREAM_SPLIT); |
| } |
| |
| TEST_F(TestByteArrayValuesWriter, RequiredDeltaLengthByteArray) { |
| this->TestRequiredWithEncoding(Encoding::DELTA_LENGTH_BYTE_ARRAY); |
| } |
| |
| TEST_F(TestByteArrayValuesWriter, RequiredDeltaByteArray) { |
| this->TestRequiredWithEncoding(Encoding::DELTA_BYTE_ARRAY); |
| } |
| |
| TEST_F(TestFixedLengthByteArrayValuesWriter, RequiredDeltaByteArray) { |
| this->TestRequiredWithEncoding(Encoding::DELTA_BYTE_ARRAY); |
| } |
| |
| TEST_F(TestFixedLengthByteArrayValuesWriter, RequiredByteStreamSplit) { |
| this->TestRequiredWithEncoding(Encoding::BYTE_STREAM_SPLIT); |
| } |
| |
| TYPED_TEST(TestPrimitiveWriter, RequiredRLEDictionary) { |
| this->TestRequiredWithEncoding(Encoding::RLE_DICTIONARY); |
| } |
| |
| TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStats) { |
| this->TestRequiredWithSettings(Encoding::PLAIN, Compression::UNCOMPRESSED, false, true, |
| LARGE_SIZE); |
| } |
| |
| #ifdef ARROW_WITH_SNAPPY |
| TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithSnappyCompression) { |
| this->TestRequiredWithSettings(Encoding::PLAIN, Compression::SNAPPY, false, false, |
| LARGE_SIZE); |
| } |
| |
| TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStatsAndSnappyCompression) { |
| this->TestRequiredWithSettings(Encoding::PLAIN, Compression::SNAPPY, false, true, |
| LARGE_SIZE); |
| } |
| #endif |
| |
| #ifdef ARROW_WITH_BROTLI |
| TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithBrotliCompression) { |
| this->TestRequiredWithSettings(Encoding::PLAIN, Compression::BROTLI, false, false, |
| LARGE_SIZE); |
| } |
| |
| TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithBrotliCompressionAndLevel) { |
| this->TestRequiredWithSettings(Encoding::PLAIN, Compression::BROTLI, false, false, |
| LARGE_SIZE, 10); |
| } |
| |
| TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStatsAndBrotliCompression) { |
| this->TestRequiredWithSettings(Encoding::PLAIN, Compression::BROTLI, false, true, |
| LARGE_SIZE); |
| } |
| |
| #endif |
| |
| #ifdef ARROW_WITH_ZLIB |
| TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithGzipCompression) { |
| this->TestRequiredWithSettings(Encoding::PLAIN, Compression::GZIP, false, false, |
| LARGE_SIZE); |
| } |
| |
| TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithGzipCompressionAndLevel) { |
| this->TestRequiredWithSettings(Encoding::PLAIN, Compression::GZIP, false, false, |
| LARGE_SIZE, 10); |
| } |
| |
| TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStatsAndGzipCompression) { |
| this->TestRequiredWithSettings(Encoding::PLAIN, Compression::GZIP, false, true, |
| LARGE_SIZE); |
| } |
| |
| TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithGzipCodecOptions) { |
| auto codec_options = std::make_shared<::arrow::util::GZipCodecOptions>(); |
| codec_options->gzip_format = ::arrow::util::GZipFormat::GZIP; |
| codec_options->window_bits = 12; |
| this->TestRequiredWithCodecOptions(Encoding::PLAIN, Compression::GZIP, false, false, |
| LARGE_SIZE, codec_options); |
| } |
| #endif |
| |
| #ifdef ARROW_WITH_LZ4 |
| TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithLz4Compression) { |
| this->TestRequiredWithSettings(Encoding::PLAIN, Compression::LZ4, false, false, |
| LARGE_SIZE); |
| } |
| |
| TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStatsAndLz4Compression) { |
| this->TestRequiredWithSettings(Encoding::PLAIN, Compression::LZ4, false, true, |
| LARGE_SIZE); |
| } |
| #endif |
| |
| #ifdef ARROW_WITH_ZSTD |
| TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithZstdCompression) { |
| this->TestRequiredWithSettings(Encoding::PLAIN, Compression::ZSTD, false, false, |
| LARGE_SIZE); |
| } |
| |
| TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithZstdCompressionAndLevel) { |
| this->TestRequiredWithSettings(Encoding::PLAIN, Compression::ZSTD, false, false, |
| LARGE_SIZE, 6); |
| } |
| |
| TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStatsAndZstdCompression) { |
| this->TestRequiredWithSettings(Encoding::PLAIN, Compression::ZSTD, false, true, |
| LARGE_SIZE); |
| } |
| TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithZstdCodecOptions) { |
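  // 101 is assumed to match the ZSTD_c_windowLog enum value in zstd.h; it is
  // hard-coded here, presumably to avoid including the zstd headers directly.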
| constexpr int ZSTD_c_windowLog = 101; |
| auto codec_options = std::make_shared<::arrow::util::ZstdCodecOptions>(); |
| codec_options->compression_context_params = {{ZSTD_c_windowLog, 23}}; |
| this->TestRequiredWithCodecOptions(Encoding::PLAIN, Compression::ZSTD, false, false, |
| LARGE_SIZE, codec_options); |
| } |
| #endif |
| |
| TYPED_TEST(TestPrimitiveWriter, Optional) { |
| // Optional and non-repeated, with definition levels |
| // but no repetition levels |
| this->SetUpSchema(Repetition::OPTIONAL); |
| |
| this->GenerateData(SMALL_SIZE); |
| std::vector<int16_t> definition_levels(SMALL_SIZE, 1); |
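  // Mark the second entry as null (definition level 0): 100 levels are written but
  // only 99 values are stored and read back.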
| definition_levels[1] = 0; |
| |
| auto writer = this->BuildWriter(); |
| writer->WriteBatch(this->values_.size(), definition_levels.data(), nullptr, |
| this->values_ptr_); |
| writer->Close(); |
| |
| // PARQUET-703 |
| ASSERT_EQ(100, this->metadata_num_values()); |
| |
| this->ReadColumn(); |
| ASSERT_EQ(99, this->values_read_); |
| this->values_out_.resize(99); |
| this->values_.resize(99); |
| ASSERT_EQ(this->values_, this->values_out_); |
| } |
| |
| TYPED_TEST(TestPrimitiveWriter, OptionalSpaced) { |
| // Optional and non-repeated, with definition levels |
| // but no repetition levels |
| this->SetUpSchema(Repetition::OPTIONAL); |
| |
| this->GenerateData(SMALL_SIZE); |
| std::vector<int16_t> definition_levels(SMALL_SIZE, 1); |
| std::vector<uint8_t> valid_bits(::arrow::bit_util::BytesForBits(SMALL_SIZE), 255); |
| |
| definition_levels[SMALL_SIZE - 1] = 0; |
| ::arrow::bit_util::ClearBit(valid_bits.data(), SMALL_SIZE - 1); |
| definition_levels[1] = 0; |
| ::arrow::bit_util::ClearBit(valid_bits.data(), 1); |
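  // Entries 1 and SMALL_SIZE - 1 are null, so only 98 values should be read back.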
| |
| auto writer = this->BuildWriter(); |
| writer->WriteBatchSpaced(this->values_.size(), definition_levels.data(), nullptr, |
| valid_bits.data(), 0, this->values_ptr_); |
| writer->Close(); |
| |
| // PARQUET-703 |
| ASSERT_EQ(100, this->metadata_num_values()); |
| |
| this->ReadColumn(); |
| ASSERT_EQ(98, this->values_read_); |
| this->values_out_.resize(98); |
| this->values_.resize(99); |
| this->values_.erase(this->values_.begin() + 1); |
| ASSERT_EQ(this->values_, this->values_out_); |
| } |
| |
| TYPED_TEST(TestPrimitiveWriter, Repeated) { |
| // Optional and repeated, so definition and repetition levels |
| this->SetUpSchema(Repetition::REPEATED); |
| |
| this->GenerateData(SMALL_SIZE); |
| std::vector<int16_t> definition_levels(SMALL_SIZE, 1); |
| definition_levels[1] = 0; |
| std::vector<int16_t> repetition_levels(SMALL_SIZE, 0); |
| |
| auto writer = this->BuildWriter(); |
| writer->WriteBatch(this->values_.size(), definition_levels.data(), |
| repetition_levels.data(), this->values_ptr_); |
| writer->Close(); |
| |
| this->ReadColumn(); |
| ASSERT_EQ(SMALL_SIZE - 1, this->values_read_); |
| this->values_out_.resize(SMALL_SIZE - 1); |
| this->values_.resize(SMALL_SIZE - 1); |
| ASSERT_EQ(this->values_, this->values_out_); |
| } |
| |
| TYPED_TEST(TestPrimitiveWriter, RequiredLargeChunk) { |
| this->GenerateData(LARGE_SIZE); |
| |
  // Required and non-repeated, so no definition or repetition levels
| auto writer = this->BuildWriter(LARGE_SIZE); |
| writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_); |
| writer->Close(); |
| |
  // Just read the first SMALL_SIZE rows to ensure we can read them back
| this->ReadColumn(); |
| ASSERT_EQ(SMALL_SIZE, this->values_read_); |
| this->values_.resize(SMALL_SIZE); |
| ASSERT_EQ(this->values_, this->values_out_); |
| } |
| |
| // Test cases for dictionary fallback encoding |
| TYPED_TEST(TestPrimitiveWriter, DictionaryFallbackVersion1_0) { |
| this->TestDictionaryFallbackEncoding(ParquetVersion::PARQUET_1_0, |
| ParquetDataPageVersion::V1); |
| } |
| |
| TYPED_TEST(TestPrimitiveWriter, DictionaryFallbackVersion2_0) { |
| this->TestDictionaryFallbackEncoding(ParquetVersion::PARQUET_2_4, |
| ParquetDataPageVersion::V1); |
| this->TestDictionaryFallbackEncoding(ParquetVersion::PARQUET_2_4, |
| ParquetDataPageVersion::V2); |
| this->TestDictionaryFallbackEncoding(ParquetVersion::PARQUET_2_6, |
| ParquetDataPageVersion::V1); |
| this->TestDictionaryFallbackEncoding(ParquetVersion::PARQUET_2_6, |
| ParquetDataPageVersion::V2); |
| } |
| |
| TEST(TestWriter, NullValuesBuffer) { |
| std::shared_ptr<::arrow::io::BufferOutputStream> sink = CreateOutputStream(); |
| |
| const auto item_node = schema::PrimitiveNode::Make( |
| "item", Repetition::REQUIRED, LogicalType::Int(32, true), Type::INT32); |
| const auto list_node = |
| schema::GroupNode::Make("list", Repetition::REPEATED, {item_node}); |
| const auto column_node = schema::GroupNode::Make( |
| "array_of_ints_column", Repetition::OPTIONAL, {list_node}, LogicalType::List()); |
| const auto schema_node = |
| schema::GroupNode::Make("schema", Repetition::REQUIRED, {column_node}); |
| |
| auto file_writer = ParquetFileWriter::Open( |
| sink, std::dynamic_pointer_cast<schema::GroupNode>(schema_node)); |
| auto group_writer = file_writer->AppendRowGroup(); |
| auto column_writer = group_writer->NextColumn(); |
| auto typed_writer = dynamic_cast<Int32Writer*>(column_writer); |
| |
| const int64_t num_values = 1; |
| const int16_t def_levels[] = {0}; |
| const int16_t rep_levels[] = {0}; |
| const uint8_t valid_bits[] = {0}; |
| const int64_t valid_bits_offset = 0; |
| const int32_t* values = nullptr; |
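  // A single all-null entry: with definition level 0 and no set valid bits, a null
  // values pointer should be accepted since no values need to be copied.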
| |
| typed_writer->WriteBatchSpaced(num_values, def_levels, rep_levels, valid_bits, |
| valid_bits_offset, values); |
| } |
| |
| TYPED_TEST(TestPrimitiveWriter, RequiredPlainChecksum) { |
| this->TestRequiredWithSettings(Encoding::PLAIN, Compression::UNCOMPRESSED, |
| /* enable_dictionary */ false, false, SMALL_SIZE, |
| Codec::UseDefaultCompressionLevel(), |
| /* enable_checksum */ true); |
| } |
| |
| TYPED_TEST(TestPrimitiveWriter, RequiredDictChecksum) { |
| this->TestRequiredWithSettings(Encoding::PLAIN, Compression::UNCOMPRESSED, |
| /* enable_dictionary */ true, false, SMALL_SIZE, |
| Codec::UseDefaultCompressionLevel(), |
| /* enable_checksum */ true); |
| } |
| |
| // PARQUET-719 |
| // Test case for NULL values |
| TEST_F(TestValuesWriterInt32Type, OptionalNullValueChunk) { |
| this->SetUpSchema(Repetition::OPTIONAL); |
| |
| this->GenerateData(LARGE_SIZE); |
| |
| std::vector<int16_t> definition_levels(LARGE_SIZE, 0); |
| std::vector<int16_t> repetition_levels(LARGE_SIZE, 0); |
| |
| auto writer = this->BuildWriter(LARGE_SIZE); |
| // All values being written are NULL |
| writer->WriteBatch(this->values_.size(), definition_levels.data(), |
| repetition_levels.data(), nullptr); |
| writer->Close(); |
| |
  // Just read the first SMALL_SIZE rows to ensure we can read them back
| this->ReadColumn(); |
| ASSERT_EQ(0, this->values_read_); |
| } |
| |
| class TestBooleanValuesWriter : public TestPrimitiveWriter<BooleanType> { |
| public: |
| void TestWithEncoding(ParquetVersion::type version, |
| ParquetDataPageVersion data_page_version, |
| const std::set<Encoding::type>& expected_encodings) { |
| this->SetUpSchema(Repetition::REQUIRED); |
| auto writer = |
| this->BuildWriter(SMALL_SIZE, ColumnProperties(), version, data_page_version, |
| /*enable_checksum*/ false); |
| for (int i = 0; i < SMALL_SIZE; i++) { |
      bool value = (i % 2 == 0);
| writer->WriteBatch(1, nullptr, nullptr, &value); |
| } |
| writer->Close(); |
| this->ReadColumn(); |
| for (int i = 0; i < SMALL_SIZE; i++) { |
      ASSERT_EQ(i % 2 == 0, this->values_out_[i]) << i;
| } |
| auto metadata_encodings = this->metadata_encodings(); |
| std::set<Encoding::type> metadata_encodings_set{metadata_encodings.begin(), |
| metadata_encodings.end()}; |
| EXPECT_EQ(expected_encodings, metadata_encodings_set); |
| } |
| }; |
| |
| // PARQUET-764 |
| // Correct bitpacking for boolean write at non-byte boundaries |
| TEST_F(TestBooleanValuesWriter, AlternateBooleanValues) { |
| for (auto data_page_version : |
| {ParquetDataPageVersion::V1, ParquetDataPageVersion::V2}) { |
| TestWithEncoding(ParquetVersion::PARQUET_1_0, data_page_version, |
| {Encoding::PLAIN, Encoding::RLE}); |
| } |
| } |
| |
// The default encoding for boolean is RLE when both the V2 format and V2 data pages
// are enabled.
| TEST_F(TestBooleanValuesWriter, RleEncodedBooleanValues) { |
| TestWithEncoding(ParquetVersion::PARQUET_2_4, ParquetDataPageVersion::V1, |
| {Encoding::PLAIN, Encoding::RLE}); |
| TestWithEncoding(ParquetVersion::PARQUET_2_4, ParquetDataPageVersion::V2, |
| {Encoding::RLE}); |
| } |
| |
| // PARQUET-979 |
| // Prevent writing large MIN, MAX stats |
| TEST_F(TestByteArrayValuesWriter, OmitStats) { |
| int min_len = 1024 * 4; |
| int max_len = 1024 * 8; |
| this->SetUpSchema(Repetition::REQUIRED); |
| auto writer = this->BuildWriter(); |
| |
| values_.resize(SMALL_SIZE); |
| InitWideByteArrayValues(SMALL_SIZE, this->values_, this->buffer_, min_len, max_len); |
| writer->WriteBatch(SMALL_SIZE, nullptr, nullptr, this->values_.data()); |
| writer->Close(); |
| |
| auto has_min_max = this->metadata_stats_has_min_max(); |
| ASSERT_FALSE(has_min_max.first); |
| ASSERT_FALSE(has_min_max.second); |
| } |
| |
| // PARQUET-1405 |
| // Prevent writing large stats in the DataPageHeader |
| TEST_F(TestByteArrayValuesWriter, OmitDataPageStats) { |
| int min_len = static_cast<int>(std::pow(10, 7)); |
| int max_len = static_cast<int>(std::pow(10, 7)); |
| this->SetUpSchema(Repetition::REQUIRED); |
| ColumnProperties column_properties; |
| column_properties.set_statistics_enabled(false); |
| auto writer = this->BuildWriter(SMALL_SIZE, column_properties); |
| |
| values_.resize(1); |
| InitWideByteArrayValues(1, this->values_, this->buffer_, min_len, max_len); |
| writer->WriteBatch(1, nullptr, nullptr, this->values_.data()); |
| writer->Close(); |
| |
| ASSERT_NO_THROW(this->ReadColumn()); |
| } |
| |
| TEST_F(TestByteArrayValuesWriter, LimitStats) { |
| int min_len = 1024 * 4; |
| int max_len = 1024 * 8; |
| this->SetUpSchema(Repetition::REQUIRED); |
| ColumnProperties column_properties; |
| column_properties.set_max_statistics_size(static_cast<size_t>(max_len)); |
| auto writer = this->BuildWriter(SMALL_SIZE, column_properties); |
| |
| values_.resize(SMALL_SIZE); |
| InitWideByteArrayValues(SMALL_SIZE, this->values_, this->buffer_, min_len, max_len); |
| writer->WriteBatch(SMALL_SIZE, nullptr, nullptr, this->values_.data()); |
| writer->Close(); |
| |
| ASSERT_TRUE(this->metadata_is_stats_set()); |
| } |
| |
| TEST_F(TestByteArrayValuesWriter, CheckDefaultStats) { |
| this->SetUpSchema(Repetition::REQUIRED); |
| auto writer = this->BuildWriter(); |
| this->GenerateData(SMALL_SIZE); |
| |
| writer->WriteBatch(SMALL_SIZE, nullptr, nullptr, this->values_ptr_); |
| writer->Close(); |
| |
| ASSERT_TRUE(this->metadata_is_stats_set()); |
| } |
| |
| // Test for https://github.com/apache/arrow/issues/47027. |
| // When writing a repeated column with page indexes enabled |
| // and batches that are aligned with list boundaries, |
| // pages should be written after reaching the page limit. |
| TEST_F(TestValuesWriterInt32Type, PagesSplitWithListAlignedWrites) { |
| this->SetUpSchema(Repetition::REPEATED); |
| |
| constexpr int list_length = 10; |
| constexpr int num_rows = 100; |
| constexpr int64_t page_size = sizeof(int32_t) * 100; |
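  // The page size limit fits 100 plain-encoded int32 values, i.e. 10 lists of 10
  // values, so a new data page should be started roughly every 10 rows.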
| |
| this->GenerateData(num_rows * list_length); |
| |
| std::vector<int16_t> repetition_levels(list_length, 1); |
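  // The first value of each batch starts a new list (repetition level 0); the
  // remaining values continue it.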
| repetition_levels[0] = 0; |
| |
| ColumnProperties column_properties; |
| column_properties.set_dictionary_enabled(false); |
| column_properties.set_encoding(Encoding::PLAIN); |
| column_properties.set_page_index_enabled(true); |
| |
| auto writer = |
| this->BuildWriter(list_length, column_properties, ParquetVersion::PARQUET_1_0, |
| ParquetDataPageVersion::V1, false, page_size); |
| |
| int64_t pages_written = 0; |
| int64_t prev_bytes_written = 0; |
| |
| for (int row_idx = 0; row_idx < num_rows; ++row_idx) { |
| writer->WriteBatch(list_length, def_levels_.data(), repetition_levels.data(), |
| values_ptr_ + row_idx * list_length); |
| |
| int64_t bytes_written = writer->total_bytes_written(); |
| if (bytes_written != prev_bytes_written) { |
| pages_written++; |
| prev_bytes_written = bytes_written; |
| } |
| // Buffered bytes shouldn't grow larger than the specified page size |
| ASSERT_LE(writer->estimated_buffered_value_bytes(), page_size); |
| } |
| |
| writer->Close(); |
| |
  // pages_written doesn't include the last page, which is written when closing the
  // writer.
| ASSERT_EQ(pages_written, 9); |
| |
| this->SetupValuesOut(num_rows * list_length); |
| definition_levels_out_.resize(num_rows * list_length); |
| repetition_levels_out_.resize(num_rows * list_length); |
| this->ReadColumnFully(); |
| |
| ASSERT_EQ(values_out_, values_); |
| } |
| |
// Test writing a dictionary-encoded page where the number of
// bits is greater than the maximum int32 value.
| // For https://github.com/apache/arrow/issues/47973 |
| TEST(TestColumnWriter, LARGE_MEMORY_TEST(WriteLargeDictEncodedPage)) { |
| auto sink = CreateOutputStream(); |
| auto schema = std::static_pointer_cast<GroupNode>( |
| GroupNode::Make("schema", Repetition::REQUIRED, |
| { |
| PrimitiveNode::Make("item", Repetition::REQUIRED, Type::INT32), |
| })); |
| auto properties = |
| WriterProperties::Builder().data_pagesize(1024 * 1024 * 1024)->build(); |
| auto file_writer = ParquetFileWriter::Open(sink, schema, properties); |
| auto rg_writer = file_writer->AppendRowGroup(); |
| |
| constexpr int64_t num_batches = 150; |
| constexpr int64_t batch_size = 1'000'000; |
| constexpr int64_t unique_count = 200'000; |
| static_assert(batch_size % unique_count == 0); |
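  // 150 batches x 1M values = 150M values; 200k distinct values need 18-bit dictionary
  // indices, so the single data page holds roughly 2.7 billion index bits, which
  // exceeds INT32_MAX.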
| |
| std::vector<int32_t> values(batch_size, 0); |
| for (int64_t i = 0; i < batch_size; i++) { |
| values[i] = static_cast<int32_t>(i % unique_count); |
| } |
| |
| auto col_writer = dynamic_cast<parquet::Int32Writer*>(rg_writer->NextColumn()); |
| for (int64_t i = 0; i < num_batches; i++) { |
| col_writer->WriteBatch(batch_size, nullptr, nullptr, values.data()); |
| } |
| file_writer->Close(); |
| |
| ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish()); |
| |
| auto file_reader = ParquetFileReader::Open( |
| std::make_shared<::arrow::io::BufferReader>(buffer), default_reader_properties()); |
| auto metadata = file_reader->metadata(); |
| ASSERT_EQ(1, metadata->num_row_groups()); |
| auto row_group_reader = file_reader->RowGroup(0); |
| |
| // Verify page size property was applied and only 1 data page was written |
| auto page_reader = row_group_reader->GetColumnPageReader(0); |
| int64_t page_count = 0; |
| while (true) { |
| auto page = page_reader->NextPage(); |
| if (page == nullptr) { |
| break; |
| } |
| if (page_count == 0) { |
| ASSERT_EQ(page->type(), PageType::DICTIONARY_PAGE); |
| } else { |
| ASSERT_EQ(page->type(), PageType::DATA_PAGE); |
| } |
| page_count++; |
| } |
| ASSERT_EQ(page_count, 2); |
| |
| auto col_reader = std::static_pointer_cast<Int32Reader>(row_group_reader->Column(0)); |
| |
| constexpr int64_t buffer_size = 1024 * 1024; |
| values.resize(buffer_size); |
| |
| // Verify values were round-tripped correctly |
| int64_t levels_read = 0; |
| while (levels_read < num_batches * batch_size) { |
| int64_t batch_values; |
| int64_t batch_levels = col_reader->ReadBatch(buffer_size, nullptr, nullptr, |
| values.data(), &batch_values); |
| for (int64_t i = 0; i < batch_levels; i++) { |
| ASSERT_EQ(values[i], (levels_read + i) % unique_count); |
| } |
| levels_read += batch_levels; |
| } |
| } |
| |
| TEST(TestColumnWriter, LARGE_MEMORY_TEST(ThrowsOnDictIndicesTooLarge)) { |
| auto sink = CreateOutputStream(); |
| auto schema = std::static_pointer_cast<GroupNode>( |
| GroupNode::Make("schema", Repetition::REQUIRED, |
| { |
| PrimitiveNode::Make("item", Repetition::REQUIRED, Type::INT32), |
| })); |
| auto properties = |
| WriterProperties::Builder().data_pagesize(4 * 1024LL * 1024 * 1024)->build(); |
| auto file_writer = ParquetFileWriter::Open(sink, schema, properties); |
| auto rg_writer = file_writer->AppendRowGroup(); |
| |
| constexpr int64_t num_batches = 1'000; |
| constexpr int64_t batch_size = 1'000'000; |
| constexpr int64_t unique_count = 200'000; |
| static_assert(batch_size % unique_count == 0); |
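  // One billion values with 18-bit dictionary indices encode to roughly 2.25 GB of
  // index data, which exceeds the maximum int32 size, so Close() is expected to throw.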
| |
| std::vector<int32_t> values(batch_size, 0); |
| for (int64_t i = 0; i < batch_size; i++) { |
| values[i] = static_cast<int32_t>(i % unique_count); |
| } |
| |
| auto col_writer = dynamic_cast<parquet::Int32Writer*>(rg_writer->NextColumn()); |
| for (int64_t i = 0; i < num_batches; i++) { |
| col_writer->WriteBatch(batch_size, nullptr, nullptr, values.data()); |
| } |
| |
| EXPECT_THROW_THAT( |
| [&]() { file_writer->Close(); }, ParquetException, |
| ::testing::Property(&ParquetException::what, |
| ::testing::HasSubstr("exceeds maximum int value"))); |
| } |
| |
| TEST(TestPageWriter, ThrowsOnPagesTooLarge) { |
| NodePtr item = schema::Int32("item"); // optional item |
| NodePtr list(GroupNode::Make("b", Repetition::REPEATED, {item}, ConvertedType::LIST)); |
| NodePtr bag(GroupNode::Make("bag", Repetition::OPTIONAL, {list})); // optional list |
| std::vector<NodePtr> fields = {bag}; |
| NodePtr root = GroupNode::Make("schema", Repetition::REPEATED, fields); |
| |
| SchemaDescriptor schema; |
| schema.Init(root); |
| |
| auto sink = CreateOutputStream(); |
| auto props = WriterProperties::Builder().build(); |
| |
| auto metadata = ColumnChunkMetaDataBuilder::Make(props, schema.Column(0)); |
| std::unique_ptr<PageWriter> pager = |
| PageWriter::Open(sink, Compression::UNCOMPRESSED, metadata.get()); |
| |
| uint8_t data; |
| std::shared_ptr<Buffer> buffer = |
| std::make_shared<Buffer>(&data, std::numeric_limits<int32_t>::max() + int64_t{1}); |
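  // The buffer claims a size of INT32_MAX + 1 bytes over a single byte of storage;
  // only the reported size should matter for the overflow checks below.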
| DataPageV1 over_compressed_limit(buffer, /*num_values=*/100, Encoding::BIT_PACKED, |
| Encoding::BIT_PACKED, Encoding::BIT_PACKED, |
| /*uncompressed_size=*/100); |
| EXPECT_THROW_THAT([&]() { pager->WriteDataPage(over_compressed_limit); }, |
| ParquetException, |
| ::testing::Property(&ParquetException::what, |
| ::testing::HasSubstr("overflows INT32_MAX"))); |
| DictionaryPage dictionary_over_compressed_limit(buffer, /*num_values=*/100, |
| Encoding::PLAIN); |
| EXPECT_THROW_THAT( |
| [&]() { pager->WriteDictionaryPage(dictionary_over_compressed_limit); }, |
| ParquetException, |
| ::testing::Property(&ParquetException::what, |
| ::testing::HasSubstr("overflows INT32_MAX"))); |
| |
| buffer = std::make_shared<Buffer>(&data, 1); |
| DataPageV1 over_uncompressed_limit( |
| buffer, /*num_values=*/100, Encoding::BIT_PACKED, Encoding::BIT_PACKED, |
| Encoding::BIT_PACKED, |
| /*uncompressed_size=*/std::numeric_limits<int32_t>::max() + int64_t{1}); |
  EXPECT_THROW_THAT([&]() { pager->WriteDataPage(over_uncompressed_limit); },
| ParquetException, |
| ::testing::Property(&ParquetException::what, |
| ::testing::HasSubstr("overflows INT32_MAX"))); |
| } |
| |
| TEST(TestColumnWriter, RepeatedListsUpdateSpacedBug) { |
  // In ARROW-3930 we discovered a bug when writing from Arrow with data
  // that looks like this:
| // |
| // [null, [0, 1, null, 2, 3, 4, null]] |
| |
| // Create schema |
| NodePtr item = schema::Int32("item"); // optional item |
| NodePtr list(GroupNode::Make("b", Repetition::REPEATED, {item}, ConvertedType::LIST)); |
| NodePtr bag(GroupNode::Make("bag", Repetition::OPTIONAL, {list})); // optional list |
| std::vector<NodePtr> fields = {bag}; |
| NodePtr root = GroupNode::Make("schema", Repetition::REPEATED, fields); |
| |
| SchemaDescriptor schema; |
| schema.Init(root); |
| |
| auto sink = CreateOutputStream(); |
| auto props = WriterProperties::Builder().build(); |
| |
| auto metadata = ColumnChunkMetaDataBuilder::Make(props, schema.Column(0)); |
| std::unique_ptr<PageWriter> pager = |
| PageWriter::Open(sink, Compression::UNCOMPRESSED, metadata.get()); |
| std::shared_ptr<ColumnWriter> writer = |
| ColumnWriter::Make(metadata.get(), std::move(pager), props.get()); |
| auto typed_writer = std::static_pointer_cast<TypedColumnWriter<Int32Type>>(writer); |
| |
| std::vector<int16_t> def_levels = {1, 3, 3, 2, 3, 3, 3, 2, 3, 3, 3, 2, 3, 3}; |
| std::vector<int16_t> rep_levels = {0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1}; |
| std::vector<int32_t> values = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}; |
| |
  // Copy the values into freshly allocated memory, leaving the tail of the buffer
  // uninitialized
| ASSERT_OK_AND_ASSIGN(auto values_buffer, ::arrow::AllocateBuffer(64)); |
| memcpy(values_buffer->mutable_data(), values.data(), 13 * sizeof(int32_t)); |
| auto values_data = reinterpret_cast<const int32_t*>(values_buffer->data()); |
| |
| std::shared_ptr<Buffer> valid_bits; |
| std::vector<uint8_t> bitmap_bytes = {1, 1, 0, 1, 1, 1, 0, 1, 1, 1, 0, 1, 1}; |
| ASSERT_OK_AND_ASSIGN(valid_bits, ::arrow::internal::BytesToBits(bitmap_bytes)); |
| |
| // valgrind will warn about out of bounds access into def_levels_data |
| typed_writer->WriteBatchSpaced(14, def_levels.data(), rep_levels.data(), |
| valid_bits->data(), 0, values_data); |
| writer->Close(); |
| } |
| |
| void GenerateLevels(int min_repeat_factor, int max_repeat_factor, int max_level, |
| std::vector<int16_t>& input_levels) { |
| // for each repetition count up to max_repeat_factor |
| for (int repeat = min_repeat_factor; repeat <= max_repeat_factor; repeat++) { |
| // repeat count increases by a factor of 2 for every iteration |
| int repeat_count = (1 << repeat); |
| // generate levels for repetition count up to the maximum level |
| int16_t value = 0; |
| int bwidth = 0; |
| while (value <= max_level) { |
| for (int i = 0; i < repeat_count; i++) { |
| input_levels.push_back(value); |
| } |
| value = static_cast<int16_t>((2 << bwidth) - 1); |
| bwidth++; |
| } |
| } |
| } |
| |
| void EncodeLevels(Encoding::type encoding, int16_t max_level, int num_levels, |
| const int16_t* input_levels, std::vector<uint8_t>& bytes) { |
| LevelEncoder encoder; |
| int levels_count = 0; |
| bytes.resize(2 * num_levels); |
| ASSERT_EQ(2 * num_levels, static_cast<int>(bytes.size())); |
| // encode levels |
| if (encoding == Encoding::RLE) { |
| // leave space to write the rle length value |
| encoder.Init(encoding, max_level, num_levels, bytes.data() + sizeof(int32_t), |
| static_cast<int>(bytes.size())); |
| |
| levels_count = encoder.Encode(num_levels, input_levels); |
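    // Prefix the encoded levels with their byte length, as the RLE level decoder
    // expects (the 4-byte length prefix used by data page v1).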
| (reinterpret_cast<int32_t*>(bytes.data()))[0] = encoder.len(); |
| } else { |
| encoder.Init(encoding, max_level, num_levels, bytes.data(), |
| static_cast<int>(bytes.size())); |
| levels_count = encoder.Encode(num_levels, input_levels); |
| } |
| ASSERT_EQ(num_levels, levels_count); |
| } |
| |
| void VerifyDecodingLevels(Encoding::type encoding, int16_t max_level, |
| const std::vector<int16_t>& input_levels, |
| std::vector<uint8_t>& bytes) { |
| LevelDecoder decoder; |
| int levels_count = 0; |
| std::vector<int16_t> output_levels; |
| int num_levels = static_cast<int>(input_levels.size()); |
| |
| output_levels.resize(num_levels); |
| ASSERT_EQ(num_levels, static_cast<int>(output_levels.size())); |
| |
| // Decode levels and test with multiple decode calls |
| decoder.SetData(encoding, max_level, num_levels, bytes.data(), |
| static_cast<int32_t>(bytes.size())); |
| int decode_count = 4; |
| int num_inner_levels = num_levels / decode_count; |
| // Try multiple decoding on a single SetData call |
| for (int ct = 0; ct < decode_count; ct++) { |
| int offset = ct * num_inner_levels; |
| levels_count = decoder.Decode(num_inner_levels, output_levels.data()); |
| ASSERT_EQ(num_inner_levels, levels_count); |
| for (int i = 0; i < num_inner_levels; i++) { |
| EXPECT_EQ(input_levels[i + offset], output_levels[i]); |
| } |
| } |
| // check the remaining levels |
| int num_levels_completed = decode_count * (num_levels / decode_count); |
| int num_remaining_levels = num_levels - num_levels_completed; |
| if (num_remaining_levels > 0) { |
| levels_count = decoder.Decode(num_remaining_levels, output_levels.data()); |
| ASSERT_EQ(num_remaining_levels, levels_count); |
| for (int i = 0; i < num_remaining_levels; i++) { |
| EXPECT_EQ(input_levels[i + num_levels_completed], output_levels[i]); |
| } |
| } |
| // Test zero Decode values |
| ASSERT_EQ(0, decoder.Decode(1, output_levels.data())); |
| } |
| |
| void VerifyDecodingMultipleSetData(Encoding::type encoding, int16_t max_level, |
| const std::vector<int16_t>& input_levels, |
| std::vector<std::vector<uint8_t>>& bytes) { |
| LevelDecoder decoder; |
| int levels_count = 0; |
| std::vector<int16_t> output_levels; |
| |
| // Decode levels and test with multiple SetData calls |
| int setdata_count = static_cast<int>(bytes.size()); |
| int num_levels = static_cast<int>(input_levels.size()) / setdata_count; |
| output_levels.resize(num_levels); |
| // Try multiple SetData |
| for (int ct = 0; ct < setdata_count; ct++) { |
| int offset = ct * num_levels; |
| ASSERT_EQ(num_levels, static_cast<int>(output_levels.size())); |
| decoder.SetData(encoding, max_level, num_levels, bytes[ct].data(), |
| static_cast<int32_t>(bytes[ct].size())); |
| levels_count = decoder.Decode(num_levels, output_levels.data()); |
| ASSERT_EQ(num_levels, levels_count); |
| for (int i = 0; i < num_levels; i++) { |
| EXPECT_EQ(input_levels[i + offset], output_levels[i]); |
| } |
| } |
| } |
| |
| // Test levels with maximum bit-width from 1 to 8 |
| // increase the repetition count for each iteration by a factor of 2 |
| TEST(TestLevels, TestLevelsDecodeMultipleBitWidth) { |
| int min_repeat_factor = 0; |
| int max_repeat_factor = 7; // 128 |
| int max_bit_width = 8; |
| std::vector<int16_t> input_levels; |
| std::vector<uint8_t> bytes; |
| Encoding::type encodings[2] = {Encoding::RLE, Encoding::BIT_PACKED}; |
| |
| // for each encoding |
| for (int encode = 0; encode < 2; encode++) { |
| Encoding::type encoding = encodings[encode]; |
| // BIT_PACKED requires a sequence of at least 8 |
| if (encoding == Encoding::BIT_PACKED) min_repeat_factor = 3; |
| // for each maximum bit-width |
| for (int bit_width = 1; bit_width <= max_bit_width; bit_width++) { |
| // find the maximum level for the current bit_width |
| int16_t max_level = static_cast<int16_t>((1 << bit_width) - 1); |
| // Generate levels |
| GenerateLevels(min_repeat_factor, max_repeat_factor, max_level, input_levels); |
| ASSERT_NO_FATAL_FAILURE(EncodeLevels(encoding, max_level, |
| static_cast<int>(input_levels.size()), |
| input_levels.data(), bytes)); |
| ASSERT_NO_FATAL_FAILURE( |
| VerifyDecodingLevels(encoding, max_level, input_levels, bytes)); |
| input_levels.clear(); |
| } |
| } |
| } |
| |
| // Test multiple decoder SetData calls |
| TEST(TestLevels, TestLevelsDecodeMultipleSetData) { |
| int min_repeat_factor = 3; |
| int max_repeat_factor = 7; // 128 |
| int bit_width = 8; |
| int16_t max_level = static_cast<int16_t>((1 << bit_width) - 1); |
| std::vector<int16_t> input_levels; |
| std::vector<std::vector<uint8_t>> bytes; |
| Encoding::type encodings[2] = {Encoding::RLE, Encoding::BIT_PACKED}; |
| GenerateLevels(min_repeat_factor, max_repeat_factor, max_level, input_levels); |
| int num_levels = static_cast<int>(input_levels.size()); |
| int setdata_factor = 8; |
| int split_level_size = num_levels / setdata_factor; |
| bytes.resize(setdata_factor); |
| |
| // for each encoding |
| for (int encode = 0; encode < 2; encode++) { |
| Encoding::type encoding = encodings[encode]; |
| for (int rf = 0; rf < setdata_factor; rf++) { |
| int offset = rf * split_level_size; |
| ASSERT_NO_FATAL_FAILURE(EncodeLevels( |
| encoding, max_level, split_level_size, |
          input_levels.data() + offset, bytes[rf]));
| } |
| ASSERT_NO_FATAL_FAILURE( |
| VerifyDecodingMultipleSetData(encoding, max_level, input_levels, bytes)); |
| } |
| } |
| |
| TEST(TestLevelEncoder, MinimumBufferSize) { |
| // PARQUET-676, PARQUET-698 |
| const int kNumToEncode = 1024; |
| |
| std::vector<int16_t> levels; |
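| // A zero every ninth level keeps the RLE runs short, which exercises the |
| // worst-case buffer estimate returned by MaxBufferSize. |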
| for (int i = 0; i < kNumToEncode; ++i) { |
| if (i % 9 == 0) { |
| levels.push_back(0); |
| } else { |
| levels.push_back(1); |
| } |
| } |
| |
| std::vector<uint8_t> output( |
| LevelEncoder::MaxBufferSize(Encoding::RLE, 1, kNumToEncode)); |
| |
| LevelEncoder encoder; |
| encoder.Init(Encoding::RLE, 1, kNumToEncode, output.data(), |
| static_cast<int>(output.size())); |
| int encode_count = encoder.Encode(kNumToEncode, levels.data()); |
| |
| ASSERT_EQ(kNumToEncode, encode_count); |
| } |
| |
| TEST(TestLevelEncoder, MinimumBufferSize2) { |
| // PARQUET-708 |
| // Test the worst case for bit_width=2 consisting of |
| // LiteralRun(size=8) |
| // RepeatedRun(size=8) |
| // LiteralRun(size=8) |
| // ... |
| const int kNumToEncode = 1024; |
| |
| std::vector<int16_t> levels; |
| for (int i = 0; i < kNumToEncode; ++i) { |
| // This forces a literal run of 00000001 |
| // followed by eight 1s |
| if ((i % 16) < 7) { |
| levels.push_back(0); |
| } else { |
| levels.push_back(1); |
| } |
| } |
| |
| for (int16_t bit_width = 1; bit_width <= 8; bit_width++) { |
| std::vector<uint8_t> output( |
| LevelEncoder::MaxBufferSize(Encoding::RLE, bit_width, kNumToEncode)); |
| |
| LevelEncoder encoder; |
| encoder.Init(Encoding::RLE, bit_width, kNumToEncode, output.data(), |
| static_cast<int>(output.size())); |
| int encode_count = encoder.Encode(kNumToEncode, levels.data()); |
| |
| ASSERT_EQ(kNumToEncode, encode_count); |
| } |
| } |
| |
| TEST(TestColumnWriter, WriteDataPageV2Header) { |
| auto sink = CreateOutputStream(); |
| auto schema = std::static_pointer_cast<GroupNode>( |
| GroupNode::Make("schema", Repetition::REQUIRED, |
| { |
| schema::Int32("required", Repetition::REQUIRED), |
| schema::Int32("optional", Repetition::OPTIONAL), |
| schema::Int32("repeated", Repetition::REPEATED), |
| })); |
| auto properties = WriterProperties::Builder() |
| .disable_dictionary() |
| ->data_page_version(ParquetDataPageVersion::V2) |
| ->build(); |
| auto file_writer = ParquetFileWriter::Open(sink, schema, properties); |
| auto rg_writer = file_writer->AppendRowGroup(); |
| |
| constexpr int32_t num_rows = 100; |
| |
| auto required_writer = static_cast<parquet::Int32Writer*>(rg_writer->NextColumn()); |
| for (int32_t i = 0; i < num_rows; i++) { |
| required_writer->WriteBatch(1, nullptr, nullptr, &i); |
| } |
| |
| // Write a null value at every other row. |
| auto optional_writer = static_cast<parquet::Int32Writer*>(rg_writer->NextColumn()); |
| for (int32_t i = 0; i < num_rows; i++) { |
| int16_t definition_level = i % 2 == 0 ? 1 : 0; |
| optional_writer->WriteBatch(1, &definition_level, nullptr, &i); |
| } |
| |
| // Each row has two repeated values. |
| auto repeated_writer = static_cast<parquet::Int32Writer*>(rg_writer->NextColumn()); |
| for (int i = 0; i < 2 * num_rows; i++) { |
| int32_t value = i * 1000; |
| int16_t definition_level = 1; |
| int16_t repetition_level = i % 2 == 0 ? 0 : 1; |
| repeated_writer->WriteBatch(1, &definition_level, &repetition_level, &value); |
| } |
| |
| ASSERT_NO_THROW(file_writer->Close()); |
| ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish()); |
| auto file_reader = ParquetFileReader::Open( |
| std::make_shared<::arrow::io::BufferReader>(buffer), default_reader_properties()); |
| auto metadata = file_reader->metadata(); |
| ASSERT_EQ(1, metadata->num_row_groups()); |
| auto row_group_reader = file_reader->RowGroup(0); |
| |
| // Verify required column. |
| { |
| auto page_reader = row_group_reader->GetColumnPageReader(0); |
| auto page = page_reader->NextPage(); |
| ASSERT_NE(page, nullptr); |
| auto data_page = std::static_pointer_cast<DataPageV2>(page); |
| EXPECT_EQ(num_rows, data_page->num_rows()); |
| EXPECT_EQ(num_rows, data_page->num_values()); |
| EXPECT_EQ(0, data_page->num_nulls()); |
| EXPECT_EQ(page_reader->NextPage(), nullptr); |
| } |
| |
| // Verify optional column. |
| { |
| auto page_reader = row_group_reader->GetColumnPageReader(1); |
| auto page = page_reader->NextPage(); |
| ASSERT_NE(page, nullptr); |
| auto data_page = std::static_pointer_cast<DataPageV2>(page); |
| EXPECT_EQ(num_rows, data_page->num_rows()); |
| EXPECT_EQ(num_rows, data_page->num_values()); |
| EXPECT_EQ(num_rows / 2, data_page->num_nulls()); |
| EXPECT_EQ(page_reader->NextPage(), nullptr); |
| } |
| |
| // Verify repeated column. |
| { |
| auto page_reader = row_group_reader->GetColumnPageReader(2); |
| auto page = page_reader->NextPage(); |
| ASSERT_NE(page, nullptr); |
| auto data_page = std::static_pointer_cast<DataPageV2>(page); |
| EXPECT_EQ(num_rows, data_page->num_rows()); |
| EXPECT_EQ(num_rows * 2, data_page->num_values()); |
| EXPECT_EQ(0, data_page->num_nulls()); |
| EXPECT_EQ(page_reader->NextPage(), nullptr); |
| } |
| } |
| |
| // The test below checks that data page v2 pages end on record boundaries for |
| // all repetition types (i.e. required, optional, and repeated) |
| TEST(TestColumnWriter, WriteDataPagesChangeOnRecordBoundaries) { |
| auto sink = CreateOutputStream(); |
| auto schema = std::static_pointer_cast<GroupNode>( |
| GroupNode::Make("schema", Repetition::REQUIRED, |
| {schema::Int32("required", Repetition::REQUIRED), |
| schema::Int32("optional", Repetition::OPTIONAL), |
| schema::Int32("repeated", Repetition::REPEATED)})); |
| // Write at most 11 levels per batch. |
| constexpr int64_t batch_size = 11; |
| auto properties = WriterProperties::Builder() |
| .disable_dictionary() |
| ->data_page_version(ParquetDataPageVersion::V2) |
| ->write_batch_size(batch_size) |
| ->data_pagesize(1) /* every page size check creates a new page */ |
| ->build(); |
| auto file_writer = ParquetFileWriter::Open(sink, schema, properties); |
| auto rg_writer = file_writer->AppendRowGroup(); |
| |
| constexpr int32_t num_levels = 100; |
| const std::vector<int32_t> values(num_levels, 1024); |
| std::array<int16_t, num_levels> def_levels; |
| std::array<int16_t, num_levels> rep_levels; |
| for (int32_t i = 0; i < num_levels; i++) { |
| def_levels[i] = i % 2 == 0 ? 1 : 0; |
| rep_levels[i] = i % 2 == 0 ? 0 : 1; |
| } |
| |
| auto required_writer = static_cast<parquet::Int32Writer*>(rg_writer->NextColumn()); |
| required_writer->WriteBatch(num_levels, nullptr, nullptr, values.data()); |
| |
| // Write a null value at every other row. |
| auto optional_writer = static_cast<parquet::Int32Writer*>(rg_writer->NextColumn()); |
| optional_writer->WriteBatch(num_levels, def_levels.data(), nullptr, values.data()); |
| |
| // Each row has two repeated values. |
| auto repeated_writer = static_cast<parquet::Int32Writer*>(rg_writer->NextColumn()); |
| repeated_writer->WriteBatch(num_levels, def_levels.data(), rep_levels.data(), |
| values.data()); |
| repeated_writer->WriteBatch(num_levels, def_levels.data(), rep_levels.data(), |
| values.data()); |
| |
| ASSERT_NO_THROW(file_writer->Close()); |
| ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish()); |
| auto file_reader = ParquetFileReader::Open( |
| std::make_shared<::arrow::io::BufferReader>(buffer), default_reader_properties()); |
| auto metadata = file_reader->metadata(); |
| ASSERT_EQ(1, metadata->num_row_groups()); |
| auto row_group_reader = file_reader->RowGroup(0); |
| |
| // Check that pages change on record boundaries. |
| constexpr int num_columns = 3; |
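| // With write_batch_size = 11 and data_pagesize(1), the writer tries to flush a |
| // page after every 11 levels, but a V2 page may only end on a record boundary. |
| // The required and optional columns hold one value per row (100 rows -> 10 pages |
| // of roughly 11 rows each); the repeated column holds two levels per row and 200 |
| // levels in total, which works out to 19 pages once flushes are deferred to row |
| // boundaries. |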
| const std::array<int64_t, num_columns> expected_num_pages = {10, 10, 19}; |
| for (int i = 0; i < num_columns; ++i) { |
| auto page_reader = row_group_reader->GetColumnPageReader(i); |
| int64_t num_rows = 0; |
| int64_t num_pages = 0; |
| std::shared_ptr<Page> page; |
| while ((page = page_reader->NextPage()) != nullptr) { |
| auto data_page = std::static_pointer_cast<DataPageV2>(page); |
| if (i < 2) { |
| EXPECT_EQ(data_page->num_values(), data_page->num_rows()); |
| } else { |
| // Make sure the repeated column has 2 values per row and rows do not span multiple pages. |
| EXPECT_EQ(data_page->num_values(), 2 * data_page->num_rows()); |
| } |
| num_rows += data_page->num_rows(); |
| num_pages++; |
| } |
| EXPECT_EQ(num_levels, num_rows); |
| EXPECT_EQ(expected_num_pages[i], num_pages); |
| } |
| } |
| |
| // The test below checks that data page v2 pages end on record boundaries for |
| // repeated columns written in small batches. |
| TEST(TestColumnWriter, WriteDataPagesChangeOnRecordBoundariesWithSmallBatches) { |
| auto sink = CreateOutputStream(); |
| auto schema = std::static_pointer_cast<GroupNode>( |
| GroupNode::Make("schema", Repetition::REQUIRED, |
| {schema::Int32("tiny_repeat", Repetition::REPEATED), |
| schema::Int32("small_repeat", Repetition::REPEATED), |
| schema::Int32("medium_repeat", Repetition::REPEATED), |
| schema::Int32("large_repeat", Repetition::REPEATED)})); |
| |
| // The batch_size is large enough that each WriteBatch call checks the page size at most once. |
| constexpr int64_t batch_size = std::numeric_limits<int64_t>::max(); |
| auto properties = WriterProperties::Builder() |
| .disable_dictionary() |
| ->data_page_version(ParquetDataPageVersion::V2) |
| ->write_batch_size(batch_size) |
| ->data_pagesize(1) /* every page size check creates a new page */ |
| ->build(); |
| auto file_writer = ParquetFileWriter::Open(sink, schema, properties); |
| auto rg_writer = file_writer->AppendRowGroup(); |
| |
| constexpr int32_t num_cols = 4; |
| constexpr int64_t num_rows = 400; |
| constexpr int64_t num_levels = 100; |
| constexpr std::array<int64_t, num_cols> num_levels_per_row_by_col = {1, 50, 99, 150}; |
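| // Levels per row range from 1 (many rows per WriteBatch call) to 150 (a single |
| // row spanning multiple WriteBatch calls of 100 levels each), exercising page |
| // splits both within and across batches. |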
| |
| // All values are non-null and fixed to 1024 for simplicity. |
| const std::vector<int32_t> values(num_levels, 1024); |
| const std::vector<int16_t> def_levels(num_levels, 1); |
| std::vector<int16_t> rep_levels(num_levels, 0); |
| |
| for (int32_t i = 0; i < num_cols; ++i) { |
| auto writer = static_cast<parquet::Int32Writer*>(rg_writer->NextColumn()); |
| const auto num_levels_per_row = num_levels_per_row_by_col[i]; |
| int64_t num_rows_written = 0; |
| int64_t num_levels_written_curr_row = 0; |
| while (num_rows_written < num_rows) { |
| int32_t num_levels_to_write = 0; |
| while (num_levels_to_write < num_levels) { |
| if (num_levels_written_curr_row == 0) { |
| // A new record. |
| rep_levels[num_levels_to_write++] = 0; |
| } else { |
| rep_levels[num_levels_to_write++] = 1; |
| } |
| |
| if (++num_levels_written_curr_row == num_levels_per_row) { |
| // Current row has enough levels. |
| num_levels_written_curr_row = 0; |
| if (++num_rows_written == num_rows) { |
| // Enough rows have been written. |
| break; |
| } |
| } |
| } |
| |
| writer->WriteBatch(num_levels_to_write, def_levels.data(), rep_levels.data(), |
| values.data()); |
| } |
| } |
| |
| ASSERT_NO_THROW(file_writer->Close()); |
| ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish()); |
| auto file_reader = ParquetFileReader::Open( |
| std::make_shared<::arrow::io::BufferReader>(buffer), default_reader_properties()); |
| auto metadata = file_reader->metadata(); |
| ASSERT_EQ(1, metadata->num_row_groups()); |
| auto row_group_reader = file_reader->RowGroup(0); |
| |
| // Check that pages change on record boundaries. |
| const std::array<int64_t, num_cols> expect_num_pages_by_col = {5, 201, 397, 400}; |
| const std::array<int64_t, num_cols> expect_num_rows_1st_page_by_col = {99, 1, 1, 1}; |
| const std::array<int64_t, num_cols> expect_num_vals_1st_page_by_col = {99, 50, 99, 150}; |
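| // The first page of the tiny_repeat column holds 99 rows rather than 100: the |
| // writer only treats a record as complete when it sees the next level with |
| // rep_level == 0, so the last record of a batch is held back for the next page. |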
| for (int32_t i = 0; i < num_cols; ++i) { |
| auto page_reader = row_group_reader->GetColumnPageReader(i); |
| int64_t num_rows_read = 0; |
| int64_t num_pages_read = 0; |
| int64_t num_values_read = 0; |
| std::shared_ptr<Page> page; |
| while ((page = page_reader->NextPage()) != nullptr) { |
| auto data_page = std::static_pointer_cast<DataPageV2>(page); |
| num_values_read += data_page->num_values(); |
| num_rows_read += data_page->num_rows(); |
| if (num_pages_read++ == 0) { |
| EXPECT_EQ(expect_num_rows_1st_page_by_col[i], data_page->num_rows()); |
| EXPECT_EQ(expect_num_vals_1st_page_by_col[i], data_page->num_values()); |
| } |
| } |
| EXPECT_EQ(num_rows, num_rows_read); |
| EXPECT_EQ(expect_num_pages_by_col[i], num_pages_read); |
| EXPECT_EQ(num_levels_per_row_by_col[i] * num_rows, num_values_read); |
| } |
| } |
| |
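| // The fixture below checks the writer's size-reporting accessors. Roughly (as |
| // inferred from the assertions in the tests): total_bytes_written() counts bytes |
| // of pages already flushed to the sink, total_compressed_bytes() counts compressed |
| // bytes still buffered in memory, and total_compressed_bytes_written() counts the |
| // compressed bytes of flushed pages. |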
| class ColumnWriterTestSizeEstimated : public ::testing::Test { |
| public: |
| void SetUp() { |
| sink_ = CreateOutputStream(); |
| node_ = std::static_pointer_cast<GroupNode>( |
| GroupNode::Make("schema", Repetition::REQUIRED, |
| { |
| schema::Int32("required", Repetition::REQUIRED), |
| })); |
| schema_descriptor_ = std::make_unique<SchemaDescriptor>(); |
| schema_descriptor_->Init(node_); |
| } |
| |
| std::shared_ptr<parquet::Int32Writer> BuildWriter(Compression::type compression, |
| bool buffered, |
| bool enable_dictionary = false) { |
| auto builder = WriterProperties::Builder(); |
| builder.compression(compression)->data_pagesize(100 * sizeof(int)); |
| if (enable_dictionary) { |
| builder.enable_dictionary(); |
| } else { |
| builder.disable_dictionary(); |
| } |
| writer_properties_ = builder.build(); |
| metadata_ = ColumnChunkMetaDataBuilder::Make(writer_properties_, |
| schema_descriptor_->Column(0)); |
| |
| std::unique_ptr<PageWriter> pager = PageWriter::Open( |
| sink_, compression, metadata_.get(), |
| /* row_group_ordinal */ -1, /* column_chunk_ordinal*/ -1, |
| ::arrow::default_memory_pool(), /* buffered_row_group */ buffered, |
| /* header_encryptor */ NULLPTR, /* data_encryptor */ NULLPTR, |
| /* enable_checksum */ false); |
| return std::static_pointer_cast<parquet::Int32Writer>( |
| ColumnWriter::Make(metadata_.get(), std::move(pager), writer_properties_.get())); |
| } |
| |
| std::shared_ptr<::arrow::io::BufferOutputStream> sink_; |
| std::shared_ptr<GroupNode> node_; |
| std::unique_ptr<SchemaDescriptor> schema_descriptor_; |
| |
| std::shared_ptr<WriterProperties> writer_properties_; |
| std::unique_ptr<ColumnChunkMetaDataBuilder> metadata_; |
| }; |
| |
| TEST_F(ColumnWriterTestSizeEstimated, NonBuffered) { |
| auto required_writer = |
| this->BuildWriter(Compression::UNCOMPRESSED, /* buffered*/ false); |
| // Write half a page; it will not be flushed during this loop |
| for (int32_t i = 0; i < 50; i++) { |
| required_writer->WriteBatch(1, nullptr, nullptr, &i); |
| } |
| // Page not flushed, check size |
| EXPECT_EQ(0, required_writer->total_bytes_written()); |
| EXPECT_EQ(0, required_writer->total_compressed_bytes()); // unbuffered |
| EXPECT_EQ(0, required_writer->total_compressed_bytes_written()); |
| // Write the other half; the page will be flushed after this loop |
| for (int32_t i = 0; i < 50; i++) { |
| required_writer->WriteBatch(1, nullptr, nullptr, &i); |
| } |
| // Page flushed, check size |
| EXPECT_LT(400, required_writer->total_bytes_written()); |
| EXPECT_EQ(0, required_writer->total_compressed_bytes()); |
| EXPECT_LT(400, required_writer->total_compressed_bytes_written()); |
| |
| // Test after closed |
| int64_t written_size = required_writer->Close(); |
| EXPECT_EQ(0, required_writer->total_compressed_bytes()); |
| EXPECT_EQ(written_size, required_writer->total_bytes_written()); |
| // With no compression, compressed and uncompressed byte counts should be equal |
| EXPECT_EQ(written_size, required_writer->total_compressed_bytes_written()); |
| } |
| |
| TEST_F(ColumnWriterTestSizeEstimated, Buffered) { |
| auto required_writer = this->BuildWriter(Compression::UNCOMPRESSED, /* buffered*/ true); |
| // Write half a page; it will not be flushed during this loop |
| for (int32_t i = 0; i < 50; i++) { |
| required_writer->WriteBatch(1, nullptr, nullptr, &i); |
| } |
| // Page not flushed, check size |
| EXPECT_EQ(0, required_writer->total_bytes_written()); |
| EXPECT_EQ(0, required_writer->total_compressed_bytes()); // buffered |
| EXPECT_EQ(0, required_writer->total_compressed_bytes_written()); |
| // Write the other half; the page will be flushed after this loop |
| for (int32_t i = 0; i < 50; i++) { |
| required_writer->WriteBatch(1, nullptr, nullptr, &i); |
| } |
| // Page flushed, check size |
| EXPECT_LT(400, required_writer->total_bytes_written()); |
| EXPECT_EQ(0, required_writer->total_compressed_bytes()); |
| EXPECT_LT(400, required_writer->total_compressed_bytes_written()); |
| |
| // Test after closed |
| int64_t written_size = required_writer->Close(); |
| EXPECT_EQ(0, required_writer->total_compressed_bytes()); |
| EXPECT_EQ(written_size, required_writer->total_bytes_written()); |
| // With no compression, compressed and uncompressed byte counts should be equal |
| EXPECT_EQ(written_size, required_writer->total_compressed_bytes_written()); |
| } |
| |
| TEST_F(ColumnWriterTestSizeEstimated, NonBufferedDictionary) { |
| auto required_writer = |
| this->BuildWriter(Compression::UNCOMPRESSED, /* buffered*/ false, true); |
| // For dictionary encoding, keep all values equal |
| int32_t dict_value = 1; |
| for (int32_t i = 0; i < 50; i++) { |
| required_writer->WriteBatch(1, nullptr, nullptr, &dict_value); |
| } |
| // Page not flushed, check size |
| EXPECT_EQ(0, required_writer->total_bytes_written()); |
| EXPECT_EQ(0, required_writer->total_compressed_bytes()); |
| EXPECT_EQ(0, required_writer->total_compressed_bytes_written()); |
| // Write a huge batch to trigger a page flush |
| for (int32_t i = 0; i < 50000; i++) { |
| required_writer->WriteBatch(1, nullptr, nullptr, &dict_value); |
| } |
| // Page flushed, check size |
| EXPECT_EQ(0, required_writer->total_bytes_written()); |
| EXPECT_LT(400, required_writer->total_compressed_bytes()); |
| EXPECT_EQ(0, required_writer->total_compressed_bytes_written()); |
| |
| // Test after closed |
| int64_t written_size = required_writer->Close(); |
| EXPECT_EQ(0, required_writer->total_compressed_bytes()); |
| EXPECT_EQ(written_size, required_writer->total_bytes_written()); |
| // With no compression, compressed and uncompressed byte counts should be equal |
| EXPECT_EQ(written_size, required_writer->total_compressed_bytes_written()); |
| } |
| |
| TEST_F(ColumnWriterTestSizeEstimated, BufferedCompression) { |
| #ifndef ARROW_WITH_SNAPPY |
| GTEST_SKIP() << "Test requires snappy compression"; |
| #endif |
| auto required_writer = this->BuildWriter(Compression::SNAPPY, true); |
| |
| // Write half page |
| for (int32_t i = 0; i < 50; i++) { |
| required_writer->WriteBatch(1, nullptr, nullptr, &i); |
| } |
| // Page not flushed, check size |
| EXPECT_EQ(0, required_writer->total_bytes_written()); |
| EXPECT_EQ(0, required_writer->total_compressed_bytes()); // buffered |
| EXPECT_EQ(0, required_writer->total_compressed_bytes_written()); |
| for (int32_t i = 0; i < 50; i++) { |
| required_writer->WriteBatch(1, nullptr, nullptr, &i); |
| } |
| // Page flushed, check size |
| EXPECT_LT(400, required_writer->total_bytes_written()); |
| EXPECT_EQ(0, required_writer->total_compressed_bytes()); |
| EXPECT_LT(required_writer->total_compressed_bytes_written(), |
| required_writer->total_bytes_written()); |
| |
| // Test after closed |
| int64_t written_size = required_writer->Close(); |
| EXPECT_EQ(0, required_writer->total_compressed_bytes()); |
| EXPECT_EQ(written_size, required_writer->total_bytes_written()); |
| EXPECT_GT(written_size, required_writer->total_compressed_bytes_written()); |
| } |
| |
| TEST(TestColumnWriter, WriteDataPageV2HeaderNullCount) { |
| auto sink = CreateOutputStream(); |
| auto list_type = GroupNode::Make("list", Repetition::REPEATED, |
| {schema::Int32("elem", Repetition::OPTIONAL)}); |
| auto schema = std::static_pointer_cast<GroupNode>(GroupNode::Make( |
| "schema", Repetition::REQUIRED, |
| { |
| schema::Int32("non_null", Repetition::OPTIONAL), |
| schema::Int32("half_null", Repetition::OPTIONAL), |
| schema::Int32("all_null", Repetition::OPTIONAL), |
| GroupNode::Make("half_null_list", Repetition::OPTIONAL, {list_type}), |
| GroupNode::Make("half_empty_list", Repetition::OPTIONAL, {list_type}), |
| GroupNode::Make("half_list_of_null", Repetition::OPTIONAL, {list_type}), |
| GroupNode::Make("all_single_list", Repetition::OPTIONAL, {list_type}), |
| })); |
| auto properties = WriterProperties::Builder() |
| /* Use V2 data page to read null_count from header */ |
| .data_page_version(ParquetDataPageVersion::V2) |
| /* Disable stats to test null_count is properly set */ |
| ->disable_statistics() |
| ->disable_dictionary() |
| ->build(); |
| auto file_writer = ParquetFileWriter::Open(sink, schema, properties); |
| auto rg_writer = file_writer->AppendRowGroup(); |
| |
| constexpr int32_t num_rows = 10; |
| constexpr int32_t num_cols = 7; |
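| // For the list columns the max definition level is 3: def 0 = null list, |
| // 1 = empty list, 2 = list with a null element, 3 = list with a non-null element. |
| // A V2 page header's num_nulls counts leaf values whose definition level is below |
| // the maximum, which is what expect_null_count_by_col encodes below. |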
| const std::vector<std::vector<int16_t>> def_levels_by_col = { |
| {1, 1, 1, 1, 1, 1, 1, 1, 1, 1}, {1, 0, 1, 0, 1, 0, 1, 0, 1, 0}, |
| {0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, {0, 3, 0, 3, 0, 3, 0, 3, 0, 3}, |
| {1, 3, 1, 3, 1, 3, 1, 3, 1, 3}, {2, 3, 2, 3, 2, 3, 2, 3, 2, 3}, |
| {3, 3, 3, 3, 3, 3, 3, 3, 3, 3}, |
| }; |
| const std::vector<int16_t> rep_levels(num_rows, 0); |
| const std::vector<int32_t> values(num_rows, 123); |
| const std::vector<int64_t> expect_null_count_by_col = {0, 5, 10, 5, 5, 5, 0}; |
| |
| for (int32_t i = 0; i < num_cols; ++i) { |
| auto writer = static_cast<parquet::Int32Writer*>(rg_writer->NextColumn()); |
| writer->WriteBatch(num_rows, def_levels_by_col[i].data(), |
| i >= 3 ? rep_levels.data() : nullptr, values.data()); |
| } |
| |
| ASSERT_NO_THROW(file_writer->Close()); |
| ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish()); |
| auto file_reader = ParquetFileReader::Open( |
| std::make_shared<::arrow::io::BufferReader>(buffer), default_reader_properties()); |
| auto metadata = file_reader->metadata(); |
| ASSERT_EQ(1, metadata->num_row_groups()); |
| auto row_group_reader = file_reader->RowGroup(0); |
| |
| std::shared_ptr<Page> page; |
| for (int32_t i = 0; i < num_cols; ++i) { |
| auto page_reader = row_group_reader->GetColumnPageReader(i); |
| int64_t num_nulls_read = 0; |
| int64_t num_rows_read = 0; |
| int64_t num_values_read = 0; |
| while ((page = page_reader->NextPage()) != nullptr) { |
| auto data_page = std::static_pointer_cast<DataPageV2>(page); |
| num_nulls_read += data_page->num_nulls(); |
| num_rows_read += data_page->num_rows(); |
| num_values_read += data_page->num_values(); |
| } |
| EXPECT_EQ(expect_null_count_by_col[i], num_nulls_read); |
| EXPECT_EQ(num_rows, num_rows_read); |
| EXPECT_EQ(num_rows, num_values_read); |
| } |
| } |
| |
| using TestInt32Writer = TestPrimitiveWriter<Int32Type>; |
| |
| TEST_F(TestInt32Writer, NoWriteKeyValueMetadata) { |
| auto writer = this->BuildWriter(); |
| writer->Close(); |
| auto key_value_metadata = metadata_key_value_metadata(); |
| ASSERT_THAT(key_value_metadata, IsNull()); |
| } |
| |
| TEST_F(TestInt32Writer, WriteKeyValueMetadata) { |
| auto writer = this->BuildWriter(); |
| writer->AddKeyValueMetadata( |
| KeyValueMetadata::Make({"hello", "bye"}, {"world", "earth"})); |
| // overwrite the previous value |
| writer->AddKeyValueMetadata(KeyValueMetadata::Make({"bye"}, {"moon"})); |
| writer->Close(); |
| auto key_value_metadata = metadata_key_value_metadata(); |
| ASSERT_THAT(key_value_metadata, NotNull()); |
| ASSERT_EQ(2, key_value_metadata->size()); |
| ASSERT_OK_AND_ASSIGN(auto value, key_value_metadata->Get("hello")); |
| ASSERT_EQ("world", value); |
| ASSERT_OK_AND_ASSIGN(value, key_value_metadata->Get("bye")); |
| ASSERT_EQ("moon", value); |
| } |
| |
| TEST_F(TestInt32Writer, ResetKeyValueMetadata) { |
| auto writer = this->BuildWriter(); |
| writer->AddKeyValueMetadata(KeyValueMetadata::Make({"hello"}, {"world"})); |
| writer->ResetKeyValueMetadata(); |
| writer->Close(); |
| auto key_value_metadata = metadata_key_value_metadata(); |
| ASSERT_THAT(key_value_metadata, IsNull()); |
| } |
| |
| TEST_F(TestInt32Writer, WriteKeyValueMetadataEndToEnd) { |
| auto sink = CreateOutputStream(); |
| { |
| auto file_writer = ParquetFileWriter::Open( |
| sink, std::dynamic_pointer_cast<schema::GroupNode>(schema_.schema_root())); |
| auto rg_writer = file_writer->AppendRowGroup(); |
| auto col_writer = rg_writer->NextColumn(); |
| col_writer->AddKeyValueMetadata(KeyValueMetadata::Make({"foo"}, {"bar"})); |
| file_writer->Close(); |
| } |
| ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish()); |
| auto file_reader = |
| ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(buffer)); |
| auto key_value_metadata = |
| file_reader->metadata()->RowGroup(0)->ColumnChunk(0)->key_value_metadata(); |
| ASSERT_THAT(key_value_metadata, NotNull()); |
| ASSERT_EQ(1U, key_value_metadata->size()); |
| ASSERT_OK_AND_ASSIGN(auto value, key_value_metadata->Get("foo")); |
| ASSERT_EQ("bar", value); |
| } |
| |
| TEST_F(TestValuesWriterInt32Type, AllNullsCompressionInPageV2) { |
| // GH-31992: In DataPageV2, levels and data are not compressed together, so when |
| // all values are null the compressed data buffer is empty. Make sure this case |
| // is handled correctly. |
| std::vector<Compression::type> compressions = {Compression::SNAPPY, Compression::GZIP, |
| Compression::ZSTD, Compression::BROTLI, |
| Compression::LZ4}; |
| for (auto compression : compressions) { |
| if (!Codec::IsAvailable(compression)) { |
| continue; |
| } |
| ARROW_SCOPED_TRACE("compression = ", Codec::GetCodecAsString(compression)); |
| // Optional and non-repeated, with definition levels |
| // but no repetition levels |
| this->SetUpSchema(Repetition::OPTIONAL); |
| this->GenerateData(SMALL_SIZE); |
| std::fill(this->def_levels_.begin(), this->def_levels_.end(), 0); |
| ColumnProperties column_properties; |
| column_properties.set_compression(compression); |
| |
| auto writer = |
| this->BuildWriter(SMALL_SIZE, column_properties, ParquetVersion::PARQUET_2_LATEST, |
| ParquetDataPageVersion::V2); |
| writer->WriteBatch(this->values_.size(), this->def_levels_.data(), nullptr, |
| this->values_ptr_); |
| writer->Close(); |
| |
| ASSERT_EQ(100, this->metadata_num_values()); |
| this->ReadColumn(compression); |
| ASSERT_EQ(0, this->values_read_); |
| } |
| } |
| |
| #ifdef ARROW_WITH_ZSTD |
| TEST_F(TestValuesWriterInt32Type, AvoidCompressedInDataPageV2) { |
| Compression::type compression = Compression::ZSTD; |
| auto verify_only_one_uncompressed_page = [&](int total_num_values) { |
| ColumnProperties column_properties; |
| column_properties.set_compression(compression); |
| |
| auto writer = |
| this->BuildWriter(SMALL_SIZE, column_properties, ParquetVersion::PARQUET_2_LATEST, |
| ParquetDataPageVersion::V2); |
| writer->WriteBatch(static_cast<int64_t>(values_.size()), this->def_levels_.data(), |
| nullptr, this->values_ptr_); |
| writer->Close(); |
| ASSERT_OK_AND_ASSIGN(auto buffer, this->sink_->Finish()); |
| auto source = std::make_shared<::arrow::io::BufferReader>(buffer); |
| ReaderProperties reader_properties; |
| std::unique_ptr<PageReader> page_reader = PageReader::Open( |
| std::move(source), total_num_values, compression, reader_properties); |
| auto data_page = std::static_pointer_cast<DataPageV2>(page_reader->NextPage()); |
| ASSERT_TRUE(data_page != nullptr); |
| ASSERT_FALSE(data_page->is_compressed()); |
| ASSERT_TRUE(page_reader->NextPage() == nullptr); |
| }; |
| { |
| // zero-sized data buffer should be handled correctly. |
| this->SetUpSchema(Repetition::OPTIONAL); |
| this->GenerateData(SMALL_SIZE); |
| std::fill(this->def_levels_.begin(), this->def_levels_.end(), 0); |
| verify_only_one_uncompressed_page(SMALL_SIZE); |
| } |
| { |
| // When compressing only a little data, the compressed size can be larger than |
| // the original size. In this case, the `is_compressed` flag should be set to |
| // false. |
| this->SetUpSchema(Repetition::OPTIONAL); |
| this->GenerateData(1); |
| std::fill(this->def_levels_.begin(), this->def_levels_.end(), 1); |
| values_[0] = 142857; |
| verify_only_one_uncompressed_page(/*total_num_values=*/1); |
| } |
| } |
| #endif |
| |
| // Test writing and reading geometry columns |
| class TestGeometryValuesWriter : public TestPrimitiveWriter<ByteArrayType> { |
| public: |
| void SetUpSchema(Repetition::type repetition, int num_columns) override { |
| std::vector<schema::NodePtr> fields; |
| |
| for (int i = 0; i < num_columns; ++i) { |
| std::string name = TestColumnName(i); |
| std::shared_ptr<const LogicalType> logical_type = |
| GeometryLogicalType::Make("srid:1234"); |
| fields.push_back(schema::PrimitiveNode::Make(name, repetition, logical_type, |
| ByteArrayType::type_num)); |
| } |
| node_ = schema::GroupNode::Make("schema", Repetition::REQUIRED, fields); |
| schema_.Init(node_); |
| } |
| |
| void GenerateData(int64_t num_values, uint32_t seed = 0) { |
| values_.resize(num_values); |
| |
| buffer_.resize(num_values * kWkbPointXYSize); |
| uint8_t* ptr = buffer_.data(); |
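| // Each value is a WKB XY point (x = k, y = k + 1); every serialized point is |
| // expected to occupy exactly kWkbPointXYSize bytes of buffer_. |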
| for (int k = 0; k < num_values; k++) { |
| std::string item = test::MakeWKBPoint( |
| {static_cast<double>(k), static_cast<double>(k + 1)}, false, false); |
| std::memcpy(ptr, item.data(), item.size()); |
| values_[k].len = kWkbPointXYSize; |
| values_[k].ptr = ptr; |
| ptr += kWkbPointXYSize; |
| } |
| |
| values_ptr_ = values_.data(); |
| } |
| }; |
| |
| TEST_F(TestGeometryValuesWriter, TestWriteAndRead) { |
| this->SetUpSchema(Repetition::REQUIRED, 1); |
| this->GenerateData(SMALL_SIZE); |
| size_t num_values = this->values_.size(); |
| auto writer = this->BuildWriter(num_values, ColumnProperties()); |
| writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_.data()); |
| |
| writer->Close(); |
| this->ReadColumn(); |
| for (size_t i = 0; i < num_values; i++) { |
| const ByteArray& value = this->values_out_[i]; |
| auto xy = GetWKBPointCoordinateXY(value); |
| EXPECT_TRUE(xy.has_value()); |
| auto expected_x = static_cast<double>(i); |
| auto expected_y = static_cast<double>(i + 1); |
| EXPECT_EQ(*xy, std::make_pair(expected_x, expected_y)); |
| } |
| |
| // Statistics are unset because the sort order is unknown |
| ASSERT_FALSE(metadata_accessor()->is_stats_set()); |
| ASSERT_EQ(metadata_accessor()->statistics(), nullptr); |
| |
| ASSERT_TRUE(metadata_accessor()->is_geo_stats_set()); |
| std::shared_ptr<geospatial::GeoStatistics> geospatial_statistics = metadata_geo_stats(); |
| ASSERT_TRUE(geospatial_statistics != nullptr); |
| EXPECT_THAT(*geospatial_statistics->geometry_types(), ::testing::ElementsAre(1)); |
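| // Points are (k, k + 1) for k = 0..99, so x spans [0, 99] and y spans [1, 100]. |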
| EXPECT_DOUBLE_EQ(0, geospatial_statistics->lower_bound()[0]); |
| EXPECT_DOUBLE_EQ(1, geospatial_statistics->lower_bound()[1]); |
| EXPECT_DOUBLE_EQ(99, geospatial_statistics->upper_bound()[0]); |
| EXPECT_DOUBLE_EQ(100, geospatial_statistics->upper_bound()[1]); |
| EXPECT_THAT(geospatial_statistics->dimension_valid(), |
| ::testing::ElementsAre(true, true, false, false)); |
| } |
| |
| TEST_F(TestGeometryValuesWriter, TestWriteAndReadSpaced) { |
| this->SetUpSchema(Repetition::OPTIONAL, 1); |
| this->GenerateData(SMALL_SIZE); |
| size_t num_values = this->values_.size(); |
| |
| std::vector<int16_t> definition_levels(num_values, 1); |
| std::vector<int16_t> repetition_levels(num_values, 0); |
| std::vector<size_t> non_null_indices; |
| |
| // Replace some of the generated data with NULL |
| for (size_t i = 0; i < num_values; i++) { |
| if (i % 3 == 0) { |
| definition_levels[i] = 0; |
| } else { |
| non_null_indices.push_back(i); |
| } |
| } |
| |
| // Construct valid bits using definition levels |
| std::vector<uint8_t> valid_bytes(num_values); |
| std::transform(definition_levels.begin(), definition_levels.end(), valid_bytes.begin(), |
| [&](int64_t level) { return static_cast<uint8_t>(level); }); |
| std::shared_ptr<Buffer> valid_bits; |
| ASSERT_OK_AND_ASSIGN(valid_bits, ::arrow::internal::BytesToBits(valid_bytes)); |
| |
| auto writer = this->BuildWriter(num_values, ColumnProperties()); |
| writer->WriteBatchSpaced(this->values_.size(), definition_levels.data(), |
| repetition_levels.data(), valid_bits->data(), 0, |
| this->values_.data()); |
| |
| writer->Close(); |
| this->ReadColumn(); |
| size_t expected_values_read = non_null_indices.size(); |
| EXPECT_EQ(expected_values_read, values_read_); |
| for (int64_t i = 0; i < values_read_; i++) { |
| const ByteArray& value = this->values_out_[i]; |
| auto xy = GetWKBPointCoordinateXY(value); |
| EXPECT_TRUE(xy.has_value()); |
| auto expected_x = static_cast<double>(non_null_indices[i]); |
| auto expected_y = static_cast<double>(non_null_indices[i] + 1); |
| EXPECT_EQ(*xy, std::make_pair(expected_x, expected_y)); |
| } |
| |
| std::shared_ptr<geospatial::GeoStatistics> geospatial_statistics = metadata_geo_stats(); |
| ASSERT_TRUE(geospatial_statistics != nullptr); |
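| // Indices divisible by 3 were nulled out, so the remaining points are (k, k + 1) |
| // for k in {1, 2, 4, ..., 98}: x spans [1, 98] and y spans [2, 99]. |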
| EXPECT_DOUBLE_EQ(1, geospatial_statistics->lower_bound()[0]); |
| EXPECT_DOUBLE_EQ(2, geospatial_statistics->lower_bound()[1]); |
| EXPECT_DOUBLE_EQ(98, geospatial_statistics->upper_bound()[0]); |
| EXPECT_DOUBLE_EQ(99, geospatial_statistics->upper_bound()[1]); |
| EXPECT_THAT(geospatial_statistics->dimension_valid(), |
| ::testing::ElementsAre(true, true, false, false)); |
| } |
| |
| TEST_F(TestGeometryValuesWriter, TestWriteAndReadAllNull) { |
| this->SetUpSchema(Repetition::OPTIONAL, 1); |
| this->values_.resize(SMALL_SIZE); |
| std::fill(this->values_.begin(), this->values_.end(), ByteArray{0, nullptr}); |
| this->def_levels_.resize(SMALL_SIZE); |
| std::fill(this->def_levels_.begin(), this->def_levels_.end(), 0); |
| auto writer = this->BuildWriter(SMALL_SIZE); |
| writer->WriteBatch(this->values_.size(), this->def_levels_.data(), nullptr, |
| this->values_.data()); |
| |
| writer->Close(); |
| this->ReadColumn(); |
| for (int i = 0; i < SMALL_SIZE; i++) { |
| EXPECT_EQ(this->definition_levels_out_[i], 0); |
| } |
| |
| // Statistics are unset because the sort order is unknown |
| ASSERT_FALSE(metadata_accessor()->is_stats_set()); |
| ASSERT_EQ(metadata_accessor()->statistics(), nullptr); |
| |
| // GeoStatistics should exist but all components should be marked as uncalculated |
| ASSERT_TRUE(metadata_accessor()->is_geo_stats_set()); |
| std::shared_ptr<geospatial::GeoStatistics> geospatial_statistics = metadata_geo_stats(); |
| ASSERT_TRUE(geospatial_statistics != nullptr); |
| EXPECT_THAT(geospatial_statistics->dimension_valid(), |
| ::testing::ElementsAre(false, false, false, false)); |
| EXPECT_EQ(geospatial_statistics->geometry_types(), std::nullopt); |
| } |
| |
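| // The fixture below writes with a max_rows_per_page limit and the page index |
| // enabled, then uses the offset index to verify that no data page exceeds the |
| // configured number of rows. |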
| template <typename TestType> |
| class TestColumnWriterMaxRowsPerPage : public TestPrimitiveWriter<TestType> { |
| public: |
| TypedColumnWriter<TestType>* BuildWriter( |
| int64_t max_rows_per_page = kDefaultMaxRowsPerPage, |
| int64_t page_size = kDefaultDataPageSize) { |
| this->sink_ = CreateOutputStream(); |
| this->writer_properties_ = WriterProperties::Builder() |
| .max_rows_per_page(max_rows_per_page) |
| ->data_pagesize(page_size) |
| ->enable_write_page_index() |
| ->build(); |
| file_writer_ = ParquetFileWriter::Open( |
| this->sink_, std::static_pointer_cast<GroupNode>(this->schema_.schema_root()), |
| this->writer_properties_); |
| return static_cast<TypedColumnWriter<TestType>*>( |
| file_writer_->AppendRowGroup()->NextColumn()); |
| } |
| |
| void CloseWriter() const { file_writer_->Close(); } |
| |
| void BuildReader() { |
| ASSERT_OK_AND_ASSIGN(auto buffer, this->sink_->Finish()); |
| file_reader_ = ParquetFileReader::Open( |
| std::make_shared<::arrow::io::BufferReader>(buffer), default_reader_properties()); |
| this->reader_ = std::static_pointer_cast<TypedColumnReader<TestType>>( |
| file_reader_->RowGroup(0)->Column(0)); |
| } |
| |
| void VerifyMaxRowsPerPage(int64_t max_rows_per_page) const { |
| auto file_meta = file_reader_->metadata(); |
| int64_t num_row_groups = file_meta->num_row_groups(); |
| ASSERT_EQ(num_row_groups, 1); |
| |
| auto page_index_reader = file_reader_->GetPageIndexReader(); |
| ASSERT_NE(page_index_reader, nullptr); |
| |
| auto row_group_page_index_reader = page_index_reader->RowGroup(0); |
| ASSERT_NE(row_group_page_index_reader, nullptr); |
| |
| auto offset_index = row_group_page_index_reader->GetOffsetIndex(0); |
| ASSERT_NE(offset_index, nullptr); |
| size_t num_pages = offset_index->page_locations().size(); |
| int64_t num_rows = 0; |
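| // The row count of page j is the difference between consecutive first_row_index |
| // entries in the offset index; the last page is bounded by the row group's total |
| // row count. |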
| for (size_t j = 1; j < num_pages; ++j) { |
| int64_t page_rows = offset_index->page_locations()[j].first_row_index - |
| offset_index->page_locations()[j - 1].first_row_index; |
| EXPECT_LE(page_rows, max_rows_per_page); |
| num_rows += page_rows; |
| } |
| if (num_pages != 0) { |
| int64_t last_page_rows = file_meta->RowGroup(0)->num_rows() - |
| offset_index->page_locations().back().first_row_index; |
| EXPECT_LE(last_page_rows, max_rows_per_page); |
| num_rows += last_page_rows; |
| } |
| |
| EXPECT_EQ(num_rows, file_meta->RowGroup(0)->num_rows()); |
| } |
| |
| private: |
| std::shared_ptr<ParquetFileWriter> file_writer_; |
| std::shared_ptr<ParquetFileReader> file_reader_; |
| }; |
| |
| TYPED_TEST_SUITE(TestColumnWriterMaxRowsPerPage, TestTypes); |
| |
| TYPED_TEST(TestColumnWriterMaxRowsPerPage, Optional) { |
| for (int64_t max_rows_per_page : {1, 10, 100}) { |
| this->SetUpSchema(Repetition::OPTIONAL); |
| this->GenerateData(SMALL_SIZE); |
| std::vector<int16_t> definition_levels(SMALL_SIZE, 1); |
| definition_levels[1] = 0; |
| |
| auto writer = this->BuildWriter(max_rows_per_page); |
| writer->WriteBatch(this->values_.size(), definition_levels.data(), nullptr, |
| this->values_ptr_); |
| this->CloseWriter(); |
| |
| this->BuildReader(); |
| ASSERT_NO_FATAL_FAILURE(this->VerifyMaxRowsPerPage(max_rows_per_page)); |
| } |
| } |
| |
| TYPED_TEST(TestColumnWriterMaxRowsPerPage, OptionalSpaced) { |
| for (int64_t max_rows_per_page : {1, 10, 100}) { |
| this->SetUpSchema(Repetition::OPTIONAL); |
| |
| this->GenerateData(SMALL_SIZE); |
| std::vector<int16_t> definition_levels(SMALL_SIZE, 1); |
| std::vector<uint8_t> valid_bits(::arrow::bit_util::BytesForBits(SMALL_SIZE), 255); |
| |
| definition_levels[SMALL_SIZE - 1] = 0; |
| ::arrow::bit_util::ClearBit(valid_bits.data(), SMALL_SIZE - 1); |
| definition_levels[1] = 0; |
| ::arrow::bit_util::ClearBit(valid_bits.data(), 1); |
| |
| auto writer = this->BuildWriter(max_rows_per_page); |
| writer->WriteBatchSpaced(this->values_.size(), definition_levels.data(), nullptr, |
| valid_bits.data(), 0, this->values_ptr_); |
| this->CloseWriter(); |
| |
| this->BuildReader(); |
| ASSERT_NO_FATAL_FAILURE(this->VerifyMaxRowsPerPage(max_rows_per_page)); |
| } |
| } |
| |
| TYPED_TEST(TestColumnWriterMaxRowsPerPage, Repeated) { |
| for (int64_t max_rows_per_page : {1, 10, 100}) { |
| this->SetUpSchema(Repetition::REPEATED); |
| |
| this->GenerateData(SMALL_SIZE); |
| std::vector<int16_t> definition_levels(SMALL_SIZE); |
| std::vector<int16_t> repetition_levels(SMALL_SIZE); |
| |
| // Generate levels to include variable-sized lists and empty lists |
| for (int i = 0; i < SMALL_SIZE; i++) { |
| int list_length = (i % 5) + 1; |
| if (i % 13 == 0 || i % 17 == 0) { |
| list_length = 0; |
| } |
| |
| if (list_length == 0) { |
| definition_levels[i] = 0; |
| repetition_levels[i] = 0; |
| } else { |
| for (int j = 0; j < list_length && i + j < SMALL_SIZE; j++) { |
| definition_levels[i + j] = 1; |
| repetition_levels[i + j] = (j == 0) ? 0 : 1; |
| } |
| i += list_length - 1; |
| } |
| } |
| |
| auto writer = this->BuildWriter(max_rows_per_page); |
| writer->WriteBatch(this->values_.size(), definition_levels.data(), |
| repetition_levels.data(), this->values_ptr_); |
| this->CloseWriter(); |
| |
| this->BuildReader(); |
| ASSERT_NO_FATAL_FAILURE(this->VerifyMaxRowsPerPage(max_rows_per_page)); |
| } |
| } |
| |
| TYPED_TEST(TestColumnWriterMaxRowsPerPage, RequiredLargeChunk) { |
| for (int64_t max_rows_per_page : {10, 100, 10000}) { |
| this->GenerateData(LARGE_SIZE); |
| |
| auto writer = this->BuildWriter(max_rows_per_page); |
| writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_); |
| this->CloseWriter(); |
| |
| this->BuildReader(); |
| ASSERT_NO_FATAL_FAILURE(this->VerifyMaxRowsPerPage(max_rows_per_page)); |
| } |
| } |
| |
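| // The fixture below wires a BloomFilterBuilder-owned bloom filter into the column |
| // writer so that values written via WriteBatch are also inserted into the filter. |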
| template <typename TestType> |
| class TestBloomFilterWriter : public TestPrimitiveWriter<TestType> { |
| public: |
| void SetUp() override { |
| TestPrimitiveWriter<TestType>::SetUp(); |
| builder_ = nullptr; |
| bloom_filter_ = nullptr; |
| } |
| |
| std::shared_ptr<TypedColumnWriter<TestType>> BuildWriter( |
| int64_t output_size, const ColumnProperties& column_properties) { |
| this->sink_ = CreateOutputStream(); |
| |
| WriterProperties::Builder properties_builder; |
| if (column_properties.encoding() == Encoding::PLAIN_DICTIONARY || |
| column_properties.encoding() == Encoding::RLE_DICTIONARY) { |
| properties_builder.enable_dictionary(); |
| } else { |
| properties_builder.disable_dictionary(); |
| properties_builder.encoding(column_properties.encoding()); |
| } |
| auto path = this->schema_.Column(0)->path(); |
| if (column_properties.bloom_filter_enabled()) { |
| properties_builder.enable_bloom_filter(path, |
| *column_properties.bloom_filter_options()); |
| } else { |
| properties_builder.disable_bloom_filter(path); |
| } |
| this->writer_properties_ = properties_builder.build(); |
| |
| this->metadata_ = |
| ColumnChunkMetaDataBuilder::Make(this->writer_properties_, this->descr_); |
| auto pager = PageWriter::Open(this->sink_, column_properties.compression(), |
| this->metadata_.get()); |
| |
| builder_ = BloomFilterBuilder::Make(&this->schema_, this->writer_properties_.get()); |
| builder_->AppendRowGroup(); |
| bloom_filter_ = builder_->CreateBloomFilter(/*column_ordinal=*/0); |
| |
| return std::dynamic_pointer_cast<TypedColumnWriter<TestType>>( |
| ColumnWriter::Make(this->metadata_.get(), std::move(pager), |
| this->writer_properties_.get(), bloom_filter_)); |
| } |
| |
| std::unique_ptr<BloomFilterBuilder> builder_; |
| BloomFilter* bloom_filter_{nullptr}; // Will be created and owned by `builder_`. |
| }; |
| |
| // Note: BooleanType is excluded. |
| using TestBloomFilterTypes = ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, |
| DoubleType, ByteArrayType, FLBAType>; |
| |
| TYPED_TEST_SUITE(TestBloomFilterWriter, TestBloomFilterTypes); |
| |
| TYPED_TEST(TestBloomFilterWriter, Basic) { |
| this->GenerateData(SMALL_SIZE); |
| |
| ColumnProperties column_properties; |
| column_properties.set_bloom_filter_options(BloomFilterOptions{SMALL_SIZE, 0.05}); |
| |
| auto writer = this->BuildWriter(SMALL_SIZE, column_properties); |
| writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_); |
| writer->Close(); |
| |
| // Make sure that column values are read correctly |
| this->SetupValuesOut(SMALL_SIZE); |
| this->ReadColumnFully(); |
| ASSERT_EQ(SMALL_SIZE, this->values_read_); |
| ASSERT_EQ(this->values_, this->values_out_); |
| |
| // Verify values are found by bloom filter |
| for (auto& value : this->values_) { |
| if constexpr (std::is_same_v<TypeParam, FLBAType>) { |
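| // FLBA values do not carry their own length, so hash them with the column's |
| // declared type_length. |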
| EXPECT_TRUE(this->bloom_filter_->FindHash( |
| this->bloom_filter_->Hash(&value, this->descr_->type_length()))); |
| } else { |
| EXPECT_TRUE(this->bloom_filter_->FindHash(this->bloom_filter_->Hash(value))); |
| } |
| } |
| } |
| |
| } // namespace test |
| } // namespace parquet |