| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include <algorithm> |
| #include <cstdint> |
| #include <cstdlib> |
| #include <functional> |
| #include <iostream> |
| #include <list> |
| #include <memory> |
| #include <sstream> |
| #include <string> |
| #include <string_view> |
| #include <type_traits> |
| #include <utility> |
| #include <vector> |
| |
| #include <gmock/gmock.h> |
| #include <gtest/gtest.h> |
| |
| #include "arrow/json/rapidjson_defs.h" // IWYU pragma: keep |
| |
| #include <rapidjson/document.h> |
| #include <rapidjson/error/en.h> |
| #include <rapidjson/stringbuffer.h> |
| |
| #include "arrow/array.h" |
| #include "arrow/array/array_binary.h" |
| #include "arrow/array/builder_binary.h" |
| #include "arrow/buffer.h" |
| #include "arrow/io/file.h" |
| #include "arrow/testing/future_util.h" |
| #include "arrow/testing/gtest_util.h" |
| #include "arrow/testing/random.h" |
| #include "arrow/util/checked_cast.h" |
| #include "arrow/util/config.h" |
| #include "arrow/util/range.h" |
| |
| #include "parquet/column_reader.h" |
| #include "parquet/column_scanner.h" |
| #include "parquet/column_writer.h" |
| #include "parquet/file_reader.h" |
| #include "parquet/file_writer.h" |
| #include "parquet/geospatial/statistics.h" |
| #include "parquet/metadata.h" |
| #include "parquet/page_index.h" |
| #include "parquet/platform.h" |
| #include "parquet/printer.h" |
| #include "parquet/statistics.h" |
| #include "parquet/test_util.h" |
| #include "parquet/types.h" |
| |
| namespace rj = arrow::rapidjson; |
| |
| using arrow::internal::checked_pointer_cast; |
| using arrow::internal::Zip; |
| |
| namespace parquet { |
| using schema::GroupNode; |
| using schema::PrimitiveNode; |
| |
| using ReadableFile = ::arrow::io::ReadableFile; |
| |
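| // Returns the full path to a file in the Parquet test data directory. |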
| std::string data_file(const char* file) { |
| std::string dir_string(test::get_data_dir()); |
| std::stringstream ss; |
| ss << dir_string << "/" << file; |
| return ss.str(); |
| } |
| |
| std::string alltypes_plain() { return data_file("alltypes_plain.parquet"); } |
| |
| std::string nation_dict_truncated_data_page() { |
| return data_file("nation.dict-malformed.parquet"); |
| } |
| |
| // LZ4-compressed data files. |
| // These files come in three flavours: |
| // - legacy "LZ4" compression type, actually compressed with block LZ4 codec |
| // (as emitted by some earlier versions of parquet-cpp) |
| // - legacy "LZ4" compression type, actually compressed with custom Hadoop LZ4 codec |
| // (as emitted by parquet-mr) |
| // - "LZ4_RAW" compression type (added in Parquet format version 2.9.0) |
| |
| std::string hadoop_lz4_compressed() { return data_file("hadoop_lz4_compressed.parquet"); } |
| |
| std::string hadoop_lz4_compressed_larger() { |
| return data_file("hadoop_lz4_compressed_larger.parquet"); |
| } |
| |
| std::string non_hadoop_lz4_compressed() { |
| return data_file("non_hadoop_lz4_compressed.parquet"); |
| } |
| |
| std::string lz4_raw_compressed() { return data_file("lz4_raw_compressed.parquet"); } |
| |
| std::string lz4_raw_compressed_larger() { |
| return data_file("lz4_raw_compressed_larger.parquet"); |
| } |
| |
| std::string overflow_i16_page_ordinal() { |
| return data_file("overflow_i16_page_cnt.parquet"); |
| } |
| |
| std::string data_page_v1_corrupt_checksum() { |
| return data_file("datapage_v1-corrupt-checksum.parquet"); |
| } |
| |
| std::string data_page_v1_uncompressed_checksum() { |
| return data_file("datapage_v1-uncompressed-checksum.parquet"); |
| } |
| |
| std::string data_page_v1_snappy_checksum() { |
| return data_file("datapage_v1-snappy-compressed-checksum.parquet"); |
| } |
| |
| std::string plain_dict_uncompressed_checksum() { |
| return data_file("plain-dict-uncompressed-checksum.parquet"); |
| } |
| |
| std::string rle_dict_snappy_checksum() { |
| return data_file("rle-dict-snappy-checksum.parquet"); |
| } |
| |
| std::string rle_dict_uncompressed_corrupt_checksum() { |
| return data_file("rle-dict-uncompressed-corrupt-checksum.parquet"); |
| } |
| |
| std::string concatenated_gzip_members() { |
| return data_file("concatenated_gzip_members.parquet"); |
| } |
| |
| std::string byte_stream_split() { return data_file("byte_stream_split.zstd.parquet"); } |
| |
| std::string byte_stream_split_extended() { |
| return data_file("byte_stream_split_extended.gzip.parquet"); |
| } |
| |
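| // Reads `expected_values_read` values from the given column of a row group, |
| // checking that the expected number of levels and values was read. |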
| template <typename DType, typename ValueType = typename DType::c_type> |
| std::vector<ValueType> ReadColumnValues(ParquetFileReader* file_reader, int row_group, |
| int column, int64_t expected_values_read) { |
| auto column_reader = checked_pointer_cast<TypedColumnReader<DType>>( |
| file_reader->RowGroup(row_group)->Column(column)); |
| std::vector<ValueType> values(expected_values_read); |
| int64_t values_read; |
| auto levels_read = column_reader->ReadBatch(expected_values_read, nullptr, nullptr, |
| values.data(), &values_read); |
| EXPECT_EQ(expected_values_read, levels_read); |
| EXPECT_EQ(expected_values_read, values_read); |
| return values; |
| } |
| |
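| // Asserts that two vectors of column values are equal, using the descriptor's |
| // type length to compare fixed-length byte arrays. |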
| template <typename ValueType> |
| void AssertColumnValuesEqual(const ColumnDescriptor* descr, |
| const std::vector<ValueType>& left_values, |
| const std::vector<ValueType>& right_values) { |
| if constexpr (std::is_same_v<ValueType, FLBA>) { |
| // operator== for FLBA in test_util.h is unusable (it hard-codes length to 12) |
| const auto length = descr->type_length(); |
| for (const auto& [left, right] : Zip(left_values, right_values)) { |
| std::string_view left_view(reinterpret_cast<const char*>(left.ptr), length); |
| std::string_view right_view(reinterpret_cast<const char*>(right.ptr), length); |
| ASSERT_EQ(left_view, right_view); |
| } |
| } else { |
| ASSERT_EQ(left_values, right_values); |
| } |
| } |
| |
| // TODO: Assert on definition and repetition levels |
| template <typename DType, typename ValueType = typename DType::c_type> |
| void AssertColumnValues(std::shared_ptr<TypedColumnReader<DType>> col, int64_t batch_size, |
| int64_t expected_levels_read, |
| const std::vector<ValueType>& expected_values, |
| int64_t expected_values_read) { |
| std::vector<ValueType> values(batch_size); |
| int64_t values_read; |
| |
| auto levels_read = |
| col->ReadBatch(batch_size, nullptr, nullptr, values.data(), &values_read); |
| ASSERT_EQ(expected_levels_read, levels_read); |
| ASSERT_EQ(expected_values_read, values_read); |
| AssertColumnValuesEqual(col->descr(), expected_values, values); |
| } |
| |
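| // Reads one batch from each of two column readers and asserts that the |
| // decoded values are equal. |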
| template <typename DType, typename ValueType = typename DType::c_type> |
| void AssertColumnValuesEqual(std::shared_ptr<TypedColumnReader<DType>> left_col, |
| std::shared_ptr<TypedColumnReader<DType>> right_col, |
| int64_t batch_size, int64_t expected_levels_read, |
| int64_t expected_values_read) { |
| std::vector<ValueType> left_values(batch_size); |
| std::vector<ValueType> right_values(batch_size); |
| int64_t values_read, levels_read; |
| |
| levels_read = |
| left_col->ReadBatch(batch_size, nullptr, nullptr, left_values.data(), &values_read); |
| ASSERT_EQ(expected_levels_read, levels_read); |
| ASSERT_EQ(expected_values_read, values_read); |
| |
| levels_read = right_col->ReadBatch(batch_size, nullptr, nullptr, right_values.data(), |
| &values_read); |
| ASSERT_EQ(expected_levels_read, levels_read); |
| ASSERT_EQ(expected_values_read, values_read); |
| |
| AssertColumnValuesEqual(left_col->descr(), left_values, right_values); |
| } |
| |
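| // Looks up two columns by name in the given row group and asserts that |
| // their values are equal. |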
| template <typename DType, typename ValueType = typename DType::c_type> |
| void AssertColumnValuesEqual(ParquetFileReader* file_reader, const std::string& left_col, |
| const std::string& right_col, int64_t num_rows, |
| int row_group = 0) { |
| ARROW_SCOPED_TRACE("left_col = '", left_col, "', right_col = '", right_col, "'"); |
| |
| auto left_col_index = file_reader->metadata()->schema()->ColumnIndex(left_col); |
| auto right_col_index = file_reader->metadata()->schema()->ColumnIndex(right_col); |
| auto row_group_reader = file_reader->RowGroup(row_group); |
| auto left_reader = checked_pointer_cast<TypedColumnReader<DType>>( |
| row_group_reader->Column(left_col_index)); |
| auto right_reader = checked_pointer_cast<TypedColumnReader<DType>>( |
| row_group_reader->Column(right_col_index)); |
| AssertColumnValuesEqual(left_reader, right_reader, num_rows, num_rows, num_rows); |
| } |
| |
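| // Checks that a row group's total sizes are consistent with the sums of its |
| // per-column-chunk sizes. |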
| void CheckRowGroupMetadata(const RowGroupMetaData* rg_metadata, |
| bool allow_uncompressed_mismatch = false) { |
| const int64_t total_byte_size = rg_metadata->total_byte_size(); |
| const int64_t total_compressed_size = rg_metadata->total_compressed_size(); |
| |
| ASSERT_GE(total_byte_size, 0); |
| ASSERT_GE(total_compressed_size, 0); |
| |
| int64_t total_column_byte_size = 0; |
| int64_t total_column_compressed_size = 0; |
| for (int i = 0; i < rg_metadata->num_columns(); ++i) { |
| total_column_byte_size += rg_metadata->ColumnChunk(i)->total_uncompressed_size(); |
| total_column_compressed_size += rg_metadata->ColumnChunk(i)->total_compressed_size(); |
| } |
| |
| if (!allow_uncompressed_mismatch) { |
| ASSERT_EQ(total_byte_size, total_column_byte_size); |
| } |
| if (total_compressed_size != 0) { |
| ASSERT_EQ(total_compressed_size, total_column_compressed_size); |
| } |
| } |
| |
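| // Fixture reading rle_boolean_encoding.parquet, which contains an |
| // RLE-encoded boolean column. |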
| class TestBooleanRLE : public ::testing::Test { |
| public: |
| void SetUp() { |
| reader_ = ParquetFileReader::OpenFile(data_file("rle_boolean_encoding.parquet")); |
| } |
| |
| void TearDown() {} |
| |
| protected: |
| std::unique_ptr<ParquetFileReader> reader_; |
| }; |
| |
| TEST_F(TestBooleanRLE, TestBooleanScanner) { |
| #ifndef ARROW_WITH_ZLIB |
| GTEST_SKIP() << "Test requires Zlib compression"; |
| #endif |
| int nvalues = 68; |
| int validation_values = 16; |
| |
| auto group = reader_->RowGroup(0); |
| |
| // column 0, id |
| auto scanner = std::make_shared<BoolScanner>(group->Column(0)); |
| |
| bool val = false; |
| bool is_null = false; |
| |
| // For this file, the 3rd and 16th values are null |
| std::vector<bool> expected_null = {false, false, true, false, false, false, |
| false, false, false, false, false, false, |
| false, false, false, true}; |
| std::vector<bool> expected_value = {true, false, false, true, true, false, |
| false, true, true, true, false, false, |
| true, true, false, false}; |
| |
| // Assert sizes are the same |
| ASSERT_EQ(validation_values, expected_null.size()); |
| ASSERT_EQ(validation_values, expected_value.size()); |
| |
| for (int i = 0; i < validation_values; i++) { |
| ASSERT_TRUE(scanner->HasNext()); |
| ASSERT_TRUE(scanner->NextValue(&val, &is_null)); |
| |
| ASSERT_EQ(expected_null[i], is_null); |
| |
| // Only validate val if not null |
| if (!is_null) { |
| ASSERT_EQ(expected_value[i], val); |
| } |
| } |
| |
| // Loop through the rest of the values to assert data exists |
| for (int i = validation_values; i < nvalues; i++) { |
| ASSERT_TRUE(scanner->HasNext()); |
| ASSERT_TRUE(scanner->NextValue(&val, &is_null)); |
| } |
| |
| // Attempt to read past the end of the column |
| ASSERT_FALSE(scanner->HasNext()); |
| ASSERT_FALSE(scanner->NextValue(&val, &is_null)); |
| } |
| |
| TEST_F(TestBooleanRLE, TestBatchRead) { |
| #ifndef ARROW_WITH_ZLIB |
| GTEST_SKIP() << "Test requires Zlib compression"; |
| #endif |
| int nvalues = 68; |
| int num_row_groups = 1; |
| int metadata_size = 111; |
| |
| auto group = reader_->RowGroup(0); |
| |
| // column 0, id |
| auto col = std::dynamic_pointer_cast<BoolReader>(group->Column(0)); |
| |
| // This file only has 68 rows |
| ASSERT_EQ(nvalues, reader_->metadata()->num_rows()); |
| // This file only has 1 row group |
| ASSERT_EQ(num_row_groups, reader_->metadata()->num_row_groups()); |
| // Size of the metadata is 111 bytes |
| ASSERT_EQ(metadata_size, reader_->metadata()->size()); |
| // This row group must have 68 rows |
| ASSERT_EQ(nvalues, group->metadata()->num_rows()); |
| |
| // Check if the column is encoded with RLE |
| auto col_chunk = group->metadata()->ColumnChunk(0); |
| ASSERT_TRUE(std::find(col_chunk->encodings().begin(), col_chunk->encodings().end(), |
| Encoding::RLE) != col_chunk->encodings().end()); |
| |
| // Assert column has values to be read |
| ASSERT_TRUE(col->HasNext()); |
| int64_t curr_batch_read = 0; |
| |
| const int16_t batch_size = 17; |
| const int16_t num_nulls = 2; |
| int16_t def_levels[batch_size]; |
| int16_t rep_levels[batch_size]; |
| bool values[batch_size]; |
| std::fill_n(values, batch_size, false); |
| |
| auto levels_read = |
| col->ReadBatch(batch_size, def_levels, rep_levels, values, &curr_batch_read); |
| ASSERT_EQ(batch_size, levels_read); |
| |
| // Since two of the values are null, expect the number of values read to be |
| // num_nulls less than the indicated batch_size |
| ASSERT_EQ(batch_size - num_nulls, curr_batch_read); |
| |
| // The 3rd and 16th values are null (def_level == 0) |
| ASSERT_THAT(def_levels, |
| testing::ElementsAre(1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1)); |
| |
| // Validate that the decoded values are as expected |
| ASSERT_THAT(values, |
| testing::ElementsAre(1, 0, 1, 1, 0, 0, 1, 1, 1, 0, 0, 1, 1, 0, 1, 0, 0)); |
| |
| // Loop through the rest of the values and assert a full batch is read each time |
| for (int i = batch_size; i < nvalues; i = i + batch_size) { |
| levels_read = |
| col->ReadBatch(batch_size, def_levels, rep_levels, values, &curr_batch_read); |
| ASSERT_EQ(batch_size, levels_read); |
| } |
| |
| // All values have been read; the column must not report more data |
| ASSERT_FALSE(col->HasNext()); |
| } |
| |
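| // Fixture reading delta_length_byte_array.parquet, which contains a |
| // DELTA_LENGTH_BYTE_ARRAY-encoded string column. |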
| class TestTextDeltaLengthByteArray : public ::testing::Test { |
| public: |
| void SetUp() { |
| reader_ = ParquetFileReader::OpenFile(data_file("delta_length_byte_array.parquet")); |
| } |
| |
| void TearDown() {} |
| |
| protected: |
| std::unique_ptr<ParquetFileReader> reader_; |
| }; |
| |
| TEST_F(TestTextDeltaLengthByteArray, TestTextScanner) { |
| #ifndef ARROW_WITH_ZSTD |
| GTEST_SKIP() << "Test requires Zstd compression"; |
| #endif |
| auto group = reader_->RowGroup(0); |
| |
| // column 0, id |
| auto scanner = std::make_shared<ByteArrayScanner>(group->Column(0)); |
| ByteArray val; |
| bool is_null; |
| std::string expected_prefix("apple_banana_mango"); |
| for (int i = 0; i < 1000; ++i) { |
| ASSERT_TRUE(scanner->HasNext()); |
| ASSERT_TRUE(scanner->NextValue(&val, &is_null)); |
| ASSERT_FALSE(is_null); |
| std::string expected = expected_prefix + std::to_string(i * i); |
| ASSERT_EQ(val.len, expected.length()); |
| ASSERT_EQ(::std::string_view(reinterpret_cast<const char*>(val.ptr), val.len), |
| expected); |
| } |
| ASSERT_FALSE(scanner->HasNext()); |
| ASSERT_FALSE(scanner->NextValue(&val, &is_null)); |
| } |
| |
| TEST_F(TestTextDeltaLengthByteArray, TestBatchRead) { |
| #ifndef ARROW_WITH_ZSTD |
| GTEST_SKIP() << "Test requires Zstd compression"; |
| #endif |
| auto group = reader_->RowGroup(0); |
| |
| // column 0, id |
| auto col = std::dynamic_pointer_cast<ByteArrayReader>(group->Column(0)); |
| |
| // This file only has 1000 rows |
| ASSERT_EQ(1000, reader_->metadata()->num_rows()); |
| // This file only has 1 row group |
| ASSERT_EQ(1, reader_->metadata()->num_row_groups()); |
| // Size of the metadata is 105 bytes |
| ASSERT_EQ(105, reader_->metadata()->size()); |
| // This row group must have 1000 rows |
| ASSERT_EQ(1000, group->metadata()->num_rows()); |
| |
| // Check if the column is encoded with DELTA_LENGTH_BYTE_ARRAY |
| auto col_chunk = group->metadata()->ColumnChunk(0); |
| |
| ASSERT_TRUE(std::find(col_chunk->encodings().begin(), col_chunk->encodings().end(), |
| Encoding::DELTA_LENGTH_BYTE_ARRAY) != |
| col_chunk->encodings().end()); |
| |
| ASSERT_TRUE(col->HasNext()); |
| int64_t values_read = 0; |
| int64_t curr_batch_read; |
| std::string expected_prefix("apple_banana_mango"); |
| while (values_read < 1000) { |
| const int16_t batch_size = 25; |
| int16_t def_levels[batch_size]; |
| int16_t rep_levels[batch_size]; |
| ByteArray values[batch_size]; |
| |
| auto levels_read = |
| col->ReadBatch(batch_size, def_levels, rep_levels, values, &curr_batch_read); |
| ASSERT_EQ(batch_size, levels_read); |
| ASSERT_EQ(batch_size, curr_batch_read); |
| for (int16_t i = 0; i < batch_size; i++) { |
| auto expected = |
| expected_prefix + std::to_string((i + values_read) * (i + values_read)); |
| ASSERT_EQ(values[i].len, expected.length()); |
| ASSERT_EQ( |
| ::std::string_view(reinterpret_cast<const char*>(values[i].ptr), values[i].len), |
| expected); |
| } |
| values_read += curr_batch_read; |
| } |
| |
| // All values have been read; the column must not report more data |
| ASSERT_FALSE(col->HasNext()); |
| } |
| |
| class TestAllTypesPlain : public ::testing::Test { |
| public: |
| void SetUp() { reader_ = ParquetFileReader::OpenFile(alltypes_plain()); } |
| |
| void TearDown() {} |
| |
| protected: |
| std::unique_ptr<ParquetFileReader> reader_; |
| }; |
| |
| TEST_F(TestAllTypesPlain, NoopConstructDestruct) {} |
| |
| TEST_F(TestAllTypesPlain, RowGroupMetaData) { |
| auto group = reader_->RowGroup(0); |
| CheckRowGroupMetadata(group->metadata()); |
| } |
| |
| TEST_F(TestAllTypesPlain, TestBatchRead) { |
| std::shared_ptr<RowGroupReader> group = reader_->RowGroup(0); |
| |
| // column 0, id |
| std::shared_ptr<Int32Reader> col = |
| std::dynamic_pointer_cast<Int32Reader>(group->Column(0)); |
| |
| int16_t def_levels[4]; |
| int16_t rep_levels[4]; |
| int32_t values[4]; |
| |
| // This file only has 8 rows |
| ASSERT_EQ(8, reader_->metadata()->num_rows()); |
| // This file only has 1 row group |
| ASSERT_EQ(1, reader_->metadata()->num_row_groups()); |
| // Size of the metadata is 730 bytes |
| ASSERT_EQ(730, reader_->metadata()->size()); |
| // This row group must have 8 rows |
| ASSERT_EQ(8, group->metadata()->num_rows()); |
| |
| ASSERT_TRUE(col->HasNext()); |
| int64_t values_read; |
| auto levels_read = col->ReadBatch(4, def_levels, rep_levels, values, &values_read); |
| ASSERT_EQ(4, levels_read); |
| ASSERT_EQ(4, values_read); |
| |
| // Now read past the end of the file |
| ASSERT_TRUE(col->HasNext()); |
| levels_read = col->ReadBatch(5, def_levels, rep_levels, values, &values_read); |
| ASSERT_EQ(4, levels_read); |
| ASSERT_EQ(4, values_read); |
| |
| ASSERT_FALSE(col->HasNext()); |
| } |
| |
| TEST_F(TestAllTypesPlain, RowGroupColumnBoundsChecking) { |
| // Part of PARQUET-1857 |
| ASSERT_THROW(reader_->RowGroup(reader_->metadata()->num_row_groups()), |
| ParquetException); |
| |
| auto row_group = reader_->RowGroup(0); |
| ASSERT_THROW(row_group->Column(row_group->metadata()->num_columns()), ParquetException); |
| ASSERT_THROW(row_group->GetColumnPageReader(row_group->metadata()->num_columns()), |
| ParquetException); |
| } |
| |
| TEST_F(TestAllTypesPlain, TestFlatScannerInt32) { |
| std::shared_ptr<RowGroupReader> group = reader_->RowGroup(0); |
| |
| // column 0, id |
| std::shared_ptr<Int32Scanner> scanner(new Int32Scanner(group->Column(0))); |
| int32_t val; |
| bool is_null; |
| for (int i = 0; i < 8; ++i) { |
| ASSERT_TRUE(scanner->HasNext()); |
| ASSERT_TRUE(scanner->NextValue(&val, &is_null)); |
| ASSERT_FALSE(is_null); |
| } |
| ASSERT_FALSE(scanner->HasNext()); |
| ASSERT_FALSE(scanner->NextValue(&val, &is_null)); |
| } |
| |
| TEST_F(TestAllTypesPlain, TestSetScannerBatchSize) { |
| std::shared_ptr<RowGroupReader> group = reader_->RowGroup(0); |
| |
| // column 0, id |
| std::shared_ptr<Int32Scanner> scanner(new Int32Scanner(group->Column(0))); |
| |
| ASSERT_EQ(128, scanner->batch_size()); |
| scanner->SetBatchSize(1024); |
| ASSERT_EQ(1024, scanner->batch_size()); |
| } |
| |
| TEST_F(TestAllTypesPlain, DebugPrintWorks) { |
| std::stringstream ss; |
| |
| std::list<int> columns; |
| ParquetFilePrinter printer(reader_.get()); |
| printer.DebugPrint(ss, columns); |
| |
| std::string result = ss.str(); |
| ASSERT_GT(result.size(), 0); |
| } |
| |
| TEST_F(TestAllTypesPlain, ColumnSelection) { |
| std::stringstream ss; |
| |
| std::list<int> columns; |
| columns.push_back(5); |
| columns.push_back(0); |
| columns.push_back(10); |
| ParquetFilePrinter printer(reader_.get()); |
| printer.DebugPrint(ss, columns); |
| |
| std::string result = ss.str(); |
| ASSERT_GT(result.size(), 0); |
| } |
| |
| TEST_F(TestAllTypesPlain, ColumnSelectionOutOfRange) { |
| std::stringstream ss; |
| |
| std::list<int> columns; |
| columns.push_back(100); |
| ParquetFilePrinter printer1(reader_.get()); |
| ASSERT_THROW(printer1.DebugPrint(ss, columns), ParquetException); |
| |
| columns.clear(); |
| columns.push_back(-1); |
| ParquetFilePrinter printer2(reader_.get()); |
| ASSERT_THROW(printer2.DebugPrint(ss, columns), ParquetException); |
| } |
| |
| // Tests that read_dense_for_nullable is passed down to the record |
| // reader. The functionality of read_dense_for_nullable is tested |
| // elsewhere. |
| TEST(TestFileReader, RecordReaderReadDenseForNullable) { |
| // We test the default, which is false, and also test explicitly enabling and |
| // disabling read_dense_for_nullable. |
| std::vector<ReaderProperties> reader_properties(3); |
| reader_properties[1].enable_read_dense_for_nullable(); |
| reader_properties[2].disable_read_dense_for_nullable(); |
| for (const auto& reader_props : reader_properties) { |
| std::unique_ptr<ParquetFileReader> file_reader = ParquetFileReader::OpenFile( |
| alltypes_plain(), /* memory_map = */ false, reader_props); |
| std::shared_ptr<RowGroupReader> group = file_reader->RowGroup(0); |
| std::shared_ptr<internal::RecordReader> col_record_reader = group->RecordReader(0); |
| ASSERT_EQ(reader_props.read_dense_for_nullable(), |
| col_record_reader->read_dense_for_nullable()); |
| } |
| } |
| |
| // Tests getting a record reader from a row group reader. |
| TEST(TestFileReader, GetRecordReader) { |
| ReaderProperties reader_props; |
| std::unique_ptr<ParquetFileReader> file_reader = ParquetFileReader::OpenFile( |
| alltypes_plain(), /* memory_map = */ false, reader_props); |
| std::shared_ptr<RowGroupReader> group = file_reader->RowGroup(0); |
| |
| std::shared_ptr<internal::RecordReader> col_record_reader_ = group->RecordReader(0); |
| |
| ASSERT_TRUE(col_record_reader_->HasMoreData()); |
| auto records_read = col_record_reader_->ReadRecords(4); |
| ASSERT_EQ(records_read, 4); |
| ASSERT_EQ(4, col_record_reader_->values_written()); |
| ASSERT_EQ(4, col_record_reader_->levels_position()); |
| ASSERT_EQ(8, col_record_reader_->levels_written()); |
| } |
| |
| TEST(TestFileReader, RecordReaderWithExposingDictionary) { |
| const int num_rows = 1000; |
| |
| // Make schema |
| schema::NodeVector fields; |
| fields.push_back(PrimitiveNode::Make("field", Repetition::REQUIRED, Type::BYTE_ARRAY, |
| ConvertedType::NONE)); |
| auto schema = std::static_pointer_cast<GroupNode>( |
| GroupNode::Make("schema", Repetition::REQUIRED, fields)); |
| |
| // Write small batches and small data pages |
| std::shared_ptr<WriterProperties> writer_props = WriterProperties::Builder() |
| .write_batch_size(64) |
| ->data_pagesize(128) |
| ->enable_dictionary() |
| ->build(); |
| |
| ASSERT_OK_AND_ASSIGN(auto out_file, ::arrow::io::BufferOutputStream::Create()); |
| std::shared_ptr<ParquetFileWriter> file_writer = |
| ParquetFileWriter::Open(out_file, schema, writer_props); |
| |
| RowGroupWriter* rg_writer = file_writer->AppendRowGroup(); |
| |
| // write one column |
| ByteArrayWriter* writer = static_cast<ByteArrayWriter*>(rg_writer->NextColumn()); |
| std::vector<std::string> raw_unique_data = {"a", "bc", "defg"}; |
| std::vector<ByteArray> col_typed; |
| for (int i = 0; i < num_rows; i++) { |
| std::string_view chosen_data = raw_unique_data[i % raw_unique_data.size()]; |
| col_typed.emplace_back(chosen_data); |
| } |
| writer->WriteBatch(num_rows, nullptr, nullptr, col_typed.data()); |
| rg_writer->Close(); |
| file_writer->Close(); |
| |
| // Open the reader |
| ASSERT_OK_AND_ASSIGN(auto file_buf, out_file->Finish()); |
| auto in_file = std::make_shared<::arrow::io::BufferReader>(file_buf); |
| |
| ReaderProperties reader_props; |
| reader_props.enable_buffered_stream(); |
| reader_props.set_buffer_size(64); |
| std::unique_ptr<ParquetFileReader> file_reader = |
| ParquetFileReader::Open(in_file, reader_props); |
| |
| auto row_group = file_reader->RowGroup(0); |
| auto record_reader = std::dynamic_pointer_cast<internal::DictionaryRecordReader>( |
| row_group->RecordReaderWithExposeEncoding(0, ExposedEncoding::DICTIONARY)); |
| ASSERT_NE(record_reader, nullptr); |
| ASSERT_TRUE(record_reader->read_dictionary()); |
| |
| int32_t dict_len = 0; |
| auto dict = |
| reinterpret_cast<const ByteArray*>(record_reader->ReadDictionary(&dict_len)); |
| ASSERT_NE(dict, nullptr); |
| ASSERT_EQ(dict_len, raw_unique_data.size()); |
| ASSERT_EQ(record_reader->ReadRecords(num_rows), num_rows); |
| std::shared_ptr<::arrow::ChunkedArray> result_array = record_reader->GetResult(); |
| ASSERT_EQ(result_array->num_chunks(), 1); |
| const std::shared_ptr<::arrow::Array> chunk = result_array->chunk(0); |
| auto dictionary_array = std::dynamic_pointer_cast<::arrow::DictionaryArray>(chunk); |
| const int32_t* indices = |
| (std::dynamic_pointer_cast<::arrow::Int32Array>(dictionary_array->indices())) |
| ->raw_values(); |
| |
| // Verify values based on the dictionary from ReadDictionary(). |
| int64_t indices_read = chunk->length(); |
| ASSERT_EQ(indices_read, num_rows); |
| for (int i = 0; i < indices_read; ++i) { |
| ASSERT_LT(indices[i], dict_len); |
| ASSERT_EQ(std::string_view(reinterpret_cast<const char*>(dict[indices[i]].ptr), |
| dict[indices[i]].len), |
| col_typed[i]); |
| } |
| } |
| |
| class TestLocalFile : public ::testing::Test { |
| public: |
| void SetUp() { |
| PARQUET_ASSIGN_OR_THROW(handle, ReadableFile::Open(alltypes_plain())); |
| fileno = handle->file_descriptor(); |
| } |
| |
| void TearDown() {} |
| |
| protected: |
| int fileno; |
| std::shared_ptr<::arrow::io::ReadableFile> handle; |
| }; |
| |
| TEST_F(TestLocalFile, OpenWithMetadata) { |
| // PARQUET-808 |
| std::stringstream ss; |
| std::shared_ptr<FileMetaData> metadata = ReadMetaData(handle); |
| |
| auto reader = ParquetFileReader::Open(handle, default_reader_properties(), metadata); |
| |
| // Compare pointers |
| ASSERT_EQ(metadata.get(), reader->metadata().get()); |
| |
| std::list<int> columns; |
| ParquetFilePrinter printer(reader.get()); |
| printer.DebugPrint(ss, columns, true); |
| |
| // Make sure OpenFile passes on the external metadata, too |
| auto reader2 = ParquetFileReader::OpenFile(alltypes_plain(), false, |
| default_reader_properties(), metadata); |
| |
| // Compare pointers |
| ASSERT_EQ(metadata.get(), reader2->metadata().get()); |
| } |
| |
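| // Fixture for page-level CRC checksum verification, exercising both column |
| // readers and raw page readers. |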
| class TestCheckDataPageCrc : public ::testing::Test { |
| public: |
| void OpenExampleFile(const std::string& file_path) { |
| file_reader_ = ParquetFileReader::OpenFile(file_path, |
| /*memory_map=*/false, reader_props_); |
| auto metadata_ptr = file_reader_->metadata(); |
| EXPECT_EQ(1, metadata_ptr->num_row_groups()); |
| EXPECT_EQ(2, metadata_ptr->num_columns()); |
| row_group_ = file_reader_->RowGroup(0); |
| |
| column_readers_.resize(2); |
| column_readers_[0] = row_group_->Column(0); |
| column_readers_[1] = row_group_->Column(1); |
| |
| page_readers_.resize(2); |
| page_readers_[0] = row_group_->GetColumnPageReader(0); |
| page_readers_[1] = row_group_->GetColumnPageReader(1); |
| } |
| |
| template <typename DType> |
| void CheckReadBatches(int col_index, int expect_values) { |
| ASSERT_GT(column_readers_.size(), static_cast<size_t>(col_index)); |
| auto column_reader = |
| std::dynamic_pointer_cast<TypedColumnReader<DType>>(column_readers_[col_index]); |
| ASSERT_NE(nullptr, column_reader); |
| int64_t total_values = 0; |
| std::vector<typename DType::c_type> values(1024); |
| |
| while (column_reader->HasNext()) { |
| int64_t values_read; |
| int64_t levels_read = column_reader->ReadBatch(values.size(), nullptr, nullptr, |
| values.data(), &values_read); |
| EXPECT_EQ(levels_read, values_read); |
| total_values += values_read; |
| } |
| EXPECT_EQ(expect_values, total_values); |
| } |
| |
| void CheckCorrectCrc(const std::string& file_path, bool page_checksum_verification) { |
| reader_props_.set_page_checksum_verification(page_checksum_verification); |
| { |
| // Exercise column readers |
| OpenExampleFile(file_path); |
| CheckReadBatches<Int32Type>(/*col_index=*/0, kDataPageValuesPerColumn); |
| CheckReadBatches<Int32Type>(/*col_index=*/1, kDataPageValuesPerColumn); |
| } |
| { |
| // Exercise page readers directly |
| OpenExampleFile(file_path); |
| for (auto& page_reader : page_readers_) { |
| EXPECT_NE(nullptr, page_reader->NextPage()); |
| EXPECT_NE(nullptr, page_reader->NextPage()); |
| EXPECT_EQ(nullptr, page_reader->NextPage()); |
| } |
| } |
| } |
| |
| void CheckCorrectDictCrc(const std::string& file_path, |
| bool page_checksum_verification) { |
| reader_props_.set_page_checksum_verification(page_checksum_verification); |
| { |
| // Exercise column readers |
| OpenExampleFile(file_path); |
| CheckReadBatches<Int64Type>(/*col_index=*/0, kDictPageValuesPerColumn); |
| CheckReadBatches<ByteArrayType>(/*col_index=*/1, kDictPageValuesPerColumn); |
| } |
| { |
| // Exercise page readers directly |
| OpenExampleFile(file_path); |
| for (auto& page_reader : page_readers_) { |
| // read dict page |
| EXPECT_NE(nullptr, page_reader->NextPage()); |
| // read data page |
| EXPECT_NE(nullptr, page_reader->NextPage()); |
| } |
| } |
| } |
| |
| void CheckNextPageCorrupt(PageReader* page_reader) { |
| AssertCrcValidationError([&]() { page_reader->NextPage(); }); |
| } |
| |
| void AssertCrcValidationError(std::function<void()> func) { |
| EXPECT_THROW_THAT( |
| func, ParquetException, |
| ::testing::Property(&ParquetException::what, |
| ::testing::HasSubstr("CRC checksum verification failed"))); |
| } |
| |
| protected: |
| static constexpr int kDataPageSize = 1024 * 10; |
| // Example CRC files have two v1 data pages per column |
| static constexpr int kDataPageValuesPerColumn = kDataPageSize * 2 / sizeof(int32_t); |
| static constexpr int kDictPageValuesPerColumn = 1000; |
| |
| ReaderProperties reader_props_; |
| std::unique_ptr<ParquetFileReader> file_reader_; |
| std::shared_ptr<RowGroupReader> row_group_; |
| std::vector<std::shared_ptr<ColumnReader>> column_readers_; |
| std::vector<std::unique_ptr<PageReader>> page_readers_; |
| }; |
| |
| TEST_F(TestCheckDataPageCrc, CorruptPageV1) { |
| // Works when not checking crc |
| CheckCorrectCrc(data_page_v1_corrupt_checksum(), |
| /*page_checksum_verification=*/false); |
| // Fails when checking crc |
| reader_props_.set_page_checksum_verification(true); |
| { |
| // With column readers |
| OpenExampleFile(data_page_v1_corrupt_checksum()); |
| |
| AssertCrcValidationError([this]() { |
| CheckReadBatches<Int32Type>(/*col_index=*/0, kDataPageValuesPerColumn); |
| }); |
| AssertCrcValidationError([this]() { |
| CheckReadBatches<Int32Type>(/*col_index=*/1, kDataPageValuesPerColumn); |
| }); |
| } |
| { |
| // With page readers |
| OpenExampleFile(data_page_v1_corrupt_checksum()); |
| |
| // First column has a corrupt CRC in first page |
| CheckNextPageCorrupt(page_readers_[0].get()); |
| EXPECT_NE(nullptr, page_readers_[0]->NextPage()); |
| EXPECT_EQ(nullptr, page_readers_[0]->NextPage()); |
| |
| // Second column has a corrupt CRC in second page |
| EXPECT_NE(nullptr, page_readers_[1]->NextPage()); |
| CheckNextPageCorrupt(page_readers_[1].get()); |
| EXPECT_EQ(nullptr, page_readers_[1]->NextPage()); |
| } |
| } |
| |
| TEST_F(TestCheckDataPageCrc, UncompressedPageV1) { |
| CheckCorrectCrc(data_page_v1_uncompressed_checksum(), |
| /*page_checksum_verification=*/false); |
| CheckCorrectCrc(data_page_v1_uncompressed_checksum(), |
| /*page_checksum_verification=*/true); |
| } |
| |
| TEST_F(TestCheckDataPageCrc, SnappyPageV1) { |
| #ifndef ARROW_WITH_SNAPPY |
| GTEST_SKIP() << "Test requires Snappy compression"; |
| #endif |
| CheckCorrectCrc(data_page_v1_snappy_checksum(), |
| /*page_checksum_verification=*/false); |
| CheckCorrectCrc(data_page_v1_snappy_checksum(), |
| /*page_checksum_verification=*/true); |
| } |
| |
| TEST_F(TestCheckDataPageCrc, UncompressedDict) { |
| CheckCorrectDictCrc(plain_dict_uncompressed_checksum(), |
| /*page_checksum_verification=*/false); |
| CheckCorrectDictCrc(plain_dict_uncompressed_checksum(), |
| /*page_checksum_verification=*/true); |
| } |
| |
| TEST_F(TestCheckDataPageCrc, SnappyDict) { |
| #ifndef ARROW_WITH_SNAPPY |
| GTEST_SKIP() << "Test requires Snappy compression"; |
| #endif |
| CheckCorrectDictCrc(rle_dict_snappy_checksum(), |
| /*page_checksum_verification=*/false); |
| CheckCorrectDictCrc(rle_dict_snappy_checksum(), |
| /*page_checksum_verification=*/true); |
| } |
| |
| TEST_F(TestCheckDataPageCrc, CorruptDict) { |
| // Works when not checking crc |
| CheckCorrectDictCrc(rle_dict_uncompressed_corrupt_checksum(), |
| /*page_checksum_verification=*/false); |
| // Fails when checking crc |
| reader_props_.set_page_checksum_verification(true); |
| { |
| // With column readers |
| OpenExampleFile(rle_dict_uncompressed_corrupt_checksum()); |
| |
| AssertCrcValidationError([this]() { |
| CheckReadBatches<Int64Type>(/*col_index=*/0, kDictPageValuesPerColumn); |
| }); |
| AssertCrcValidationError([this]() { |
| CheckReadBatches<ByteArrayType>(/*col_index=*/1, kDictPageValuesPerColumn); |
| }); |
| } |
| { |
| // With page readers |
| OpenExampleFile(rle_dict_uncompressed_corrupt_checksum()); |
| |
| CheckNextPageCorrupt(page_readers_[0].get()); |
| EXPECT_NE(nullptr, page_readers_[0]->NextPage()); |
| |
| CheckNextPageCorrupt(page_readers_[1].get()); |
| EXPECT_NE(nullptr, page_readers_[1]->NextPage()); |
| } |
| } |
| |
| TEST(TestGzipMembersRead, TwoConcatenatedMembers) { |
| #ifndef ARROW_WITH_ZLIB |
| GTEST_SKIP() << "Test requires Zlib compression"; |
| #endif |
| auto file_reader = ParquetFileReader::OpenFile(concatenated_gzip_members(), |
| /*memory_map=*/false); |
| auto col_reader = std::dynamic_pointer_cast<TypedColumnReader<Int64Type>>( |
| file_reader->RowGroup(0)->Column(0)); |
| int64_t num_values = 0; |
| int64_t num_repdef = 0; |
| std::vector<int16_t> reps(1024); |
| std::vector<int16_t> defs(1024); |
| std::vector<int64_t> vals(1024); |
| |
| num_repdef = |
| col_reader->ReadBatch(1024, defs.data(), reps.data(), vals.data(), &num_values); |
| EXPECT_EQ(num_repdef, 513); |
| for (int64_t i = 0; i < num_repdef; i++) { |
| EXPECT_EQ(i + 1, vals[i]); |
| } |
| } |
| |
| TEST(TestFileReaderAdHoc, NationDictTruncatedDataPage) { |
| // PARQUET-816. Some files generated by older Parquet implementations may |
| // contain malformed data page metadata, and we can successfully decode them |
| // if we optimistically proceed to decoding, even if there is not enough data |
| // available in the stream. Previously, we performed quite aggressive checking |
| // of stream reads, which is not done in e.g. Impala's Parquet implementation. |
| auto reader = ParquetFileReader::OpenFile(nation_dict_truncated_data_page(), false); |
| std::stringstream ss; |
| |
| // empty list means print all |
| std::list<int> columns; |
| ParquetFilePrinter printer1(reader.get()); |
| printer1.DebugPrint(ss, columns, true); |
| |
| reader = ParquetFileReader::OpenFile(nation_dict_truncated_data_page(), true); |
| std::stringstream ss2; |
| ParquetFilePrinter printer2(reader.get()); |
| printer2.DebugPrint(ss2, columns, true); |
| |
| // The memory-mapped read runs over the end of the column chunk and succeeds |
| // by accident |
| ASSERT_EQ(ss2.str(), ss.str()); |
| } |
| |
| TEST(TestDumpWithLocalFile, DumpOutput) { |
| #ifndef ARROW_WITH_SNAPPY |
| GTEST_SKIP() << "Test requires Snappy compression"; |
| #endif |
| |
| std::string header_output = R"###(File Name: nested_lists.snappy.parquet |
| Version: 1.0 |
| Created By: parquet-mr version 1.8.2 (build c6522788629e590a53eb79874b95f6c3ff11f16c) |
| Total rows: 3 |
| Number of RowGroups: 1 |
| Number of Real Columns: 2 |
| Number of Columns: 2 |
| Number of Selected Columns: 2 |
| Column 0: a.list.element.list.element.list.element (BYTE_ARRAY / String / UTF8) |
| Column 1: b (INT32) |
| --- Row Group: 0 --- |
| --- Total Bytes: 155 --- |
| --- Total Compressed Bytes: 0 --- |
| --- Rows: 3 --- |
| Column 0 |
| Values: 18 Statistics Not Set |
| Compression: SNAPPY, Encodings: PLAIN_DICTIONARY(DICT_PAGE) PLAIN_DICTIONARY |
| Uncompressed Size: 103, Compressed Size: 104 |
| Column 1 |
| Values: 3, Null Values: 0, Distinct Values: 0 |
| Max (exact: unknown): 1, Min (exact: unknown): 1 |
| Compression: SNAPPY, Encodings: PLAIN_DICTIONARY(DICT_PAGE) PLAIN_DICTIONARY |
| Uncompressed Size: 52, Compressed Size: 56 |
| )###"; |
| std::string values_output = R"###(--- Values --- |
| element |b | |
| a |1 | |
| b |1 | |
| c |1 | |
| NULL | |
| d | |
| a | |
| b | |
| c | |
| d | |
| NULL | |
| e | |
| a | |
| b | |
| c | |
| d | |
| e | |
| NULL | |
| f | |
| |
| )###"; |
| std::string dump_output = R"###(--- Values --- |
| Column 0 |
| D:7 R:0 V:a |
| D:7 R:3 V:b |
| D:7 R:2 V:c |
| D:4 R:1 NULL |
| D:7 R:2 V:d |
| D:7 R:0 V:a |
| D:7 R:3 V:b |
| D:7 R:2 V:c |
| D:7 R:3 V:d |
| D:4 R:1 NULL |
| D:7 R:2 V:e |
| D:7 R:0 V:a |
| D:7 R:3 V:b |
| D:7 R:2 V:c |
| D:7 R:3 V:d |
| D:7 R:2 V:e |
| D:4 R:1 NULL |
| D:7 R:2 V:f |
| Column 1 |
| D:0 R:0 V:1 |
| D:0 R:0 V:1 |
| D:0 R:0 V:1 |
| )###"; |
| |
| // empty list means print all |
| std::list<int> columns; |
| |
| std::stringstream ss_values, ss_dump; |
| const char* file = "nested_lists.snappy.parquet"; |
| auto reader_props = default_reader_properties(); |
| auto reader = ParquetFileReader::OpenFile(data_file(file), false, reader_props); |
| ParquetFilePrinter printer(reader.get()); |
| |
| printer.DebugPrint(ss_values, columns, true, false, false, file); |
| printer.DebugPrint(ss_dump, columns, true, true, false, file); |
| |
| ASSERT_EQ(header_output + values_output, ss_values.str()); |
| ASSERT_EQ(header_output + dump_output, ss_dump.str()); |
| } |
| |
| class TestJSONWithLocalFile : public ::testing::Test { |
| public: |
| static std::string ReadFromLocalFile(std::string_view local_file_name) { |
| std::stringstream ss; |
| // empty list means print all |
| std::list<int> columns; |
| |
| // std::string_view::data() is not guaranteed to be NUL-terminated, so |
| // materialize a std::string before passing C strings around. |
| const std::string file_name(local_file_name); |
| auto reader = |
| ParquetFileReader::OpenFile(data_file(file_name.c_str()), |
| /*memory_map=*/false, default_reader_properties()); |
| ParquetFilePrinter printer(reader.get()); |
| printer.JSONPrint(ss, columns, file_name.c_str()); |
| |
| return ss.str(); |
| } |
| }; |
| |
| TEST_F(TestJSONWithLocalFile, JSONOutputWithStatistics) { |
| std::string json_output = R"###({ |
| "FileName": "nested_lists.snappy.parquet", |
| "Version": "1.0", |
| "CreatedBy": "parquet-mr version 1.8.2 (build c6522788629e590a53eb79874b95f6c3ff11f16c)", |
| "TotalRows": "3", |
| "NumberOfRowGroups": "1", |
| "NumberOfRealColumns": "2", |
| "NumberOfColumns": "2", |
| "Columns": [ |
| { "Id": "0", "Name": "a.list.element.list.element.list.element", "PhysicalType": "BYTE_ARRAY", "ConvertedType": "UTF8", "LogicalType": {"Type": "String"} }, |
| { "Id": "1", "Name": "b", "PhysicalType": "INT32", "ConvertedType": "NONE", "LogicalType": {"Type": "None"} } |
| ], |
| "RowGroups": [ |
| { |
| "Id": "0", "TotalBytes": "155", "TotalCompressedBytes": "0", "Rows": "3", |
| "ColumnChunks": [ |
| {"Id": "0", "Values": "18", "StatsSet": "False", |
| "Compression": "SNAPPY", "Encodings": "PLAIN_DICTIONARY(DICT_PAGE) PLAIN_DICTIONARY", "UncompressedSize": "103", "CompressedSize": "104" }, |
| {"Id": "1", "Values": "3", "StatsSet": "True", "Stats": {"NumNulls": "0", "Max": "1", "Min": "1", "IsMaxValueExact": "unknown", "IsMinValueExact": "unknown" }, |
| "Compression": "SNAPPY", "Encodings": "PLAIN_DICTIONARY(DICT_PAGE) PLAIN_DICTIONARY", "UncompressedSize": "52", "CompressedSize": "56" } |
| ] |
| } |
| ] |
| } |
| )###"; |
| |
| std::string json_content = ReadFromLocalFile("nested_lists.snappy.parquet"); |
| ASSERT_EQ(json_output, json_content); |
| } |
| |
| TEST_F(TestJSONWithLocalFile, JSONOutput) { |
| std::string json_output = R"###({ |
| "FileName": "alltypes_plain.parquet", |
| "Version": "1.0", |
| "CreatedBy": "impala version 1.3.0-INTERNAL (build 8a48ddb1eff84592b3fc06bc6f51ec120e1fffc9)", |
| "TotalRows": "8", |
| "NumberOfRowGroups": "1", |
| "NumberOfRealColumns": "11", |
| "NumberOfColumns": "11", |
| "Columns": [ |
| { "Id": "0", "Name": "id", "PhysicalType": "INT32", "ConvertedType": "NONE", "LogicalType": {"Type": "None"} }, |
| { "Id": "1", "Name": "bool_col", "PhysicalType": "BOOLEAN", "ConvertedType": "NONE", "LogicalType": {"Type": "None"} }, |
| { "Id": "2", "Name": "tinyint_col", "PhysicalType": "INT32", "ConvertedType": "NONE", "LogicalType": {"Type": "None"} }, |
| { "Id": "3", "Name": "smallint_col", "PhysicalType": "INT32", "ConvertedType": "NONE", "LogicalType": {"Type": "None"} }, |
| { "Id": "4", "Name": "int_col", "PhysicalType": "INT32", "ConvertedType": "NONE", "LogicalType": {"Type": "None"} }, |
| { "Id": "5", "Name": "bigint_col", "PhysicalType": "INT64", "ConvertedType": "NONE", "LogicalType": {"Type": "None"} }, |
| { "Id": "6", "Name": "float_col", "PhysicalType": "FLOAT", "ConvertedType": "NONE", "LogicalType": {"Type": "None"} }, |
| { "Id": "7", "Name": "double_col", "PhysicalType": "DOUBLE", "ConvertedType": "NONE", "LogicalType": {"Type": "None"} }, |
| { "Id": "8", "Name": "date_string_col", "PhysicalType": "BYTE_ARRAY", "ConvertedType": "NONE", "LogicalType": {"Type": "None"} }, |
| { "Id": "9", "Name": "string_col", "PhysicalType": "BYTE_ARRAY", "ConvertedType": "NONE", "LogicalType": {"Type": "None"} }, |
| { "Id": "10", "Name": "timestamp_col", "PhysicalType": "INT96", "ConvertedType": "NONE", "LogicalType": {"Type": "None"} } |
| ], |
| "RowGroups": [ |
| { |
| "Id": "0", "TotalBytes": "671", "TotalCompressedBytes": "0", "Rows": "8", |
| "ColumnChunks": [ |
| {"Id": "0", "Values": "8", "StatsSet": "False", |
| "Compression": "UNCOMPRESSED", "Encodings": "RLE PLAIN_DICTIONARY PLAIN ", "UncompressedSize": "73", "CompressedSize": "73" }, |
| {"Id": "1", "Values": "8", "StatsSet": "False", |
| "Compression": "UNCOMPRESSED", "Encodings": "RLE PLAIN_DICTIONARY PLAIN ", "UncompressedSize": "24", "CompressedSize": "24" }, |
| {"Id": "2", "Values": "8", "StatsSet": "False", |
| "Compression": "UNCOMPRESSED", "Encodings": "RLE PLAIN_DICTIONARY PLAIN ", "UncompressedSize": "47", "CompressedSize": "47" }, |
| {"Id": "3", "Values": "8", "StatsSet": "False", |
| "Compression": "UNCOMPRESSED", "Encodings": "RLE PLAIN_DICTIONARY PLAIN ", "UncompressedSize": "47", "CompressedSize": "47" }, |
| {"Id": "4", "Values": "8", "StatsSet": "False", |
| "Compression": "UNCOMPRESSED", "Encodings": "RLE PLAIN_DICTIONARY PLAIN ", "UncompressedSize": "47", "CompressedSize": "47" }, |
| {"Id": "5", "Values": "8", "StatsSet": "False", |
| "Compression": "UNCOMPRESSED", "Encodings": "RLE PLAIN_DICTIONARY PLAIN ", "UncompressedSize": "55", "CompressedSize": "55" }, |
| {"Id": "6", "Values": "8", "StatsSet": "False", |
| "Compression": "UNCOMPRESSED", "Encodings": "RLE PLAIN_DICTIONARY PLAIN ", "UncompressedSize": "47", "CompressedSize": "47" }, |
| {"Id": "7", "Values": "8", "StatsSet": "False", |
| "Compression": "UNCOMPRESSED", "Encodings": "RLE PLAIN_DICTIONARY PLAIN ", "UncompressedSize": "55", "CompressedSize": "55" }, |
| {"Id": "8", "Values": "8", "StatsSet": "False", |
| "Compression": "UNCOMPRESSED", "Encodings": "RLE PLAIN_DICTIONARY PLAIN ", "UncompressedSize": "88", "CompressedSize": "88" }, |
| {"Id": "9", "Values": "8", "StatsSet": "False", |
| "Compression": "UNCOMPRESSED", "Encodings": "RLE PLAIN_DICTIONARY PLAIN ", "UncompressedSize": "49", "CompressedSize": "49" }, |
| {"Id": "10", "Values": "8", "StatsSet": "False", |
| "Compression": "UNCOMPRESSED", "Encodings": "RLE PLAIN_DICTIONARY PLAIN ", "UncompressedSize": "139", "CompressedSize": "139" } |
| ] |
| } |
| ] |
| } |
| )###"; |
| |
| std::string json_content = ReadFromLocalFile("alltypes_plain.parquet"); |
| ASSERT_EQ(json_output, json_content); |
| } |
| |
| TEST_F(TestJSONWithLocalFile, JSONOutputFLBA) { |
| // The min-max stats for FLBA contain non-UTF-8 bytes, so we don't check |
| // the whole JSON output. |
| std::string json_content = ReadFromLocalFile("fixed_length_byte_array.parquet"); |
| |
| std::string json_contains = R"###({ |
| "FileName": "fixed_length_byte_array.parquet", |
| "Version": "1.0", |
| "CreatedBy": "parquet-mr version 1.13.0-SNAPSHOT (build d057b39d93014fe40f5067ee4a33621e65c91552)", |
| "TotalRows": "1000", |
| "NumberOfRowGroups": "1", |
| "NumberOfRealColumns": "1", |
| "NumberOfColumns": "1", |
| "Columns": [ |
| { "Id": "0", "Name": "flba_field", "PhysicalType": "FIXED_LEN_BYTE_ARRAY(4)", "ConvertedType": "NONE", "LogicalType": {"Type": "None"} } |
| ])###"; |
| |
| EXPECT_THAT(json_content, testing::HasSubstr(json_contains)); |
| } |
| |
| TEST_F(TestJSONWithLocalFile, JSONOutputSortColumns) { |
| std::string json_content = ReadFromLocalFile("sort_columns.parquet"); |
| |
| std::string json_contains = R"###("SortColumns": [ |
| {"column_idx": 0, "descending": 1, "nulls_first": 1}, |
| {"column_idx": 1, "descending": 0, "nulls_first": 0} |
| ])###"; |
| EXPECT_THAT(json_content, testing::HasSubstr(json_contains)); |
| } |
| |
| namespace { |
| |
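| // Parses the given string with RapidJSON and returns an error Status if it |
| // is not valid JSON. |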
| ::arrow::Status CheckJsonValid(std::string_view json_string) { |
| rj::Document json_doc; |
| constexpr auto kParseFlags = rj::kParseFullPrecisionFlag | rj::kParseNanAndInfFlag; |
| json_doc.Parse<kParseFlags>(json_string.data(), json_string.length()); |
| if (json_doc.HasParseError()) { |
| return ::arrow::Status::Invalid("JSON parse error at offset ", |
| json_doc.GetErrorOffset(), ": ", |
| rj::GetParseError_En(json_doc.GetParseError())); |
| } |
| return ::arrow::Status::OK(); |
| } |
| |
| } // namespace |
| |
| // GH-44101: Test that JSON output is valid JSON |
| TEST_F(TestJSONWithLocalFile, ValidJsonOutput) { |
| std::vector<std::string_view> check_file_lists = { |
| "data_index_bloom_encoding_with_length.parquet", |
| "data_index_bloom_encoding_stats.parquet", |
| "alltypes_tiny_pages_plain.parquet", |
| "concatenated_gzip_members.parquet", |
| "nulls.snappy.parquet", |
| "sort_columns.parquet"}; |
| for (const auto& file : check_file_lists) { |
| std::string json_content = ReadFromLocalFile(file); |
| ASSERT_OK(CheckJsonValid(json_content)) |
| << "Invalid JSON output for file: " << file << ", content:" << json_content; |
| } |
| } |
| |
| TEST(TestJSONWithMemoryFile, ValidJsonOutput) { |
| using ::arrow::internal::checked_cast; |
| auto schema = std::static_pointer_cast<GroupNode>(GroupNode::Make( |
| "schema", Repetition::REQUIRED, |
| schema::NodeVector{PrimitiveNode::Make("string_field", Repetition::REQUIRED, |
| LogicalType::String(), Type::BYTE_ARRAY), |
| PrimitiveNode::Make("binary_field", Repetition::REQUIRED, |
| LogicalType::None(), Type::BYTE_ARRAY)})); |
| |
| ASSERT_OK_AND_ASSIGN(auto out_file, ::arrow::io::BufferOutputStream::Create()); |
| auto file_writer = ParquetFileWriter::Open(out_file, schema); |
| auto row_group_writer = file_writer->AppendRowGroup(); |
| |
| // Write string column with valid UTF8 data |
| auto string_writer = checked_cast<ByteArrayWriter*>(row_group_writer->NextColumn()); |
| std::vector<std::string> utf8_strings = {"Hello", "World", "UTF8 测试", "🌟"}; |
| std::vector<ByteArray> string_values; |
| for (const auto& str : utf8_strings) { |
| string_values.emplace_back(std::string_view(str)); |
| } |
| string_writer->WriteBatch(string_values.size(), nullptr, nullptr, string_values.data()); |
| |
| // Write binary column with non-UTF8 data |
| auto binary_writer = checked_cast<ByteArrayWriter*>(row_group_writer->NextColumn()); |
| std::vector<std::vector<uint8_t>> binary_data = {{0x00, 0x01, 0x02, 0x03}, |
| {0xFF, 0xFE, 0xFD, 0xFC}, |
| {0x80, 0x81, 0x82, 0x83}, |
| {0xC0, 0xC1, 0xF5, 0xF6}}; |
| std::vector<ByteArray> binary_values; |
| for (const auto& data : binary_data) { |
| binary_values.emplace_back( |
| std::string_view(reinterpret_cast<const char*>(data.data()), data.size())); |
| } |
| binary_writer->WriteBatch(binary_values.size(), nullptr, nullptr, binary_values.data()); |
| |
| row_group_writer->Close(); |
| file_writer->Close(); |
| |
| // Read the file back and print as JSON |
| ASSERT_OK_AND_ASSIGN(auto file_buf, out_file->Finish()); |
| auto reader = |
| ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(file_buf)); |
| ParquetFilePrinter printer(reader.get()); |
| |
| // Verify the output is valid JSON |
| std::stringstream json_output; |
| printer.JSONPrint(json_output, {}); |
| std::string json_content = json_output.str(); |
| ASSERT_OK(CheckJsonValid(json_content)) << "Invalid JSON output: " << json_content; |
| } |
| |
| TEST(TestFileReader, BufferedReadsWithDictionary) { |
| const int num_rows = 1000; |
| |
| // Make schema |
| schema::NodeVector fields; |
| fields.push_back(PrimitiveNode::Make("field", Repetition::REQUIRED, Type::DOUBLE, |
| ConvertedType::NONE)); |
| auto schema = std::static_pointer_cast<GroupNode>( |
| GroupNode::Make("schema", Repetition::REQUIRED, fields)); |
| |
| // Write small batches and small data pages |
| std::shared_ptr<WriterProperties> writer_props = WriterProperties::Builder() |
| .write_batch_size(64) |
| ->data_pagesize(128) |
| ->enable_dictionary() |
| ->build(); |
| |
| ASSERT_OK_AND_ASSIGN(auto out_file, ::arrow::io::BufferOutputStream::Create()); |
| std::shared_ptr<ParquetFileWriter> file_writer = |
| ParquetFileWriter::Open(out_file, schema, writer_props); |
| |
| RowGroupWriter* rg_writer = file_writer->AppendRowGroup(); |
| |
| // write one column |
| ::arrow::random::RandomArrayGenerator rag(0); |
| DoubleWriter* writer = static_cast<DoubleWriter*>(rg_writer->NextColumn()); |
| std::shared_ptr<::arrow::Array> col = rag.Float64(num_rows, 0, 100); |
| const auto& col_typed = static_cast<const ::arrow::DoubleArray&>(*col); |
| writer->WriteBatch(num_rows, nullptr, nullptr, col_typed.raw_values()); |
| rg_writer->Close(); |
| file_writer->Close(); |
| |
| // Open the reader |
| ASSERT_OK_AND_ASSIGN(auto file_buf, out_file->Finish()); |
| auto in_file = std::make_shared<::arrow::io::BufferReader>(file_buf); |
| |
| ReaderProperties reader_props; |
| reader_props.enable_buffered_stream(); |
| reader_props.set_buffer_size(64); |
| std::unique_ptr<ParquetFileReader> file_reader = |
| ParquetFileReader::Open(in_file, reader_props); |
| |
| auto row_group = file_reader->RowGroup(0); |
| auto col_reader = std::static_pointer_cast<DoubleReader>( |
| row_group->ColumnWithExposeEncoding(0, ExposedEncoding::DICTIONARY)); |
| EXPECT_EQ(col_reader->GetExposedEncoding(), ExposedEncoding::DICTIONARY); |
| |
| auto indices = std::make_unique<int32_t[]>(num_rows); |
| const double* dict = nullptr; |
| int32_t dict_len = 0; |
| for (int row_index = 0; row_index < num_rows; ++row_index) { |
| const double* tmp_dict = nullptr; |
| int32_t tmp_dict_len = 0; |
| int64_t values_read = 0; |
| int64_t levels_read = col_reader->ReadBatchWithDictionary( |
| /*batch_size=*/1, /*def_levels=*/nullptr, /*rep_levels=*/nullptr, |
| indices.get() + row_index, &values_read, &tmp_dict, &tmp_dict_len); |
| |
| if (tmp_dict != nullptr) { |
| EXPECT_EQ(values_read, 1); |
| dict = tmp_dict; |
| dict_len = tmp_dict_len; |
| } else { |
| EXPECT_EQ(values_read, 0); |
| } |
| |
| ASSERT_EQ(1, levels_read); |
| ASSERT_EQ(1, values_read); |
| } |
| |
| // Check the results |
| for (int row_index = 0; row_index < num_rows; ++row_index) { |
| EXPECT_LT(indices[row_index], dict_len); |
| EXPECT_EQ(dict[indices[row_index]], col_typed.Value(row_index)); |
| } |
| } |
| |
| TEST(TestFileReader, PartiallyDictionaryEncodingNotExposed) { |
| const int num_rows = 1000; |
| |
| // Make schema |
| schema::NodeVector fields; |
| fields.push_back(PrimitiveNode::Make("field", Repetition::REQUIRED, Type::DOUBLE, |
| ConvertedType::NONE)); |
| auto schema = std::static_pointer_cast<GroupNode>( |
| GroupNode::Make("schema", Repetition::REQUIRED, fields)); |
| |
| // Write small batches and small data pages. Explicitly set the dictionary page size |
| // limit such that the column chunk will not be fully dictionary encoded. |
| std::shared_ptr<WriterProperties> writer_props = WriterProperties::Builder() |
| .write_batch_size(64) |
| ->data_pagesize(128) |
| ->enable_dictionary() |
| ->dictionary_pagesize_limit(4) |
| ->build(); |
| |
| ASSERT_OK_AND_ASSIGN(auto out_file, ::arrow::io::BufferOutputStream::Create()); |
| std::shared_ptr<ParquetFileWriter> file_writer = |
| ParquetFileWriter::Open(out_file, schema, writer_props); |
| |
| RowGroupWriter* rg_writer = file_writer->AppendRowGroup(); |
| |
| // write one column |
| ::arrow::random::RandomArrayGenerator rag(0); |
| DoubleWriter* writer = static_cast<DoubleWriter*>(rg_writer->NextColumn()); |
| std::shared_ptr<::arrow::Array> col = rag.Float64(num_rows, 0, 100); |
| const auto& col_typed = static_cast<const ::arrow::DoubleArray&>(*col); |
| writer->WriteBatch(num_rows, nullptr, nullptr, col_typed.raw_values()); |
| rg_writer->Close(); |
| file_writer->Close(); |
| |
| // Open the reader |
| ASSERT_OK_AND_ASSIGN(auto file_buf, out_file->Finish()); |
| auto in_file = std::make_shared<::arrow::io::BufferReader>(file_buf); |
| |
| ReaderProperties reader_props; |
| reader_props.enable_buffered_stream(); |
| reader_props.set_buffer_size(64); |
| std::unique_ptr<ParquetFileReader> file_reader = |
| ParquetFileReader::Open(in_file, reader_props); |
| |
| auto row_group = file_reader->RowGroup(0); |
| auto col_reader = std::static_pointer_cast<DoubleReader>( |
| row_group->ColumnWithExposeEncoding(0, ExposedEncoding::DICTIONARY)); |
| EXPECT_NE(col_reader->GetExposedEncoding(), ExposedEncoding::DICTIONARY); |
| } |
| |
| TEST(TestFileReader, BufferedReads) { |
| // PARQUET-1636: Buffered reads were broken before introduction of |
| // RandomAccessFile::GetStream |
| |
| const int num_columns = 10; |
| const int num_rows = 1000; |
| |
| // Make schema |
| schema::NodeVector fields; |
| for (int i = 0; i < num_columns; ++i) { |
| fields.push_back(PrimitiveNode::Make("field" + std::to_string(i), |
| Repetition::REQUIRED, Type::DOUBLE, |
| ConvertedType::NONE)); |
| } |
| auto schema = std::static_pointer_cast<GroupNode>( |
| GroupNode::Make("schema", Repetition::REQUIRED, fields)); |
| |
| // Write small batches and small data pages |
| std::shared_ptr<WriterProperties> writer_props = |
| WriterProperties::Builder().write_batch_size(64)->data_pagesize(128)->build(); |
| |
| ASSERT_OK_AND_ASSIGN(auto out_file, ::arrow::io::BufferOutputStream::Create()); |
| std::shared_ptr<ParquetFileWriter> file_writer = |
| ParquetFileWriter::Open(out_file, schema, writer_props); |
| |
| RowGroupWriter* rg_writer = file_writer->AppendRowGroup(); |
| |
| ::arrow::ArrayVector column_data; |
| ::arrow::random::RandomArrayGenerator rag(0); |
| |
| // Scratch space for reads |
| ::arrow::BufferVector scratch_space; |
| |
| // write columns |
| for (int col_index = 0; col_index < num_columns; ++col_index) { |
| DoubleWriter* writer = static_cast<DoubleWriter*>(rg_writer->NextColumn()); |
| std::shared_ptr<::arrow::Array> col = rag.Float64(num_rows, 0, 100); |
| const auto& col_typed = static_cast<const ::arrow::DoubleArray&>(*col); |
| writer->WriteBatch(num_rows, nullptr, nullptr, col_typed.raw_values()); |
| column_data.push_back(col); |
| |
| // We use this later for reading back the columns |
| scratch_space.push_back( |
| AllocateBuffer(::arrow::default_memory_pool(), num_rows * sizeof(double))); |
| } |
| rg_writer->Close(); |
| file_writer->Close(); |
| |
| // Open the reader |
| ASSERT_OK_AND_ASSIGN(auto file_buf, out_file->Finish()); |
| auto in_file = std::make_shared<::arrow::io::BufferReader>(file_buf); |
| |
| ReaderProperties reader_props; |
| reader_props.enable_buffered_stream(); |
| reader_props.set_buffer_size(64); |
| std::unique_ptr<ParquetFileReader> file_reader = |
| ParquetFileReader::Open(in_file, reader_props); |
| |
| auto row_group = file_reader->RowGroup(0); |
| std::vector<std::shared_ptr<DoubleReader>> col_readers; |
| for (int col_index = 0; col_index < num_columns; ++col_index) { |
| col_readers.push_back( |
| std::static_pointer_cast<DoubleReader>(row_group->Column(col_index))); |
| } |
| |
| for (int row_index = 0; row_index < num_rows; ++row_index) { |
| for (int col_index = 0; col_index < num_columns; ++col_index) { |
| double* out = |
| reinterpret_cast<double*>(scratch_space[col_index]->mutable_data()) + row_index; |
| int64_t values_read = 0; |
| int64_t levels_read = |
| col_readers[col_index]->ReadBatch(1, nullptr, nullptr, out, &values_read); |
| |
| ASSERT_EQ(1, levels_read); |
| ASSERT_EQ(1, values_read); |
| } |
| } |
| |
| // Check the results |
| for (int col_index = 0; col_index < num_columns; ++col_index) { |
| ASSERT_TRUE( |
| scratch_space[col_index]->Equals(*column_data[col_index]->data()->buffers[1])); |
| } |
| } |
| |
| std::unique_ptr<ParquetFileReader> OpenBuffer(const std::string& contents) { |
| auto buffer = ::arrow::Buffer::FromString(contents); |
| return ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(buffer)); |
| } |
| |
| ::arrow::Future<> OpenBufferAsync(const std::string& contents) { |
| auto buffer = ::arrow::Buffer::FromString(contents); |
| return ::arrow::Future<>( |
| ParquetFileReader::OpenAsync(std::make_shared<::arrow::io::BufferReader>(buffer))); |
| } |
| |
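| // Exercise footer validation for both the synchronous and asynchronous |
| // open paths. A well-formed Parquet file ends with a footer of at least |
| // 8 bytes: a 4-byte little-endian metadata length followed by "PAR1". |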
| TEST(TestFileReader, TestOpenErrors) { |
| EXPECT_THROW_THAT( |
| []() { OpenBuffer(""); }, ParquetInvalidOrCorruptedFileException, |
| ::testing::Property(&ParquetInvalidOrCorruptedFileException::what, |
| ::testing::HasSubstr("Parquet file size is 0 bytes"))); |
| EXPECT_THROW_THAT( |
| []() { OpenBuffer("AAAAPAR0"); }, ParquetInvalidOrCorruptedFileException, |
| ::testing::Property(&ParquetInvalidOrCorruptedFileException::what, |
| ::testing::HasSubstr("Parquet magic bytes not found"))); |
| EXPECT_THROW_THAT( |
| []() { OpenBuffer("APAR1"); }, ParquetInvalidOrCorruptedFileException, |
| ::testing::Property( |
| &ParquetInvalidOrCorruptedFileException::what, |
| ::testing::HasSubstr( |
| "Parquet file size is 5 bytes, smaller than the minimum file footer"))); |
| EXPECT_THROW_THAT( |
| []() { OpenBuffer("\xFF\xFF\xFF\x0FPAR1"); }, |
| ParquetInvalidOrCorruptedFileException, |
| ::testing::Property(&ParquetInvalidOrCorruptedFileException::what, |
| ::testing::HasSubstr("Parquet file size is 8 bytes, smaller " |
| "than the size reported by footer's"))); |
| EXPECT_THROW_THAT( |
| []() { OpenBuffer(std::string("\x00\x00\x00\x00PAR1", 8)); }, ParquetException, |
| ::testing::Property( |
| &ParquetException::what, |
| ::testing::HasSubstr("Couldn't deserialize thrift: No more data to read"))); |
| |
| EXPECT_FINISHES_AND_RAISES_WITH_MESSAGE_THAT( |
| Invalid, ::testing::HasSubstr("Parquet file size is 0 bytes"), OpenBufferAsync("")); |
| EXPECT_FINISHES_AND_RAISES_WITH_MESSAGE_THAT( |
| Invalid, ::testing::HasSubstr("Parquet magic bytes not found"), |
| OpenBufferAsync("AAAAPAR0")); |
| EXPECT_FINISHES_AND_RAISES_WITH_MESSAGE_THAT( |
| Invalid, |
| ::testing::HasSubstr( |
| "Parquet file size is 5 bytes, smaller than the minimum file footer"), |
| OpenBufferAsync("APAR1")); |
| EXPECT_FINISHES_AND_RAISES_WITH_MESSAGE_THAT( |
| Invalid, |
| ::testing::HasSubstr( |
| "Parquet file size is 8 bytes, smaller than the size reported by footer's"), |
| OpenBufferAsync("\xFF\xFF\xFF\x0FPAR1")); |
| EXPECT_FINISHES_AND_RAISES_WITH_MESSAGE_THAT( |
| IOError, ::testing::HasSubstr("Couldn't deserialize thrift: No more data to read"), |
| OpenBufferAsync(std::string("\x00\x00\x00\x00PAR1", 8))); |
| } |
| |
| #undef EXPECT_THROW_THAT |
| |
| #ifdef ARROW_WITH_LZ4 |
| struct TestCodecParam { |
| std::string name; |
| std::string small_data_file; |
| std::string larger_data_file; |
| }; |
| |
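| // Lets gtest print the codec name in parameterized test names and failures. |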
| void PrintTo(const TestCodecParam& p, std::ostream* os) { *os << p.name; } |
| |
| class TestCodec : public ::testing::TestWithParam<TestCodecParam> { |
| protected: |
| const std::string& GetSmallDataFile() { return GetParam().small_data_file; } |
| |
| const std::string& GetLargerDataFile() { return GetParam().larger_data_file; } |
| }; |
| |
| TEST_P(TestCodec, SmallFileMetadataAndValues) { |
| std::unique_ptr<ParquetFileReader> reader = |
| ParquetFileReader::OpenFile(GetSmallDataFile()); |
| std::shared_ptr<RowGroupReader> group = reader->RowGroup(0); |
| const auto rg_metadata = group->metadata(); |
| |
| // This file only has 4 rows |
| ASSERT_EQ(4, reader->metadata()->num_rows()); |
| // This file only has 3 columns |
| ASSERT_EQ(3, reader->metadata()->num_columns()); |
| // This file only has 1 row group |
| ASSERT_EQ(1, reader->metadata()->num_row_groups()); |
| |
| // This row group must have 4 rows |
| ASSERT_EQ(4, rg_metadata->num_rows()); |
| |
| // Some parquet-cpp versions are susceptible to PARQUET-2008 |
| const auto& app_ver = reader->metadata()->writer_version(); |
| const bool allow_uncompressed_mismatch = |
| (app_ver.application_ == "parquet-cpp" && app_ver.version.major == 1 && |
| app_ver.version.minor == 5 && app_ver.version.patch == 1); |
| |
| CheckRowGroupMetadata(rg_metadata, allow_uncompressed_mismatch); |
| |
| // column 0, c0 |
| auto col0 = checked_pointer_cast<Int64Reader>(group->Column(0)); |
| std::vector<int64_t> expected_values = {1593604800, 1593604800, 1593604801, 1593604801}; |
| AssertColumnValues(col0, 4, 4, expected_values, 4); |
| |
| // column 1, c1 |
| std::vector<ByteArray> expected_byte_arrays = {ByteArray("abc"), ByteArray("def"), |
| ByteArray("abc"), ByteArray("def")}; |
| auto col1 = checked_pointer_cast<ByteArrayReader>(group->Column(1)); |
| AssertColumnValues(col1, 4, 4, expected_byte_arrays, 4); |
| |
| // column 2, v11 |
| std::vector<double> expected_double_values = {42.0, 7.7, 42.125, 7.7}; |
| auto col2 = checked_pointer_cast<DoubleReader>(group->Column(2)); |
| AssertColumnValues(col2, 4, 4, expected_double_values, 4); |
| } |
| |
| TEST_P(TestCodec, LargeFileValues) { |
| // Test the codec with a larger data file, such that the data may have been |
| // compressed in several "frames" (ARROW-9177). |
| auto file_path = GetParam().larger_data_file; |
| if (file_path.empty()) { |
| GTEST_SKIP() << "Larger data file not available for this codec"; |
| } |
| auto file = ParquetFileReader::OpenFile(file_path); |
| auto group = file->RowGroup(0); |
| |
| const int64_t kNumRows = 10000; |
| |
| ASSERT_EQ(kNumRows, file->metadata()->num_rows()); |
| ASSERT_EQ(1, file->metadata()->num_columns()); |
| ASSERT_EQ(1, file->metadata()->num_row_groups()); |
| ASSERT_EQ(kNumRows, group->metadata()->num_rows()); |
| |
| // column 0 ("a") |
| auto col = checked_pointer_cast<ByteArrayReader>(group->Column(0)); |
| std::vector<ByteArray> values(kNumRows); |
| int64_t values_read; |
| auto levels_read = |
| col->ReadBatch(kNumRows, nullptr, nullptr, values.data(), &values_read); |
| ASSERT_EQ(kNumRows, levels_read); |
| ASSERT_EQ(kNumRows, values_read); |
| ASSERT_EQ(values[0], ByteArray("c7ce6bef-d5b0-4863-b199-8ea8c7fb117b")); |
| ASSERT_EQ(values[1], ByteArray("e8fb9197-cb9f-4118-b67f-fbfa65f61843")); |
| ASSERT_EQ(values[kNumRows - 2], ByteArray("ab52a0cc-c6bb-4d61-8a8f-166dc4b8b13c")); |
| ASSERT_EQ(values[kNumRows - 1], ByteArray("85440778-460a-41ac-aa2e-ac3ee41696bf")); |
| } |
| |
| std::vector<TestCodecParam> test_codec_params{ |
| {"LegacyLZ4Hadoop", hadoop_lz4_compressed(), hadoop_lz4_compressed_larger()}, |
| {"LegacyLZ4NonHadoop", non_hadoop_lz4_compressed(), ""}, |
| {"LZ4Raw", lz4_raw_compressed(), lz4_raw_compressed_larger()}}; |
| |
| INSTANTIATE_TEST_SUITE_P(Lz4CodecTests, TestCodec, ::testing::ValuesIn(test_codec_params), |
| testing::PrintToStringParamName()); |
| #endif // ARROW_WITH_LZ4 |
| |
| // Test reading a data file whose ColumnChunk contains more than |
| // INT16_MAX pages (GH-15074). |
| TEST(TestFileReader, TestOverflowInt16PageOrdinal) { |
| ReaderProperties reader_props; |
| auto file_reader = ParquetFileReader::OpenFile(overflow_i16_page_ordinal(), |
| /*memory_map=*/false, reader_props); |
| auto metadata_ptr = file_reader->metadata(); |
| EXPECT_EQ(1, metadata_ptr->num_row_groups()); |
| EXPECT_EQ(1, metadata_ptr->num_columns()); |
| auto row_group = file_reader->RowGroup(0); |
| |
| { |
| auto column_reader = |
| std::dynamic_pointer_cast<TypedColumnReader<BooleanType>>(row_group->Column(0)); |
| EXPECT_NE(nullptr, column_reader); |
| constexpr int kBatchLength = 1024; |
| std::array<bool, kBatchLength> boolean_values{}; |
| int64_t total_values = 0; |
| int64_t values_read = 0; |
| do { |
| values_read = 0; |
| column_reader->ReadBatch(kBatchLength, nullptr, nullptr, boolean_values.data(), |
| &values_read); |
| total_values += values_read; |
| for (int i = 0; i < values_read; ++i) { |
| EXPECT_FALSE(boolean_values[i]); |
| } |
| } while (values_read != 0); |
| EXPECT_EQ(40000, total_values); |
| } |
| { |
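| // Count the pages directly; 40000 pages exceeds INT16_MAX (32767), the |
| // overflow case from GH-15074. |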
| auto page_reader = row_group->GetColumnPageReader(0); |
| int32_t page_ordinal = 0; |
| while (page_reader->NextPage() != nullptr) { |
| ++page_ordinal; |
| } |
| EXPECT_EQ(40000, page_ordinal); |
| } |
| } |
| |
| #ifdef ARROW_WITH_ZSTD |
| TEST(TestByteStreamSplit, FloatIntegrationFile) { |
| auto file_path = byte_stream_split(); |
| auto file = ParquetFileReader::OpenFile(file_path); |
| |
| const int64_t kNumRows = 300; |
| |
| ASSERT_EQ(kNumRows, file->metadata()->num_rows()); |
| ASSERT_EQ(2, file->metadata()->num_columns()); |
| ASSERT_EQ(1, file->metadata()->num_row_groups()); |
| |
| // column 0 ("f32") |
| { |
| auto values = |
| ReadColumnValues<FloatType>(file.get(), /*row_group=*/0, /*column=*/0, kNumRows); |
| ASSERT_EQ(values[0], 1.7640524f); |
| ASSERT_EQ(values[1], 0.4001572f); |
| ASSERT_EQ(values[kNumRows - 2], -0.39944902f); |
| ASSERT_EQ(values[kNumRows - 1], 0.37005588f); |
| } |
| // column 1 ("f64") |
| { |
| auto values = |
| ReadColumnValues<DoubleType>(file.get(), /*row_group=*/0, /*column=*/1, kNumRows); |
| ASSERT_EQ(values[0], -1.3065268517353166); |
| ASSERT_EQ(values[1], 1.658130679618188); |
| ASSERT_EQ(values[kNumRows - 2], -0.9301565025243212); |
| ASSERT_EQ(values[kNumRows - 1], -0.17858909208732915); |
| } |
| } |
| #endif // ARROW_WITH_ZSTD |
| |
| #ifdef ARROW_WITH_ZLIB |
| TEST(TestByteStreamSplit, ExtendedIntegrationFile) { |
| auto file_path = byte_stream_split_extended(); |
| auto file = ParquetFileReader::OpenFile(file_path); |
| |
| const int64_t kNumRows = 200; |
| |
| ASSERT_EQ(kNumRows, file->metadata()->num_rows()); |
| ASSERT_EQ(14, file->metadata()->num_columns()); |
| ASSERT_EQ(1, file->metadata()->num_row_groups()); |
| |
| AssertColumnValuesEqual<FloatType>(file.get(), "float_plain", "float_byte_stream_split", |
| kNumRows); |
| AssertColumnValuesEqual<DoubleType>(file.get(), "double_plain", |
| "double_byte_stream_split", kNumRows); |
| AssertColumnValuesEqual<Int32Type>(file.get(), "int32_plain", "int32_byte_stream_split", |
| kNumRows); |
| AssertColumnValuesEqual<Int64Type>(file.get(), "int64_plain", "int64_byte_stream_split", |
| kNumRows); |
| AssertColumnValuesEqual<FLBAType>(file.get(), "float16_plain", |
| "float16_byte_stream_split", kNumRows); |
| AssertColumnValuesEqual<FLBAType>(file.get(), "flba5_plain", "flba5_byte_stream_split", |
| kNumRows); |
| AssertColumnValuesEqual<FLBAType>(file.get(), "decimal_plain", |
| "decimal_byte_stream_split", kNumRows); |
| } |
| #endif // ARROW_WITH_ZLIB |
| |
| struct PageIndexReaderParam { |
| std::vector<int32_t> row_group_indices; |
| std::vector<int32_t> column_indices; |
| PageIndexSelection index_selection; |
| }; |
| |
| // Define operator<< so that gtest prints the param fields instead of |
| // dumping raw bytes, which would read uninitialized padding under valgrind. |
| std::ostream& operator<<(std::ostream& out, const PageIndexReaderParam& params) { |
| out << "PageIndexReaderParam{row_group_indices = "; |
| for (const auto& i : params.row_group_indices) { |
| out << i << ", "; |
| } |
| out << "column_indices = "; |
| for (const auto& i : params.column_indices) { |
| out << i << ", "; |
| } |
| |
| out << "index_selection = " << params.index_selection << "}"; |
| |
| return out; |
| } |
| |
| class ParameterizedPageIndexReaderTest |
| : public ::testing::TestWithParam<PageIndexReaderParam> {}; |
| |
| // Test reading a data file with page index. |
| TEST_P(ParameterizedPageIndexReaderTest, TestReadPageIndex) { |
| ReaderProperties properties; |
| auto file_reader = ParquetFileReader::OpenFile(data_file("alltypes_tiny_pages.parquet"), |
| /*memory_map=*/false, properties); |
| auto metadata = file_reader->metadata(); |
| EXPECT_EQ(1, metadata->num_row_groups()); |
| EXPECT_EQ(13, metadata->num_columns()); |
| |
| // Create the page index reader and provide different read hints. |
| auto page_index_reader = file_reader->GetPageIndexReader(); |
| ASSERT_NE(nullptr, page_index_reader); |
| const auto params = GetParam(); |
| const bool call_will_need = !params.row_group_indices.empty(); |
| if (call_will_need) { |
| page_index_reader->WillNeed(params.row_group_indices, params.column_indices, |
| params.index_selection); |
| } |
| |
| auto row_group_index_reader = page_index_reader->RowGroup(0); |
| if (!call_will_need || params.index_selection.offset_index || |
| params.index_selection.column_index) { |
| ASSERT_NE(nullptr, row_group_index_reader); |
| } else { |
| // No page index was requested. |
| ASSERT_EQ(nullptr, row_group_index_reader); |
| return; |
| } |
| |
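| // A column's page index should be readable iff it was covered by the |
| // WillNeed() hint; with no hint at all, everything is read on demand. |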
| auto column_index_requested = [&](int32_t column_id) { |
| return !call_will_need || |
| (params.index_selection.column_index && |
| (params.column_indices.empty() || |
| (std::find(params.column_indices.cbegin(), params.column_indices.cend(), |
| column_id) != params.column_indices.cend()))); |
| }; |
| |
| auto offset_index_requested = [&](int32_t column_id) { |
| return !call_will_need || |
| (params.index_selection.offset_index && |
| (params.column_indices.empty() || |
| (std::find(params.column_indices.cbegin(), params.column_indices.cend(), |
| column_id) != params.column_indices.cend()))); |
| }; |
| |
| if (offset_index_requested(0)) { |
| // Verify the offset index of column 0. It contains 325 pages, so only |
| // spot-check a few of them. |
| const size_t num_pages = 325; |
| const std::vector<size_t> page_indices = {0, 100, 200, 300}; |
| const std::vector<PageLocation> page_locations = { |
| PageLocation{4, 109, 0}, PageLocation{11480, 133, 2244}, |
| PageLocation{22980, 133, 4494}, PageLocation{34480, 133, 6744}}; |
| |
| auto offset_index = row_group_index_reader->GetOffsetIndex(0); |
| ASSERT_NE(nullptr, offset_index); |
| |
| EXPECT_EQ(num_pages, offset_index->page_locations().size()); |
| for (size_t i = 0; i < page_indices.size(); ++i) { |
| size_t page_id = page_indices.at(i); |
| const auto& read_page_location = offset_index->page_locations().at(page_id); |
| const auto& expected_page_location = page_locations.at(i); |
| EXPECT_EQ(expected_page_location.offset, read_page_location.offset); |
| EXPECT_EQ(expected_page_location.compressed_page_size, |
| read_page_location.compressed_page_size); |
| EXPECT_EQ(expected_page_location.first_row_index, |
| read_page_location.first_row_index); |
| } |
| } else { |
| EXPECT_THROW(row_group_index_reader->GetOffsetIndex(0), ParquetException); |
| } |
| |
| if (column_index_requested(5)) { |
| // Verify the column index of column 5. It contains 528 pages, so only |
| // spot-check a few of them. |
| const size_t num_pages = 528; |
| const BoundaryOrder::type boundary_order = BoundaryOrder::Unordered; |
| const std::vector<size_t> page_indices = {0, 99, 426, 520}; |
| const std::vector<bool> null_pages = {false, false, false, false}; |
| const bool has_null_counts = true; |
| const std::vector<int64_t> null_counts = {0, 0, 0, 0}; |
| const std::vector<int64_t> min_values = {0, 10, 0, 0}; |
| const std::vector<int64_t> max_values = {90, 90, 80, 70}; |
| |
| auto column_index = row_group_index_reader->GetColumnIndex(5); |
| ASSERT_NE(nullptr, column_index); |
| auto typed_column_index = std::dynamic_pointer_cast<Int64ColumnIndex>(column_index); |
| ASSERT_NE(nullptr, typed_column_index); |
| |
| EXPECT_EQ(num_pages, column_index->null_pages().size()); |
| EXPECT_EQ(has_null_counts, column_index->has_null_counts()); |
| EXPECT_EQ(boundary_order, column_index->boundary_order()); |
| for (size_t i = 0; i < page_indices.size(); ++i) { |
| size_t page_id = page_indices.at(i); |
| EXPECT_EQ(null_pages.at(i), column_index->null_pages().at(page_id)); |
| if (has_null_counts) { |
| EXPECT_EQ(null_counts.at(i), column_index->null_counts().at(page_id)); |
| } |
| if (!null_pages.at(i)) { |
| EXPECT_EQ(min_values.at(i), typed_column_index->min_values().at(page_id)); |
| EXPECT_EQ(max_values.at(i), typed_column_index->max_values().at(page_id)); |
| } |
| } |
| } else { |
| EXPECT_THROW(row_group_index_reader->GetColumnIndex(5), ParquetException); |
| } |
| |
| // Verify that null is returned if the column index does not exist. |
| auto column_index = row_group_index_reader->GetColumnIndex(10); |
| EXPECT_EQ(nullptr, column_index); |
| } |
| |
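| // Cover all combinations of row group hints, column hints, and page index |
| // selections. |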
| INSTANTIATE_TEST_SUITE_P( |
| PageIndexReaderTests, ParameterizedPageIndexReaderTest, |
| ::testing::Values(PageIndexReaderParam{{}, {}, {true, true}}, |
| PageIndexReaderParam{{}, {}, {true, false}}, |
| PageIndexReaderParam{{}, {}, {false, true}}, |
| PageIndexReaderParam{{}, {}, {false, false}}, |
| PageIndexReaderParam{{0}, {}, {true, true}}, |
| PageIndexReaderParam{{0}, {}, {true, false}}, |
| PageIndexReaderParam{{0}, {}, {false, true}}, |
| PageIndexReaderParam{{0}, {}, {false, false}}, |
| PageIndexReaderParam{{0}, {0}, {true, true}}, |
| PageIndexReaderParam{{0}, {0}, {true, false}}, |
| PageIndexReaderParam{{0}, {0}, {false, true}}, |
| PageIndexReaderParam{{0}, {0}, {false, false}}, |
| PageIndexReaderParam{{0}, {5}, {true, true}}, |
| PageIndexReaderParam{{0}, {5}, {true, false}}, |
| PageIndexReaderParam{{0}, {5}, {false, true}}, |
| PageIndexReaderParam{{0}, {5}, {false, false}}, |
| PageIndexReaderParam{{0}, {0, 5}, {true, true}}, |
| PageIndexReaderParam{{0}, {0, 5}, {true, false}}, |
| PageIndexReaderParam{{0}, {0, 5}, {false, true}}, |
| PageIndexReaderParam{{0}, {0, 5}, {false, false}})); |
| |
| TEST(PageIndexReaderTest, ReadFileWithoutPageIndex) { |
| ReaderProperties properties; |
| auto file_reader = ParquetFileReader::OpenFile(data_file("int32_decimal.parquet"), |
| /*memory_map=*/false, properties); |
| auto metadata = file_reader->metadata(); |
| EXPECT_EQ(1, metadata->num_row_groups()); |
| |
| auto page_index_reader = file_reader->GetPageIndexReader(); |
| ASSERT_NE(nullptr, page_index_reader); |
| auto row_group_index_reader = page_index_reader->RowGroup(0); |
| ASSERT_EQ(nullptr, row_group_index_reader); |
| } |
| |
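| // Round-trips WKB point data through GEOMETRY/GEOGRAPHY columns and checks |
| // both the decoded values and the geospatial statistics. |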
| class TestGeometryLogicalType : public ::testing::Test { |
| public: |
| const int kNumRows = 1000; |
| |
| void WriteTestData(std::shared_ptr<const LogicalType> type, bool write_arrow) { |
| // Make schema |
| schema::NodeVector fields; |
| fields.push_back( |
| PrimitiveNode::Make("g", Repetition::REQUIRED, type, Type::BYTE_ARRAY)); |
| auto schema = std::static_pointer_cast<GroupNode>( |
| GroupNode::Make("schema", Repetition::REQUIRED, fields)); |
| |
| // Write small batches |
| auto writer_props_builder = WriterProperties::Builder(); |
| writer_props_builder.write_batch_size(64); |
| |
| std::shared_ptr<WriterProperties> writer_props = writer_props_builder.build(); |
| |
| ASSERT_OK_AND_ASSIGN(auto out_file, ::arrow::io::BufferOutputStream::Create()); |
| std::shared_ptr<ParquetFileWriter> file_writer = |
| ParquetFileWriter::Open(out_file, schema, writer_props); |
| RowGroupWriter* rg_writer = file_writer->AppendRowGroup(); |
| |
| // write WKB points to the single column |
| auto* writer = |
| ::arrow::internal::checked_cast<ByteArrayWriter*>(rg_writer->NextColumn()); |
| if (!write_arrow) { |
| WriteTestDataUsingWriteBatch(writer); |
| } else { |
| WriteTestDataUsingWriteArrow(writer); |
| } |
| |
| rg_writer->Close(); |
| file_writer->Close(); |
| |
| ASSERT_OK_AND_ASSIGN(file_buf, out_file->Finish()); |
| } |
| |
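| // Writes kNumRows WKB points (x = k, y = k + 1) through the low-level |
| // WriteBatch() API, packing all points into one contiguous buffer. |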
| void WriteTestDataUsingWriteBatch(ByteArrayWriter* writer) { |
| std::vector<uint8_t> buffer(test::kWkbPointXYSize * kNumRows); |
| uint8_t* ptr = buffer.data(); |
| std::vector<ByteArray> values(kNumRows); |
| for (int k = 0; k < kNumRows; k++) { |
| std::string item = test::MakeWKBPoint( |
| {static_cast<double>(k), static_cast<double>(k + 1)}, false, false); |
| std::memcpy(ptr, item.data(), item.size()); |
| values[k].len = test::kWkbPointXYSize; |
| values[k].ptr = ptr; |
| ptr += test::kWkbPointXYSize; |
| } |
| writer->WriteBatch(kNumRows, nullptr, nullptr, values.data()); |
| } |
| |
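| // Writes the same points through the Arrow-level WriteArrow() path. |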
| void WriteTestDataUsingWriteArrow(ByteArrayWriter* writer) { |
| ::arrow::BinaryBuilder builder; |
| for (int k = 0; k < kNumRows; k++) { |
| std::string item = test::MakeWKBPoint( |
| {static_cast<double>(k), static_cast<double>(k + 1)}, false, false); |
| |
| ASSERT_OK(builder.Append(item)); |
| } |
| std::shared_ptr<::arrow::BinaryArray> array; |
| ASSERT_OK(builder.Finish(&array)); |
| |
| std::shared_ptr<ArrowWriterProperties> properties = |
| ArrowWriterProperties::Builder().build(); |
| MemoryPool* pool = ::arrow::default_memory_pool(); |
| auto ctx = std::make_unique<ArrowWriteContext>(pool, properties.get()); |
| ASSERT_OK(writer->WriteArrow(nullptr, nullptr, kNumRows, *array, ctx.get(), |
| /*leaf_field_nullable=*/true)); |
| } |
| |
| void TestWriteAndRead(std::shared_ptr<const LogicalType> type, bool write_arrow) { |
| ASSERT_NO_FATAL_FAILURE(WriteTestData(type, write_arrow)); |
| |
| auto in_file = std::make_shared<::arrow::io::BufferReader>(file_buf); |
| |
| ReaderProperties reader_props; |
| reader_props.enable_buffered_stream(); |
| reader_props.set_buffer_size(64); |
| auto file_reader = ParquetFileReader::Open(in_file, reader_props); |
| |
| // Check that the logical type and geometry statistics are correctly |
| // written and read back |
| auto metadata = file_reader->metadata(); |
| ASSERT_TRUE(type->Equals(*metadata->schema()->Column(0)->logical_type())); |
| |
| auto page_index_reader = file_reader->GetPageIndexReader(); |
| int num_row_groups = metadata->num_row_groups(); |
| int64_t start_index = 0; |
| for (int i = 0; i < num_row_groups; i++) { |
| auto row_group_metadata = metadata->RowGroup(i); |
| auto column_chunk_metadata = row_group_metadata->ColumnChunk(0); |
| auto geo_stats = column_chunk_metadata->geo_statistics(); |
| ASSERT_NO_FATAL_FAILURE(CheckGeoStatistics(type, geo_stats, start_index, |
| row_group_metadata->num_rows())); |
| start_index += row_group_metadata->num_rows(); |
| } |
| |
| // Check the geometry values |
| int64_t total_values_read = 0; |
| for (int i = 0; i < num_row_groups; i++) { |
| auto row_group = file_reader->RowGroup(i); |
| std::shared_ptr<ByteArrayReader> reader = |
| std::static_pointer_cast<ByteArrayReader>(row_group->Column(0)); |
| while (reader->HasNext()) { |
| std::vector<ByteArray> out(kNumRows); |
| int64_t values_read = 0; |
| int64_t levels_read = |
| reader->ReadBatch(kNumRows, nullptr, nullptr, out.data(), &values_read); |
| ASSERT_GE(levels_read, 1); |
| ASSERT_GE(values_read, 1); |
| |
| // Check the batch |
| for (int64_t j = 0; j < values_read; j++) { |
| const ByteArray& value = out[j]; |
| auto xy = test::GetWKBPointCoordinateXY(value); |
| EXPECT_TRUE(xy.has_value()); |
| auto expected_x = static_cast<double>(j + total_values_read); |
| auto expected_y = static_cast<double>(j + 1 + total_values_read); |
| EXPECT_EQ(*xy, (std::pair<double, double>(expected_x, expected_y))); |
| } |
| |
| total_values_read += values_read; |
| } |
| } |
| EXPECT_EQ(kNumRows, total_values_read); |
| } |
| |
| void CheckGeoStatistics(std::shared_ptr<const LogicalType> type, |
| std::shared_ptr<geospatial::GeoStatistics> geom_stats, |
| int64_t start_index, int64_t num_rows) { |
| // We don't yet generate statistics for Geography |
| if (type->is_geography()) { |
| ASSERT_EQ(geom_stats, nullptr); |
| return; |
| } |
| |
| ASSERT_NE(geom_stats, nullptr); |
| // We wrote exactly one geometry type (POINT, which has code 1) |
| EXPECT_THAT(*geom_stats->geometry_types(), ::testing::ElementsAre(1)); |
| |
| double expected_xmin = static_cast<double>(start_index); |
| double expected_xmax = expected_xmin + num_rows - 1; |
| double expected_ymin = expected_xmin + 1; |
| double expected_ymax = expected_xmax + 1; |
| |
| EXPECT_EQ(geom_stats->lower_bound()[0], expected_xmin); |
| EXPECT_EQ(geom_stats->upper_bound()[0], expected_xmax); |
| EXPECT_EQ(geom_stats->lower_bound()[1], expected_ymin); |
| EXPECT_EQ(geom_stats->upper_bound()[1], expected_ymax); |
| EXPECT_THAT(geom_stats->dimension_valid(), |
| ::testing::ElementsAre(true, true, false, false)); |
| } |
| |
| protected: |
| std::shared_ptr<Buffer> file_buf; |
| }; |
| |
| TEST_F(TestGeometryLogicalType, TestWriteGeometry) { |
| TestWriteAndRead(GeometryLogicalType::Make("srid:1234"), /*write_arrow=*/false); |
| } |
| |
| TEST_F(TestGeometryLogicalType, TestWriteArrowAndReadGeometry) { |
| TestWriteAndRead(GeometryLogicalType::Make("srid:1234"), /*write_arrow=*/true); |
| } |
| |
| TEST_F(TestGeometryLogicalType, TestWriteGeography) { |
| TestWriteAndRead(GeographyLogicalType::Make("srid:1234"), /*write_arrow=*/false); |
| } |
| |
| TEST_F(TestGeometryLogicalType, TestWriteGeographyArrow) { |
| TestWriteAndRead(GeographyLogicalType::Make("srid:1234"), /*write_arrow=*/true); |
| } |
| |
| } // namespace parquet |