| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #ifdef _MSC_VER |
| #pragma warning(push) |
| // Disable forcing value to bool warnings |
| #pragma warning(disable : 4800) |
| #endif |
| |
| #include "gtest/gtest.h" |
| |
| #include <arrow/compute/api.h> |
| #include <cstdint> |
| #include <functional> |
| #include <sstream> |
| #include <vector> |
| |
| #include "parquet/api/reader.h" |
| #include "parquet/api/writer.h" |
| |
| #include "parquet/arrow/reader.h" |
| #include "parquet/arrow/schema.h" |
| #include "parquet/arrow/test-util.h" |
| #include "parquet/arrow/writer.h" |
| |
| #include "parquet/file_writer.h" |
| |
| #include "parquet/util/test-common.h" |
| |
| #include "arrow/api.h" |
| #include "arrow/test-util.h" |
| #include "arrow/type_traits.h" |
| #include "arrow/util/decimal.h" |
| |
| using arrow::Array; |
| using arrow::ArrayVisitor; |
| using arrow::Buffer; |
| using arrow::ChunkedArray; |
| using arrow::Column; |
| using arrow::DataType; |
| using arrow::default_memory_pool; |
| using arrow::ListArray; |
| using arrow::PrimitiveArray; |
| using arrow::ResizableBuffer; |
| using arrow::Status; |
| using arrow::Table; |
| using arrow::TimeUnit; |
| using arrow::compute::Datum; |
| using arrow::compute::DictionaryEncode; |
| using arrow::compute::FunctionContext; |
| using arrow::io::BufferReader; |
| |
| using arrow::test::randint; |
| using arrow::test::random_is_valid; |
| |
| using ArrowId = ::arrow::Type; |
| using ParquetType = parquet::Type; |
| using parquet::arrow::FromParquetSchema; |
| using parquet::schema::GroupNode; |
| using parquet::schema::NodePtr; |
| using parquet::schema::PrimitiveNode; |
| |
| using ColumnVector = std::vector<std::shared_ptr<arrow::Column>>; |
| |
| namespace parquet { |
| namespace arrow { |
| |
| static constexpr int SMALL_SIZE = 100; |
| static constexpr int LARGE_SIZE = 10000; |
| |
| static constexpr uint32_t kDefaultSeed = 0; |
| |
| LogicalType::type get_logical_type(const ::DataType& type) { |
| switch (type.id()) { |
| case ArrowId::UINT8: |
| return LogicalType::UINT_8; |
| case ArrowId::INT8: |
| return LogicalType::INT_8; |
| case ArrowId::UINT16: |
| return LogicalType::UINT_16; |
| case ArrowId::INT16: |
| return LogicalType::INT_16; |
| case ArrowId::UINT32: |
| return LogicalType::UINT_32; |
| case ArrowId::INT32: |
| return LogicalType::INT_32; |
| case ArrowId::UINT64: |
| return LogicalType::UINT_64; |
| case ArrowId::INT64: |
| return LogicalType::INT_64; |
| case ArrowId::STRING: |
| return LogicalType::UTF8; |
| case ArrowId::DATE32: |
| return LogicalType::DATE; |
| case ArrowId::DATE64: |
| return LogicalType::DATE; |
| case ArrowId::TIMESTAMP: { |
| const auto& ts_type = static_cast<const ::arrow::TimestampType&>(type); |
| switch (ts_type.unit()) { |
| case TimeUnit::MILLI: |
| return LogicalType::TIMESTAMP_MILLIS; |
| case TimeUnit::MICRO: |
| return LogicalType::TIMESTAMP_MICROS; |
| default: |
| DCHECK(false) << "Only MILLI and MICRO units supported for Arrow timestamps " |
| "with Parquet."; |
| } |
| } |
| case ArrowId::TIME32: |
| return LogicalType::TIME_MILLIS; |
| case ArrowId::TIME64: |
| return LogicalType::TIME_MICROS; |
| case ArrowId::DICTIONARY: { |
| const ::arrow::DictionaryType& dict_type = |
| static_cast<const ::arrow::DictionaryType&>(type); |
| return get_logical_type(*dict_type.dictionary()->type()); |
| } |
| case ArrowId::DECIMAL: |
| return LogicalType::DECIMAL; |
| default: |
| break; |
| } |
| return LogicalType::NONE; |
| } |
| |
| ParquetType::type get_physical_type(const ::DataType& type) { |
| switch (type.id()) { |
| case ArrowId::BOOL: |
| return ParquetType::BOOLEAN; |
| case ArrowId::UINT8: |
| case ArrowId::INT8: |
| case ArrowId::UINT16: |
| case ArrowId::INT16: |
| case ArrowId::UINT32: |
| case ArrowId::INT32: |
| return ParquetType::INT32; |
| case ArrowId::UINT64: |
| case ArrowId::INT64: |
| return ParquetType::INT64; |
| case ArrowId::FLOAT: |
| return ParquetType::FLOAT; |
| case ArrowId::DOUBLE: |
| return ParquetType::DOUBLE; |
| case ArrowId::BINARY: |
| return ParquetType::BYTE_ARRAY; |
| case ArrowId::STRING: |
| return ParquetType::BYTE_ARRAY; |
| case ArrowId::FIXED_SIZE_BINARY: |
| case ArrowId::DECIMAL: |
| return ParquetType::FIXED_LEN_BYTE_ARRAY; |
| case ArrowId::DATE32: |
| return ParquetType::INT32; |
| case ArrowId::DATE64: |
| // Convert to date32 internally |
| return ParquetType::INT32; |
| case ArrowId::TIME32: |
| return ParquetType::INT32; |
| case ArrowId::TIME64: |
| return ParquetType::INT64; |
| case ArrowId::TIMESTAMP: |
| return ParquetType::INT64; |
| case ArrowId::DICTIONARY: { |
| const ::arrow::DictionaryType& dict_type = |
| static_cast<const ::arrow::DictionaryType&>(type); |
| return get_physical_type(*dict_type.dictionary()->type()); |
| } |
| default: |
| break; |
| } |
| DCHECK(false) << "cannot reach this code"; |
| return ParquetType::INT32; |
| } |
| |
| template <typename TestType> |
| struct test_traits {}; |
| |
| template <> |
| struct test_traits<::arrow::BooleanType> { |
| static constexpr ParquetType::type parquet_enum = ParquetType::BOOLEAN; |
| static uint8_t const value; |
| }; |
| |
| const uint8_t test_traits<::arrow::BooleanType>::value(1); |
| |
| template <> |
| struct test_traits<::arrow::UInt8Type> { |
| static constexpr ParquetType::type parquet_enum = ParquetType::INT32; |
| static uint8_t const value; |
| }; |
| |
| const uint8_t test_traits<::arrow::UInt8Type>::value(64); |
| |
| template <> |
| struct test_traits<::arrow::Int8Type> { |
| static constexpr ParquetType::type parquet_enum = ParquetType::INT32; |
| static int8_t const value; |
| }; |
| |
| const int8_t test_traits<::arrow::Int8Type>::value(-64); |
| |
| template <> |
| struct test_traits<::arrow::UInt16Type> { |
| static constexpr ParquetType::type parquet_enum = ParquetType::INT32; |
| static uint16_t const value; |
| }; |
| |
| const uint16_t test_traits<::arrow::UInt16Type>::value(1024); |
| |
| template <> |
| struct test_traits<::arrow::Int16Type> { |
| static constexpr ParquetType::type parquet_enum = ParquetType::INT32; |
| static int16_t const value; |
| }; |
| |
| const int16_t test_traits<::arrow::Int16Type>::value(-1024); |
| |
| template <> |
| struct test_traits<::arrow::UInt32Type> { |
| static constexpr ParquetType::type parquet_enum = ParquetType::INT32; |
| static uint32_t const value; |
| }; |
| |
| const uint32_t test_traits<::arrow::UInt32Type>::value(1024); |
| |
| template <> |
| struct test_traits<::arrow::Int32Type> { |
| static constexpr ParquetType::type parquet_enum = ParquetType::INT32; |
| static int32_t const value; |
| }; |
| |
| const int32_t test_traits<::arrow::Int32Type>::value(-1024); |
| |
| template <> |
| struct test_traits<::arrow::UInt64Type> { |
| static constexpr ParquetType::type parquet_enum = ParquetType::INT64; |
| static uint64_t const value; |
| }; |
| |
| const uint64_t test_traits<::arrow::UInt64Type>::value(1024); |
| |
| template <> |
| struct test_traits<::arrow::Int64Type> { |
| static constexpr ParquetType::type parquet_enum = ParquetType::INT64; |
| static int64_t const value; |
| }; |
| |
| const int64_t test_traits<::arrow::Int64Type>::value(-1024); |
| |
| template <> |
| struct test_traits<::arrow::TimestampType> { |
| static constexpr ParquetType::type parquet_enum = ParquetType::INT64; |
| static int64_t const value; |
| }; |
| |
| const int64_t test_traits<::arrow::TimestampType>::value(14695634030000); |
| |
| template <> |
| struct test_traits<::arrow::Date32Type> { |
| static constexpr ParquetType::type parquet_enum = ParquetType::INT32; |
| static int32_t const value; |
| }; |
| |
| const int32_t test_traits<::arrow::Date32Type>::value(170000); |
| |
| template <> |
| struct test_traits<::arrow::FloatType> { |
| static constexpr ParquetType::type parquet_enum = ParquetType::FLOAT; |
| static float const value; |
| }; |
| |
| const float test_traits<::arrow::FloatType>::value(2.1f); |
| |
| template <> |
| struct test_traits<::arrow::DoubleType> { |
| static constexpr ParquetType::type parquet_enum = ParquetType::DOUBLE; |
| static double const value; |
| }; |
| |
| const double test_traits<::arrow::DoubleType>::value(4.2); |
| |
| template <> |
| struct test_traits<::arrow::StringType> { |
| static constexpr ParquetType::type parquet_enum = ParquetType::BYTE_ARRAY; |
| static std::string const value; |
| }; |
| |
| template <> |
| struct test_traits<::arrow::BinaryType> { |
| static constexpr ParquetType::type parquet_enum = ParquetType::BYTE_ARRAY; |
| static std::string const value; |
| }; |
| |
| template <> |
| struct test_traits<::arrow::FixedSizeBinaryType> { |
| static constexpr ParquetType::type parquet_enum = ParquetType::FIXED_LEN_BYTE_ARRAY; |
| static std::string const value; |
| }; |
| |
| const std::string test_traits<::arrow::StringType>::value("Test"); // NOLINT |
| const std::string test_traits<::arrow::BinaryType>::value("\x00\x01\x02\x03"); // NOLINT |
| const std::string test_traits<::arrow::FixedSizeBinaryType>::value("Fixed"); // NOLINT |
| |
| template <typename T> |
| using ParquetDataType = DataType<test_traits<T>::parquet_enum>; |
| |
| template <typename T> |
| using ParquetWriter = TypedColumnWriter<ParquetDataType<T>>; |
| |
| void WriteTableToBuffer(const std::shared_ptr<Table>& table, int num_threads, |
| int64_t row_group_size, |
| const std::shared_ptr<ArrowWriterProperties>& arrow_properties, |
| std::shared_ptr<Buffer>* out) { |
| auto sink = std::make_shared<InMemoryOutputStream>(); |
| |
| ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), sink, |
| row_group_size, default_writer_properties(), |
| arrow_properties)); |
| *out = sink->GetBuffer(); |
| } |
| |
| namespace internal { |
| |
| void AssertArraysEqual(const Array& expected, const Array& actual) { |
| if (!actual.Equals(expected)) { |
| std::stringstream pp_result; |
| std::stringstream pp_expected; |
| |
| EXPECT_OK(::arrow::PrettyPrint(actual, 0, &pp_result)); |
| EXPECT_OK(::arrow::PrettyPrint(expected, 0, &pp_expected)); |
| FAIL() << "Got: \n" << pp_result.str() << "\nExpected: \n" << pp_expected.str(); |
| } |
| } |
| |
| } // namespace internal |
| |
| void AssertChunkedEqual(const ChunkedArray& expected, const ChunkedArray& actual) { |
| ASSERT_EQ(expected.num_chunks(), actual.num_chunks()) << "# chunks unequal"; |
| if (!actual.Equals(expected)) { |
| std::stringstream pp_result; |
| std::stringstream pp_expected; |
| |
| for (int i = 0; i < actual.num_chunks(); ++i) { |
| auto c1 = actual.chunk(i); |
| auto c2 = expected.chunk(i); |
| if (!c1->Equals(*c2)) { |
| EXPECT_OK(::arrow::PrettyPrint(*c1, 0, &pp_result)); |
| EXPECT_OK(::arrow::PrettyPrint(*c2, 0, &pp_expected)); |
| FAIL() << "Chunk " << i << " Got: " << pp_result.str() |
| << "\nExpected: " << pp_expected.str(); |
| } |
| } |
| } |
| } |
| |
| void PrintColumn(const Column& col, std::stringstream* ss) { |
| const ChunkedArray& carr = *col.data(); |
| for (int i = 0; i < carr.num_chunks(); ++i) { |
| auto c1 = carr.chunk(i); |
| *ss << "Chunk " << i << std::endl; |
| EXPECT_OK(::arrow::PrettyPrint(*c1, 0, ss)); |
| *ss << std::endl; |
| } |
| } |
| |
| void AssertTablesEqual(const Table& expected, const Table& actual, |
| bool same_chunk_layout = true) { |
| ASSERT_EQ(expected.num_columns(), actual.num_columns()); |
| |
| if (same_chunk_layout) { |
| for (int i = 0; i < actual.num_columns(); ++i) { |
| AssertChunkedEqual(*expected.column(i)->data(), *actual.column(i)->data()); |
| } |
| } else { |
| std::stringstream ss; |
| if (!actual.Equals(expected)) { |
| for (int i = 0; i < expected.num_columns(); ++i) { |
| ss << "Actual column " << i << std::endl; |
| PrintColumn(*actual.column(i), &ss); |
| |
| ss << "Expected column " << i << std::endl; |
| PrintColumn(*expected.column(i), &ss); |
| } |
| FAIL() << ss.str(); |
| } |
| } |
| } |
| |
| void DoSimpleRoundtrip(const std::shared_ptr<Table>& table, int num_threads, |
| int64_t row_group_size, const std::vector<int>& column_subset, |
| std::shared_ptr<Table>* out, |
| const std::shared_ptr<ArrowWriterProperties>& arrow_properties = |
| default_arrow_writer_properties()) { |
| std::shared_ptr<Buffer> buffer; |
| ASSERT_NO_FATAL_FAILURE( |
| WriteTableToBuffer(table, num_threads, row_group_size, arrow_properties, &buffer)); |
| |
| std::unique_ptr<FileReader> reader; |
| ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer), |
| ::arrow::default_memory_pool(), |
| ::parquet::default_reader_properties(), nullptr, &reader)); |
| |
| reader->set_num_threads(num_threads); |
| |
| if (column_subset.size() > 0) { |
| ASSERT_OK_NO_THROW(reader->ReadTable(column_subset, out)); |
| } else { |
| // Read everything |
| ASSERT_OK_NO_THROW(reader->ReadTable(out)); |
| } |
| } |
| |
| void CheckSimpleRoundtrip(const std::shared_ptr<Table>& table, int64_t row_group_size, |
| const std::shared_ptr<ArrowWriterProperties>& arrow_properties = |
| default_arrow_writer_properties()) { |
| std::shared_ptr<Table> result; |
| DoSimpleRoundtrip(table, 1, row_group_size, {}, &result, arrow_properties); |
| ASSERT_NO_FATAL_FAILURE(AssertTablesEqual(*table, *result, false)); |
| } |
| |
| static std::shared_ptr<GroupNode> MakeSimpleSchema(const ::DataType& type, |
| Repetition::type repetition) { |
| int32_t byte_width = -1; |
| int32_t precision = -1; |
| int32_t scale = -1; |
| |
| switch (type.id()) { |
| case ::arrow::Type::DICTIONARY: { |
| const auto& dict_type = static_cast<const ::arrow::DictionaryType&>(type); |
| const ::DataType& values_type = *dict_type.dictionary()->type(); |
| switch (values_type.id()) { |
| case ::arrow::Type::FIXED_SIZE_BINARY: |
| byte_width = |
| static_cast<const ::arrow::FixedSizeBinaryType&>(values_type).byte_width(); |
| break; |
| case ::arrow::Type::DECIMAL: { |
| const auto& decimal_type = |
| static_cast<const ::arrow::Decimal128Type&>(values_type); |
| precision = decimal_type.precision(); |
| scale = decimal_type.scale(); |
| byte_width = DecimalSize(precision); |
| } break; |
| default: |
| break; |
| } |
| } break; |
| case ::arrow::Type::FIXED_SIZE_BINARY: |
| byte_width = static_cast<const ::arrow::FixedSizeBinaryType&>(type).byte_width(); |
| break; |
| case ::arrow::Type::DECIMAL: { |
| const auto& decimal_type = static_cast<const ::arrow::Decimal128Type&>(type); |
| precision = decimal_type.precision(); |
| scale = decimal_type.scale(); |
| byte_width = DecimalSize(precision); |
| } break; |
| default: |
| break; |
| } |
| auto pnode = PrimitiveNode::Make("column1", repetition, get_physical_type(type), |
| get_logical_type(type), byte_width, precision, scale); |
| NodePtr node_ = |
| GroupNode::Make("schema", Repetition::REQUIRED, std::vector<NodePtr>({pnode})); |
| return std::static_pointer_cast<GroupNode>(node_); |
| } |
| |
| template <typename TestType> |
| class TestParquetIO : public ::testing::Test { |
| public: |
| virtual void SetUp() {} |
| |
| std::unique_ptr<ParquetFileWriter> MakeWriter( |
| const std::shared_ptr<GroupNode>& schema) { |
| sink_ = std::make_shared<InMemoryOutputStream>(); |
| return ParquetFileWriter::Open(sink_, schema); |
| } |
| |
| void ReaderFromSink(std::unique_ptr<FileReader>* out) { |
| std::shared_ptr<Buffer> buffer = sink_->GetBuffer(); |
| ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer), |
| ::arrow::default_memory_pool(), |
| ::parquet::default_reader_properties(), nullptr, out)); |
| } |
| |
| void ReadSingleColumnFile(std::unique_ptr<FileReader> file_reader, |
| std::shared_ptr<Array>* out) { |
| std::unique_ptr<ColumnReader> column_reader; |
| ASSERT_OK_NO_THROW(file_reader->GetColumn(0, &column_reader)); |
| ASSERT_NE(nullptr, column_reader.get()); |
| |
| ASSERT_OK(column_reader->NextBatch(SMALL_SIZE, out)); |
| ASSERT_NE(nullptr, out->get()); |
| } |
| |
| void ReadAndCheckSingleColumnFile(const Array& values) { |
| std::shared_ptr<Array> out; |
| |
| std::unique_ptr<FileReader> reader; |
| ReaderFromSink(&reader); |
| ReadSingleColumnFile(std::move(reader), &out); |
| |
| internal::AssertArraysEqual(values, *out); |
| } |
| |
| void ReadTableFromFile(std::unique_ptr<FileReader> reader, |
| std::shared_ptr<Table>* out) { |
| ASSERT_OK_NO_THROW(reader->ReadTable(out)); |
| auto key_value_metadata = |
| reader->parquet_reader()->metadata()->key_value_metadata().get(); |
| ASSERT_EQ(nullptr, key_value_metadata); |
| ASSERT_NE(nullptr, out->get()); |
| } |
| |
| void PrepareListTable(int64_t size, bool nullable_lists, bool nullable_elements, |
| int64_t null_count, std::shared_ptr<Table>* out) { |
| std::shared_ptr<Array> values; |
| ASSERT_OK(NullableArray<TestType>(size * size, nullable_elements ? null_count : 0, |
| kDefaultSeed, &values)); |
| // Also test that slice offsets are respected |
| values = values->Slice(5, values->length() - 5); |
| std::shared_ptr<ListArray> lists; |
| ASSERT_OK(MakeListArray(values, size, nullable_lists ? null_count : 0, |
| nullable_elements, &lists)); |
| *out = MakeSimpleTable(lists->Slice(3, size - 6), nullable_lists); |
| } |
| |
| // Prepare table of empty lists, with null values array (ARROW-2744) |
| void PrepareEmptyListsTable(int64_t size, std::shared_ptr<Table>* out) { |
| std::shared_ptr<Array> lists; |
| ASSERT_OK(MakeEmptyListsArray(size, &lists)); |
| *out = MakeSimpleTable(lists, true /* nullable_lists */); |
| } |
| |
| void PrepareListOfListTable(int64_t size, bool nullable_parent_lists, |
| bool nullable_lists, bool nullable_elements, |
| int64_t null_count, std::shared_ptr<Table>* out) { |
| std::shared_ptr<Array> values; |
| ASSERT_OK(NullableArray<TestType>(size * 6, nullable_elements ? null_count : 0, |
| kDefaultSeed, &values)); |
| std::shared_ptr<ListArray> lists; |
| ASSERT_OK(MakeListArray(values, size * 3, nullable_lists ? null_count : 0, |
| nullable_elements, &lists)); |
| std::shared_ptr<ListArray> parent_lists; |
| ASSERT_OK(MakeListArray(lists, size, nullable_parent_lists ? null_count : 0, |
| nullable_lists, &parent_lists)); |
| *out = MakeSimpleTable(parent_lists, nullable_parent_lists); |
| } |
| |
| void ReadAndCheckSingleColumnTable(const std::shared_ptr<Array>& values) { |
| std::shared_ptr<::arrow::Table> out; |
| std::unique_ptr<FileReader> reader; |
| ReaderFromSink(&reader); |
| ReadTableFromFile(std::move(reader), &out); |
| ASSERT_EQ(1, out->num_columns()); |
| ASSERT_EQ(values->length(), out->num_rows()); |
| |
| std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data(); |
| ASSERT_EQ(1, chunked_array->num_chunks()); |
| auto result = chunked_array->chunk(0); |
| |
| internal::AssertArraysEqual(*values, *result); |
| } |
| |
| void CheckRoundTrip(const std::shared_ptr<Table>& table) { |
| CheckSimpleRoundtrip(table, table->num_rows()); |
| } |
| |
| template <typename ArrayType> |
| void WriteColumn(const std::shared_ptr<GroupNode>& schema, |
| const std::shared_ptr<ArrayType>& values) { |
| FileWriter writer(::arrow::default_memory_pool(), MakeWriter(schema)); |
| ASSERT_OK_NO_THROW(writer.NewRowGroup(values->length())); |
| ASSERT_OK_NO_THROW(writer.WriteColumnChunk(*values)); |
| ASSERT_OK_NO_THROW(writer.Close()); |
| // writer.Close() should be idempotent |
| ASSERT_OK_NO_THROW(writer.Close()); |
| } |
| |
| std::shared_ptr<InMemoryOutputStream> sink_; |
| }; |
| |
| // We have separate tests for UInt32Type as this is currently the only type |
| // where a roundtrip does not yield the identical Array structure. |
| // There we write an UInt32 Array but receive an Int64 Array as result for |
| // Parquet version 1.0. |
| |
| typedef ::testing::Types< |
| ::arrow::BooleanType, ::arrow::UInt8Type, ::arrow::Int8Type, ::arrow::UInt16Type, |
| ::arrow::Int16Type, ::arrow::Int32Type, ::arrow::UInt64Type, ::arrow::Int64Type, |
| ::arrow::Date32Type, ::arrow::FloatType, ::arrow::DoubleType, ::arrow::StringType, |
| ::arrow::BinaryType, ::arrow::FixedSizeBinaryType, DecimalWithPrecisionAndScale<1>, |
| DecimalWithPrecisionAndScale<3>, DecimalWithPrecisionAndScale<5>, |
| DecimalWithPrecisionAndScale<7>, DecimalWithPrecisionAndScale<10>, |
| DecimalWithPrecisionAndScale<12>, DecimalWithPrecisionAndScale<15>, |
| DecimalWithPrecisionAndScale<17>, DecimalWithPrecisionAndScale<19>, |
| DecimalWithPrecisionAndScale<22>, DecimalWithPrecisionAndScale<23>, |
| DecimalWithPrecisionAndScale<24>, DecimalWithPrecisionAndScale<27>, |
| DecimalWithPrecisionAndScale<29>, DecimalWithPrecisionAndScale<32>, |
| DecimalWithPrecisionAndScale<34>, DecimalWithPrecisionAndScale<38>> |
| TestTypes; |
| |
| TYPED_TEST_CASE(TestParquetIO, TestTypes); |
| |
| TYPED_TEST(TestParquetIO, SingleColumnRequiredWrite) { |
| std::shared_ptr<Array> values; |
| ASSERT_OK(NonNullArray<TypeParam>(SMALL_SIZE, &values)); |
| |
| std::shared_ptr<GroupNode> schema = |
| MakeSimpleSchema(*values->type(), Repetition::REQUIRED); |
| ASSERT_NO_FATAL_FAILURE(this->WriteColumn(schema, values)); |
| |
| ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values)); |
| } |
| |
| TYPED_TEST(TestParquetIO, SingleColumnTableRequiredWrite) { |
| std::shared_ptr<Array> values; |
| ASSERT_OK(NonNullArray<TypeParam>(SMALL_SIZE, &values)); |
| std::shared_ptr<Table> table = MakeSimpleTable(values, false); |
| this->sink_ = std::make_shared<InMemoryOutputStream>(); |
| ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), this->sink_, |
| values->length(), default_writer_properties())); |
| |
| std::shared_ptr<Table> out; |
| std::unique_ptr<FileReader> reader; |
| ASSERT_NO_FATAL_FAILURE(this->ReaderFromSink(&reader)); |
| ASSERT_NO_FATAL_FAILURE(this->ReadTableFromFile(std::move(reader), &out)); |
| ASSERT_EQ(1, out->num_columns()); |
| ASSERT_EQ(100, out->num_rows()); |
| |
| std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data(); |
| ASSERT_EQ(1, chunked_array->num_chunks()); |
| |
| internal::AssertArraysEqual(*values, *chunked_array->chunk(0)); |
| } |
| |
| TYPED_TEST(TestParquetIO, SingleColumnOptionalReadWrite) { |
| // This also tests max_definition_level = 1 |
| std::shared_ptr<Array> values; |
| |
| ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values)); |
| |
| std::shared_ptr<GroupNode> schema = |
| MakeSimpleSchema(*values->type(), Repetition::OPTIONAL); |
| ASSERT_NO_FATAL_FAILURE(this->WriteColumn(schema, values)); |
| |
| ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values)); |
| } |
| |
| TYPED_TEST(TestParquetIO, SingleColumnOptionalDictionaryWrite) { |
| // Skip tests for BOOL as we don't create dictionaries for it. |
| if (TypeParam::type_id == ::arrow::Type::BOOL) { |
| return; |
| } |
| |
| std::shared_ptr<Array> values; |
| |
| ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values)); |
| |
| Datum out; |
| FunctionContext ctx(default_memory_pool()); |
| ASSERT_OK(DictionaryEncode(&ctx, Datum(values), &out)); |
| std::shared_ptr<Array> dict_values = MakeArray(out.array()); |
| std::shared_ptr<GroupNode> schema = |
| MakeSimpleSchema(*dict_values->type(), Repetition::OPTIONAL); |
| ASSERT_NO_FATAL_FAILURE(this->WriteColumn(schema, dict_values)); |
| |
| ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values)); |
| } |
| |
| TYPED_TEST(TestParquetIO, SingleColumnRequiredSliceWrite) { |
| std::shared_ptr<Array> values; |
| ASSERT_OK(NonNullArray<TypeParam>(2 * SMALL_SIZE, &values)); |
| std::shared_ptr<GroupNode> schema = |
| MakeSimpleSchema(*values->type(), Repetition::REQUIRED); |
| |
| std::shared_ptr<Array> sliced_values = values->Slice(SMALL_SIZE / 2, SMALL_SIZE); |
| ASSERT_NO_FATAL_FAILURE(this->WriteColumn(schema, sliced_values)); |
| ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*sliced_values)); |
| |
| // Slice offset 1 higher |
| sliced_values = values->Slice(SMALL_SIZE / 2 + 1, SMALL_SIZE); |
| ASSERT_NO_FATAL_FAILURE(this->WriteColumn(schema, sliced_values)); |
| ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*sliced_values)); |
| } |
| |
| TYPED_TEST(TestParquetIO, SingleColumnOptionalSliceWrite) { |
| std::shared_ptr<Array> values; |
| ASSERT_OK(NullableArray<TypeParam>(2 * SMALL_SIZE, SMALL_SIZE, kDefaultSeed, &values)); |
| std::shared_ptr<GroupNode> schema = |
| MakeSimpleSchema(*values->type(), Repetition::OPTIONAL); |
| |
| std::shared_ptr<Array> sliced_values = values->Slice(SMALL_SIZE / 2, SMALL_SIZE); |
| ASSERT_NO_FATAL_FAILURE(this->WriteColumn(schema, sliced_values)); |
| ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*sliced_values)); |
| |
| // Slice offset 1 higher, thus different null bitmap. |
| sliced_values = values->Slice(SMALL_SIZE / 2 + 1, SMALL_SIZE); |
| ASSERT_NO_FATAL_FAILURE(this->WriteColumn(schema, sliced_values)); |
| ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*sliced_values)); |
| } |
| |
| TYPED_TEST(TestParquetIO, SingleColumnTableOptionalReadWrite) { |
| // This also tests max_definition_level = 1 |
| std::shared_ptr<Array> values; |
| |
| ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values)); |
| std::shared_ptr<Table> table = MakeSimpleTable(values, true); |
| ASSERT_NO_FATAL_FAILURE(this->CheckRoundTrip(table)); |
| } |
| |
| TYPED_TEST(TestParquetIO, SingleEmptyListsColumnReadWrite) { |
| std::shared_ptr<Table> table; |
| ASSERT_NO_FATAL_FAILURE(this->PrepareEmptyListsTable(SMALL_SIZE, &table)); |
| ASSERT_NO_FATAL_FAILURE(this->CheckRoundTrip(table)); |
| } |
| |
| TYPED_TEST(TestParquetIO, SingleNullableListNullableColumnReadWrite) { |
| std::shared_ptr<Table> table; |
| ASSERT_NO_FATAL_FAILURE(this->PrepareListTable(SMALL_SIZE, true, true, 10, &table)); |
| ASSERT_NO_FATAL_FAILURE(this->CheckRoundTrip(table)); |
| } |
| |
| TYPED_TEST(TestParquetIO, SingleRequiredListNullableColumnReadWrite) { |
| std::shared_ptr<Table> table; |
| ASSERT_NO_FATAL_FAILURE(this->PrepareListTable(SMALL_SIZE, false, true, 10, &table)); |
| ASSERT_NO_FATAL_FAILURE(this->CheckRoundTrip(table)); |
| } |
| |
| TYPED_TEST(TestParquetIO, SingleNullableListRequiredColumnReadWrite) { |
| std::shared_ptr<Table> table; |
| ASSERT_NO_FATAL_FAILURE(this->PrepareListTable(SMALL_SIZE, true, false, 10, &table)); |
| ASSERT_NO_FATAL_FAILURE(this->CheckRoundTrip(table)); |
| } |
| |
| TYPED_TEST(TestParquetIO, SingleRequiredListRequiredColumnReadWrite) { |
| std::shared_ptr<Table> table; |
| ASSERT_NO_FATAL_FAILURE(this->PrepareListTable(SMALL_SIZE, false, false, 0, &table)); |
| ASSERT_NO_FATAL_FAILURE(this->CheckRoundTrip(table)); |
| } |
| |
| TYPED_TEST(TestParquetIO, SingleNullableListRequiredListRequiredColumnReadWrite) { |
| std::shared_ptr<Table> table; |
| ASSERT_NO_FATAL_FAILURE( |
| this->PrepareListOfListTable(SMALL_SIZE, true, false, false, 0, &table)); |
| ASSERT_NO_FATAL_FAILURE(this->CheckRoundTrip(table)); |
| } |
| |
| TYPED_TEST(TestParquetIO, SingleColumnRequiredChunkedWrite) { |
| std::shared_ptr<Array> values; |
| ASSERT_OK(NonNullArray<TypeParam>(SMALL_SIZE, &values)); |
| int64_t chunk_size = values->length() / 4; |
| |
| std::shared_ptr<GroupNode> schema = |
| MakeSimpleSchema(*values->type(), Repetition::REQUIRED); |
| FileWriter writer(default_memory_pool(), this->MakeWriter(schema)); |
| for (int i = 0; i < 4; i++) { |
| ASSERT_OK_NO_THROW(writer.NewRowGroup(chunk_size)); |
| std::shared_ptr<Array> sliced_array = values->Slice(i * chunk_size, chunk_size); |
| ASSERT_OK_NO_THROW(writer.WriteColumnChunk(*sliced_array)); |
| } |
| ASSERT_OK_NO_THROW(writer.Close()); |
| |
| ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values)); |
| } |
| |
| TYPED_TEST(TestParquetIO, SingleColumnTableRequiredChunkedWrite) { |
| std::shared_ptr<Array> values; |
| ASSERT_OK(NonNullArray<TypeParam>(LARGE_SIZE, &values)); |
| std::shared_ptr<Table> table = MakeSimpleTable(values, false); |
| this->sink_ = std::make_shared<InMemoryOutputStream>(); |
| ASSERT_OK_NO_THROW(WriteTable(*table, default_memory_pool(), this->sink_, 512, |
| default_writer_properties())); |
| |
| ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnTable(values)); |
| } |
| |
| TYPED_TEST(TestParquetIO, SingleColumnTableRequiredChunkedWriteArrowIO) { |
| std::shared_ptr<Array> values; |
| ASSERT_OK(NonNullArray<TypeParam>(LARGE_SIZE, &values)); |
| std::shared_ptr<Table> table = MakeSimpleTable(values, false); |
| this->sink_ = std::make_shared<InMemoryOutputStream>(); |
| auto buffer = AllocateBuffer(); |
| |
| { |
| // BufferOutputStream closed on gc |
| auto arrow_sink_ = std::make_shared<::arrow::io::BufferOutputStream>(buffer); |
| ASSERT_OK_NO_THROW(WriteTable(*table, default_memory_pool(), arrow_sink_, 512, |
| default_writer_properties())); |
| |
| // XXX: Remove this after ARROW-455 completed |
| ASSERT_OK(arrow_sink_->Close()); |
| } |
| |
| auto pbuffer = std::make_shared<Buffer>(buffer->data(), buffer->size()); |
| |
| auto source = std::make_shared<BufferReader>(pbuffer); |
| std::shared_ptr<::arrow::Table> out; |
| std::unique_ptr<FileReader> reader; |
| ASSERT_OK_NO_THROW(OpenFile(source, ::arrow::default_memory_pool(), &reader)); |
| ASSERT_NO_FATAL_FAILURE(this->ReadTableFromFile(std::move(reader), &out)); |
| ASSERT_EQ(1, out->num_columns()); |
| ASSERT_EQ(values->length(), out->num_rows()); |
| |
| std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data(); |
| ASSERT_EQ(1, chunked_array->num_chunks()); |
| |
| internal::AssertArraysEqual(*values, *chunked_array->chunk(0)); |
| } |
| |
| TYPED_TEST(TestParquetIO, SingleColumnOptionalChunkedWrite) { |
| int64_t chunk_size = SMALL_SIZE / 4; |
| std::shared_ptr<Array> values; |
| |
| ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values)); |
| |
| std::shared_ptr<GroupNode> schema = |
| MakeSimpleSchema(*values->type(), Repetition::OPTIONAL); |
| FileWriter writer(::arrow::default_memory_pool(), this->MakeWriter(schema)); |
| for (int i = 0; i < 4; i++) { |
| ASSERT_OK_NO_THROW(writer.NewRowGroup(chunk_size)); |
| std::shared_ptr<Array> sliced_array = values->Slice(i * chunk_size, chunk_size); |
| ASSERT_OK_NO_THROW(writer.WriteColumnChunk(*sliced_array)); |
| } |
| ASSERT_OK_NO_THROW(writer.Close()); |
| |
| ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values)); |
| } |
| |
| TYPED_TEST(TestParquetIO, SingleColumnTableOptionalChunkedWrite) { |
| // This also tests max_definition_level = 1 |
| std::shared_ptr<Array> values; |
| |
| ASSERT_OK(NullableArray<TypeParam>(LARGE_SIZE, 100, kDefaultSeed, &values)); |
| std::shared_ptr<Table> table = MakeSimpleTable(values, true); |
| this->sink_ = std::make_shared<InMemoryOutputStream>(); |
| ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), this->sink_, 512, |
| default_writer_properties())); |
| |
| ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnTable(values)); |
| } |
| |
| TYPED_TEST(TestParquetIO, FileMetaDataWrite) { |
| std::shared_ptr<Array> values; |
| ASSERT_OK(NonNullArray<TypeParam>(SMALL_SIZE, &values)); |
| std::shared_ptr<Table> table = MakeSimpleTable(values, false); |
| this->sink_ = std::make_shared<InMemoryOutputStream>(); |
| ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), this->sink_, |
| values->length(), default_writer_properties())); |
| |
| std::unique_ptr<FileReader> reader; |
| ASSERT_NO_FATAL_FAILURE(this->ReaderFromSink(&reader)); |
| auto metadata = reader->parquet_reader()->metadata(); |
| ASSERT_EQ(1, metadata->num_columns()); |
| ASSERT_EQ(100, metadata->num_rows()); |
| |
| this->sink_ = std::make_shared<InMemoryOutputStream>(); |
| |
| ASSERT_OK_NO_THROW(::parquet::arrow::WriteFileMetaData(*metadata, this->sink_.get())); |
| |
| ASSERT_NO_FATAL_FAILURE(this->ReaderFromSink(&reader)); |
| auto metadata_written = reader->parquet_reader()->metadata(); |
| ASSERT_EQ(metadata->size(), metadata_written->size()); |
| ASSERT_EQ(metadata->num_row_groups(), metadata_written->num_row_groups()); |
| ASSERT_EQ(metadata->num_rows(), metadata_written->num_rows()); |
| ASSERT_EQ(metadata->num_columns(), metadata_written->num_columns()); |
| ASSERT_EQ(metadata->RowGroup(0)->num_rows(), metadata_written->RowGroup(0)->num_rows()); |
| } |
| |
| using TestInt96ParquetIO = TestParquetIO<::arrow::TimestampType>; |
| |
| TEST_F(TestInt96ParquetIO, ReadIntoTimestamp) { |
| // This test explicitly tests the conversion from an Impala-style timestamp |
| // to a nanoseconds-since-epoch one. |
| |
| // 2nd January 1970, 11:35min 145738543ns |
| Int96 day; |
| day.value[2] = UINT32_C(2440589); |
| int64_t seconds = (11 * 60 + 35) * 60; |
| *(reinterpret_cast<int64_t*>(&(day.value))) = |
| seconds * INT64_C(1000) * INT64_C(1000) * INT64_C(1000) + 145738543; |
| // Compute the corresponding nanosecond timestamp |
| struct tm datetime; |
| memset(&datetime, 0, sizeof(struct tm)); |
| datetime.tm_year = 70; |
| datetime.tm_mon = 0; |
| datetime.tm_mday = 2; |
| datetime.tm_hour = 11; |
| datetime.tm_min = 35; |
| struct tm epoch; |
| memset(&epoch, 0, sizeof(struct tm)); |
| |
| epoch.tm_year = 70; |
| epoch.tm_mday = 1; |
| // Nanoseconds since the epoch |
| int64_t val = lrint(difftime(mktime(&datetime), mktime(&epoch))) * INT64_C(1000000000); |
| val += 145738543; |
| |
| std::vector<std::shared_ptr<schema::Node>> fields( |
| {schema::PrimitiveNode::Make("int96", Repetition::REQUIRED, ParquetType::INT96)}); |
| std::shared_ptr<schema::GroupNode> schema = std::static_pointer_cast<GroupNode>( |
| schema::GroupNode::Make("schema", Repetition::REQUIRED, fields)); |
| |
| // We cannot write this column with Arrow, so we have to use the plain parquet-cpp API |
| // to write an Int96 file. |
| this->sink_ = std::make_shared<InMemoryOutputStream>(); |
| auto writer = ParquetFileWriter::Open(this->sink_, schema); |
| RowGroupWriter* rg_writer = writer->AppendRowGroup(); |
| ColumnWriter* c_writer = rg_writer->NextColumn(); |
| auto typed_writer = dynamic_cast<TypedColumnWriter<Int96Type>*>(c_writer); |
| ASSERT_NE(typed_writer, nullptr); |
| typed_writer->WriteBatch(1, nullptr, nullptr, &day); |
| c_writer->Close(); |
| rg_writer->Close(); |
| writer->Close(); |
| |
| ::arrow::TimestampBuilder builder(::arrow::timestamp(TimeUnit::NANO), |
| ::arrow::default_memory_pool()); |
| ASSERT_OK(builder.Append(val)); |
| std::shared_ptr<Array> values; |
| ASSERT_OK(builder.Finish(&values)); |
| ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values)); |
| } |
| |
| using TestUInt32ParquetIO = TestParquetIO<::arrow::UInt32Type>; |
| |
| TEST_F(TestUInt32ParquetIO, Parquet_2_0_Compability) { |
| // This also tests max_definition_level = 1 |
| std::shared_ptr<Array> values; |
| |
| ASSERT_OK(NullableArray<::arrow::UInt32Type>(LARGE_SIZE, 100, kDefaultSeed, &values)); |
| std::shared_ptr<Table> table = MakeSimpleTable(values, true); |
| |
| // Parquet 2.0 roundtrip should yield an uint32_t column again |
| this->sink_ = std::make_shared<InMemoryOutputStream>(); |
| std::shared_ptr<::parquet::WriterProperties> properties = |
| ::parquet::WriterProperties::Builder() |
| .version(ParquetVersion::PARQUET_2_0) |
| ->build(); |
| ASSERT_OK_NO_THROW( |
| WriteTable(*table, default_memory_pool(), this->sink_, 512, properties)); |
| ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnTable(values)); |
| } |
| |
| TEST_F(TestUInt32ParquetIO, Parquet_1_0_Compability) { |
| // This also tests max_definition_level = 1 |
| std::shared_ptr<Array> arr; |
| ASSERT_OK(NullableArray<::arrow::UInt32Type>(LARGE_SIZE, 100, kDefaultSeed, &arr)); |
| |
| std::shared_ptr<::arrow::UInt32Array> values = |
| std::dynamic_pointer_cast<::arrow::UInt32Array>(arr); |
| |
| std::shared_ptr<Table> table = MakeSimpleTable(values, true); |
| |
| // Parquet 1.0 returns an int64_t column as there is no way to tell a Parquet 1.0 |
| // reader that a column is unsigned. |
| this->sink_ = std::make_shared<InMemoryOutputStream>(); |
| std::shared_ptr<::parquet::WriterProperties> properties = |
| ::parquet::WriterProperties::Builder() |
| .version(ParquetVersion::PARQUET_1_0) |
| ->build(); |
| ASSERT_OK_NO_THROW( |
| WriteTable(*table, ::arrow::default_memory_pool(), this->sink_, 512, properties)); |
| |
| std::shared_ptr<ResizableBuffer> int64_data = AllocateBuffer(); |
| { |
| ASSERT_OK(int64_data->Resize(sizeof(int64_t) * values->length())); |
| auto int64_data_ptr = reinterpret_cast<int64_t*>(int64_data->mutable_data()); |
| auto uint32_data_ptr = reinterpret_cast<const uint32_t*>(values->values()->data()); |
| const auto cast_uint32_to_int64 = [](uint32_t value) { |
| return static_cast<int64_t>(value); |
| }; |
| std::transform(uint32_data_ptr, uint32_data_ptr + values->length(), int64_data_ptr, |
| cast_uint32_to_int64); |
| } |
| |
| std::vector<std::shared_ptr<Buffer>> buffers{values->null_bitmap(), int64_data}; |
| auto arr_data = std::make_shared<::arrow::ArrayData>(::arrow::int64(), values->length(), |
| buffers, values->null_count()); |
| std::shared_ptr<Array> expected_values = MakeArray(arr_data); |
| ASSERT_NE(expected_values, NULLPTR); |
| |
| const auto& expected = static_cast<const ::arrow::Int64Array&>(*expected_values); |
| ASSERT_GT(values->length(), 0); |
| ASSERT_EQ(values->length(), expected.length()); |
| |
| // TODO(phillipc): Is there a better way to compare these two arrays? |
| // AssertArraysEqual requires the same type, but we only care about values in this case |
| for (int i = 0; i < expected.length(); ++i) { |
| const bool value_is_valid = values->IsValid(i); |
| const bool expected_value_is_valid = expected.IsValid(i); |
| |
| ASSERT_EQ(expected_value_is_valid, value_is_valid); |
| |
| if (value_is_valid) { |
| uint32_t value = values->Value(i); |
| int64_t expected_value = expected.Value(i); |
| ASSERT_EQ(expected_value, static_cast<int64_t>(value)); |
| } |
| } |
| } |
| |
| using TestStringParquetIO = TestParquetIO<::arrow::StringType>; |
| |
| TEST_F(TestStringParquetIO, EmptyStringColumnRequiredWrite) { |
| std::shared_ptr<Array> values; |
| ::arrow::StringBuilder builder; |
| for (size_t i = 0; i < SMALL_SIZE; i++) { |
| ASSERT_OK(builder.Append("")); |
| } |
| ASSERT_OK(builder.Finish(&values)); |
| std::shared_ptr<Table> table = MakeSimpleTable(values, false); |
| this->sink_ = std::make_shared<InMemoryOutputStream>(); |
| ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), this->sink_, |
| values->length(), default_writer_properties())); |
| |
| std::shared_ptr<Table> out; |
| std::unique_ptr<FileReader> reader; |
| ASSERT_NO_FATAL_FAILURE(this->ReaderFromSink(&reader)); |
| ASSERT_NO_FATAL_FAILURE(this->ReadTableFromFile(std::move(reader), &out)); |
| ASSERT_EQ(1, out->num_columns()); |
| ASSERT_EQ(100, out->num_rows()); |
| |
| std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data(); |
| ASSERT_EQ(1, chunked_array->num_chunks()); |
| |
| internal::AssertArraysEqual(*values, *chunked_array->chunk(0)); |
| } |
| |
| using TestNullParquetIO = TestParquetIO<::arrow::NullType>; |
| |
| TEST_F(TestNullParquetIO, NullColumn) { |
| for (int32_t num_rows : {0, SMALL_SIZE}) { |
| std::shared_ptr<Array> values = std::make_shared<::arrow::NullArray>(num_rows); |
| std::shared_ptr<Table> table = MakeSimpleTable(values, true /* nullable */); |
| this->sink_ = std::make_shared<InMemoryOutputStream>(); |
| |
| const int64_t chunk_size = std::max(static_cast<int64_t>(1), table->num_rows()); |
| ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), this->sink_, |
| chunk_size, default_writer_properties())); |
| |
| std::shared_ptr<Table> out; |
| std::unique_ptr<FileReader> reader; |
| ASSERT_NO_FATAL_FAILURE(this->ReaderFromSink(&reader)); |
| ASSERT_NO_FATAL_FAILURE(this->ReadTableFromFile(std::move(reader), &out)); |
| ASSERT_EQ(1, out->num_columns()); |
| ASSERT_EQ(num_rows, out->num_rows()); |
| |
| std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data(); |
| ASSERT_EQ(1, chunked_array->num_chunks()); |
| internal::AssertArraysEqual(*values, *chunked_array->chunk(0)); |
| } |
| } |
| |
| TEST_F(TestNullParquetIO, NullListColumn) { |
| std::vector<int32_t> offsets1 = {0}; |
| std::vector<int32_t> offsets2 = {0, 2, 2, 3, 115}; |
| for (std::vector<int32_t> offsets : {offsets1, offsets2}) { |
| std::shared_ptr<Array> offsets_array, values_array, list_array; |
| ::arrow::ArrayFromVector<::arrow::Int32Type, int32_t>(offsets, &offsets_array); |
| values_array = std::make_shared<::arrow::NullArray>(offsets.back()); |
| ASSERT_OK(::arrow::ListArray::FromArrays(*offsets_array, *values_array, |
| default_memory_pool(), &list_array)); |
| |
| std::shared_ptr<Table> table = MakeSimpleTable(list_array, false /* nullable */); |
| this->sink_ = std::make_shared<InMemoryOutputStream>(); |
| |
| const int64_t chunk_size = std::max(static_cast<int64_t>(1), table->num_rows()); |
| ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), this->sink_, |
| chunk_size, default_writer_properties())); |
| |
| std::shared_ptr<Table> out; |
| std::unique_ptr<FileReader> reader; |
| this->ReaderFromSink(&reader); |
| this->ReadTableFromFile(std::move(reader), &out); |
| ASSERT_EQ(1, out->num_columns()); |
| ASSERT_EQ(offsets.size() - 1, out->num_rows()); |
| |
| std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data(); |
| ASSERT_EQ(1, chunked_array->num_chunks()); |
| internal::AssertArraysEqual(*list_array, *chunked_array->chunk(0)); |
| } |
| } |
| |
| TEST_F(TestNullParquetIO, NullDictionaryColumn) { |
| std::shared_ptr<Array> values = std::make_shared<::arrow::NullArray>(0); |
| std::shared_ptr<Array> indices = |
| std::make_shared<::arrow::Int8Array>(SMALL_SIZE, nullptr, nullptr, SMALL_SIZE); |
| std::shared_ptr<::arrow::DictionaryType> dict_type = |
| std::make_shared<::arrow::DictionaryType>(::arrow::int8(), values); |
| std::shared_ptr<Array> dict_values = |
| std::make_shared<::arrow::DictionaryArray>(dict_type, indices); |
| std::shared_ptr<Table> table = MakeSimpleTable(dict_values, true); |
| this->sink_ = std::make_shared<InMemoryOutputStream>(); |
| ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), this->sink_, |
| dict_values->length(), default_writer_properties())); |
| |
| std::shared_ptr<Table> out; |
| std::unique_ptr<FileReader> reader; |
| ASSERT_NO_FATAL_FAILURE(this->ReaderFromSink(&reader)); |
| ASSERT_NO_FATAL_FAILURE(this->ReadTableFromFile(std::move(reader), &out)); |
| ASSERT_EQ(1, out->num_columns()); |
| ASSERT_EQ(100, out->num_rows()); |
| |
| std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data(); |
| ASSERT_EQ(1, chunked_array->num_chunks()); |
| |
| std::shared_ptr<Array> expected_values = |
| std::make_shared<::arrow::NullArray>(SMALL_SIZE); |
| internal::AssertArraysEqual(*expected_values, *chunked_array->chunk(0)); |
| } |
| |
| template <typename T> |
| using ParquetCDataType = typename ParquetDataType<T>::c_type; |
| |
| template <typename T> |
| struct c_type_trait { |
| using ArrowCType = typename T::c_type; |
| }; |
| |
| template <> |
| struct c_type_trait<::arrow::BooleanType> { |
| using ArrowCType = uint8_t; |
| }; |
| |
| template <typename TestType> |
| class TestPrimitiveParquetIO : public TestParquetIO<TestType> { |
| public: |
| typedef typename c_type_trait<TestType>::ArrowCType T; |
| |
| void MakeTestFile(std::vector<T>& values, int num_chunks, |
| std::unique_ptr<FileReader>* reader) { |
| TestType dummy; |
| |
| std::shared_ptr<GroupNode> schema = MakeSimpleSchema(dummy, Repetition::REQUIRED); |
| std::unique_ptr<ParquetFileWriter> file_writer = this->MakeWriter(schema); |
| size_t chunk_size = values.size() / num_chunks; |
| // Convert to Parquet's expected physical type |
| std::vector<uint8_t> values_buffer(sizeof(ParquetCDataType<TestType>) * |
| values.size()); |
| auto values_parquet = |
| reinterpret_cast<ParquetCDataType<TestType>*>(values_buffer.data()); |
| std::copy(values.cbegin(), values.cend(), values_parquet); |
| for (int i = 0; i < num_chunks; i++) { |
| auto row_group_writer = file_writer->AppendRowGroup(); |
| auto column_writer = |
| static_cast<ParquetWriter<TestType>*>(row_group_writer->NextColumn()); |
| ParquetCDataType<TestType>* data = values_parquet + i * chunk_size; |
| column_writer->WriteBatch(chunk_size, nullptr, nullptr, data); |
| column_writer->Close(); |
| row_group_writer->Close(); |
| } |
| file_writer->Close(); |
| this->ReaderFromSink(reader); |
| } |
| |
| void CheckSingleColumnRequiredTableRead(int num_chunks) { |
| std::vector<T> values(SMALL_SIZE, test_traits<TestType>::value); |
| std::unique_ptr<FileReader> file_reader; |
| ASSERT_NO_FATAL_FAILURE(MakeTestFile(values, num_chunks, &file_reader)); |
| |
| std::shared_ptr<Table> out; |
| this->ReadTableFromFile(std::move(file_reader), &out); |
| ASSERT_EQ(1, out->num_columns()); |
| ASSERT_EQ(SMALL_SIZE, out->num_rows()); |
| |
| std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data(); |
| ASSERT_EQ(1, chunked_array->num_chunks()); |
| ExpectArrayT<TestType>(values.data(), chunked_array->chunk(0).get()); |
| } |
| |
| void CheckSingleColumnRequiredRead(int num_chunks) { |
| std::vector<T> values(SMALL_SIZE, test_traits<TestType>::value); |
| std::unique_ptr<FileReader> file_reader; |
| ASSERT_NO_FATAL_FAILURE(MakeTestFile(values, num_chunks, &file_reader)); |
| |
| std::shared_ptr<Array> out; |
| this->ReadSingleColumnFile(std::move(file_reader), &out); |
| |
| ExpectArrayT<TestType>(values.data(), out.get()); |
| } |
| }; |
| |
| typedef ::testing::Types<::arrow::BooleanType, ::arrow::UInt8Type, ::arrow::Int8Type, |
| ::arrow::UInt16Type, ::arrow::Int16Type, ::arrow::UInt32Type, |
| ::arrow::Int32Type, ::arrow::UInt64Type, ::arrow::Int64Type, |
| ::arrow::FloatType, ::arrow::DoubleType> |
| PrimitiveTestTypes; |
| |
| TYPED_TEST_CASE(TestPrimitiveParquetIO, PrimitiveTestTypes); |
| |
| TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredRead) { |
| ASSERT_NO_FATAL_FAILURE(this->CheckSingleColumnRequiredRead(1)); |
| } |
| |
| TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredTableRead) { |
| ASSERT_NO_FATAL_FAILURE(this->CheckSingleColumnRequiredTableRead(1)); |
| } |
| |
| TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredChunkedRead) { |
| ASSERT_NO_FATAL_FAILURE(this->CheckSingleColumnRequiredRead(4)); |
| } |
| |
| TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredChunkedTableRead) { |
| ASSERT_NO_FATAL_FAILURE(this->CheckSingleColumnRequiredTableRead(4)); |
| } |
| |
| void MakeDateTimeTypesTable(std::shared_ptr<Table>* out, bool nanos_as_micros = false) { |
| using ::arrow::ArrayFromVector; |
| |
| std::vector<bool> is_valid = {true, true, true, false, true, true}; |
| |
| // These are only types that roundtrip without modification |
| auto f0 = field("f0", ::arrow::date32()); |
| auto f1 = field("f1", ::arrow::timestamp(TimeUnit::MILLI)); |
| auto f2 = field("f2", ::arrow::timestamp(TimeUnit::MICRO)); |
| std::shared_ptr<::arrow::Field> f3; |
| if (nanos_as_micros) { |
| f3 = field("f3", ::arrow::timestamp(TimeUnit::MICRO)); |
| } else { |
| f3 = field("f3", ::arrow::timestamp(TimeUnit::NANO)); |
| } |
| auto f4 = field("f4", ::arrow::time32(TimeUnit::MILLI)); |
| auto f5 = field("f5", ::arrow::time64(TimeUnit::MICRO)); |
| std::shared_ptr<::arrow::Schema> schema(new ::arrow::Schema({f0, f1, f2, f3, f4, f5})); |
| |
| std::vector<int32_t> t32_values = {1489269000, 1489270000, 1489271000, |
| 1489272000, 1489272000, 1489273000}; |
| std::vector<int64_t> t64_values = {1489269000000, 1489270000000, 1489271000000, |
| 1489272000000, 1489272000000, 1489273000000}; |
| std::vector<int64_t> t64_us_values = {1489269000, 1489270000, 1489271000, |
| 1489272000, 1489272000, 1489273000}; |
| |
| std::shared_ptr<Array> a0, a1, a2, a3, a4, a5; |
| ArrayFromVector<::arrow::Date32Type, int32_t>(f0->type(), is_valid, t32_values, &a0); |
| ArrayFromVector<::arrow::TimestampType, int64_t>(f1->type(), is_valid, t64_values, &a1); |
| ArrayFromVector<::arrow::TimestampType, int64_t>(f2->type(), is_valid, t64_values, &a2); |
| if (nanos_as_micros) { |
| ArrayFromVector<::arrow::TimestampType, int64_t>(f3->type(), is_valid, t64_us_values, |
| &a3); |
| } else { |
| ArrayFromVector<::arrow::TimestampType, int64_t>(f3->type(), is_valid, t64_values, |
| &a3); |
| } |
| ArrayFromVector<::arrow::Time32Type, int32_t>(f4->type(), is_valid, t32_values, &a4); |
| ArrayFromVector<::arrow::Time64Type, int64_t>(f5->type(), is_valid, t64_values, &a5); |
| |
| std::vector<std::shared_ptr<::arrow::Column>> columns = { |
| std::make_shared<Column>("f0", a0), std::make_shared<Column>("f1", a1), |
| std::make_shared<Column>("f2", a2), std::make_shared<Column>("f3", a3), |
| std::make_shared<Column>("f4", a4), std::make_shared<Column>("f5", a5)}; |
| *out = Table::Make(schema, columns); |
| } |
| |
| TEST(TestArrowReadWrite, DateTimeTypes) { |
| std::shared_ptr<Table> table; |
| MakeDateTimeTypesTable(&table); |
| |
| // Use deprecated INT96 type |
| std::shared_ptr<Table> result; |
| ASSERT_NO_FATAL_FAILURE(DoSimpleRoundtrip( |
| table, 1, table->num_rows(), {}, &result, |
| ArrowWriterProperties::Builder().enable_deprecated_int96_timestamps()->build())); |
| |
| ASSERT_NO_FATAL_FAILURE(AssertTablesEqual(*table, *result)); |
| |
| // Cast nanaoseconds to microseconds and use INT64 physical type |
| ASSERT_NO_FATAL_FAILURE(DoSimpleRoundtrip(table, 1, table->num_rows(), {}, &result)); |
| std::shared_ptr<Table> expected; |
| MakeDateTimeTypesTable(&table, true); |
| |
| ASSERT_NO_FATAL_FAILURE(AssertTablesEqual(*table, *result)); |
| } |
| |
| TEST(TestArrowReadWrite, CoerceTimestamps) { |
| using ::arrow::ArrayFromVector; |
| using ::arrow::field; |
| |
| // PARQUET-1078, coerce Arrow timestamps to either TIMESTAMP_MILLIS or TIMESTAMP_MICROS |
| std::vector<bool> is_valid = {true, true, true, false, true, true}; |
| |
| auto t_s = ::arrow::timestamp(TimeUnit::SECOND); |
| auto t_ms = ::arrow::timestamp(TimeUnit::MILLI); |
| auto t_us = ::arrow::timestamp(TimeUnit::MICRO); |
| auto t_ns = ::arrow::timestamp(TimeUnit::NANO); |
| |
| std::vector<int64_t> s_values = {1489269, 1489270, 1489271, 1489272, 1489272, 1489273}; |
| std::vector<int64_t> ms_values = {1489269000, 1489270000, 1489271000, |
| 1489272001, 1489272000, 1489273000}; |
| std::vector<int64_t> us_values = {1489269000000, 1489270000000, 1489271000000, |
| 1489272000001, 1489272000000, 1489273000000}; |
| std::vector<int64_t> ns_values = {1489269000000000LL, 1489270000000000LL, |
| 1489271000000000LL, 1489272000000001LL, |
| 1489272000000000LL, 1489273000000000LL}; |
| |
| std::shared_ptr<Array> a_s, a_ms, a_us, a_ns; |
| ArrayFromVector<::arrow::TimestampType, int64_t>(t_s, is_valid, s_values, &a_s); |
| ArrayFromVector<::arrow::TimestampType, int64_t>(t_ms, is_valid, ms_values, &a_ms); |
| ArrayFromVector<::arrow::TimestampType, int64_t>(t_us, is_valid, us_values, &a_us); |
| ArrayFromVector<::arrow::TimestampType, int64_t>(t_ns, is_valid, ns_values, &a_ns); |
| |
| // Input table, all data as is |
| auto s1 = std::shared_ptr<::arrow::Schema>( |
| new ::arrow::Schema({field("f_s", t_s), field("f_ms", t_ms), field("f_us", t_us), |
| field("f_ns", t_ns)})); |
| auto input = Table::Make( |
| s1, |
| {std::make_shared<Column>("f_s", a_s), std::make_shared<Column>("f_ms", a_ms), |
| std::make_shared<Column>("f_us", a_us), std::make_shared<Column>("f_ns", a_ns)}); |
| |
| // Result when coercing to milliseconds |
| auto s2 = std::shared_ptr<::arrow::Schema>( |
| new ::arrow::Schema({field("f_s", t_ms), field("f_ms", t_ms), field("f_us", t_ms), |
| field("f_ns", t_ms)})); |
| auto ex_milli_result = Table::Make( |
| s2, |
| {std::make_shared<Column>("f_s", a_ms), std::make_shared<Column>("f_ms", a_ms), |
| std::make_shared<Column>("f_us", a_ms), std::make_shared<Column>("f_ns", a_ms)}); |
| |
| // Result when coercing to microseconds |
| auto s3 = std::shared_ptr<::arrow::Schema>( |
| new ::arrow::Schema({field("f_s", t_us), field("f_ms", t_us), field("f_us", t_us), |
| field("f_ns", t_us)})); |
| auto ex_micro_result = Table::Make( |
| s3, |
| {std::make_shared<Column>("f_s", a_us), std::make_shared<Column>("f_ms", a_us), |
| std::make_shared<Column>("f_us", a_us), std::make_shared<Column>("f_ns", a_us)}); |
| |
| std::shared_ptr<Table> milli_result; |
| ASSERT_NO_FATAL_FAILURE(DoSimpleRoundtrip( |
| input, 1, input->num_rows(), {}, &milli_result, |
| ArrowWriterProperties::Builder().coerce_timestamps(TimeUnit::MILLI)->build())); |
| |
| ASSERT_NO_FATAL_FAILURE(AssertTablesEqual(*ex_milli_result, *milli_result)); |
| |
| std::shared_ptr<Table> micro_result; |
| ASSERT_NO_FATAL_FAILURE(DoSimpleRoundtrip( |
| input, 1, input->num_rows(), {}, µ_result, |
| ArrowWriterProperties::Builder().coerce_timestamps(TimeUnit::MICRO)->build())); |
| ASSERT_NO_FATAL_FAILURE(AssertTablesEqual(*ex_micro_result, *micro_result)); |
| } |
| |
| TEST(TestArrowReadWrite, CoerceTimestampsLosePrecision) { |
| using ::arrow::ArrayFromVector; |
| using ::arrow::field; |
| |
| // PARQUET-1078, coerce Arrow timestamps to either TIMESTAMP_MILLIS or TIMESTAMP_MICROS |
| std::vector<bool> is_valid = {true, true, true, false, true, true}; |
| |
| auto t_s = ::arrow::timestamp(TimeUnit::SECOND); |
| auto t_ms = ::arrow::timestamp(TimeUnit::MILLI); |
| auto t_us = ::arrow::timestamp(TimeUnit::MICRO); |
| auto t_ns = ::arrow::timestamp(TimeUnit::NANO); |
| |
| std::vector<int64_t> s_values = {1489269, 1489270, 1489271, 1489272, 1489272, 1489273}; |
| std::vector<int64_t> ms_values = {1489269001, 1489270001, 1489271001, |
| 1489272001, 1489272001, 1489273001}; |
| std::vector<int64_t> us_values = {1489269000001, 1489270000001, 1489271000001, |
| 1489272000001, 1489272000001, 1489273000001}; |
| std::vector<int64_t> ns_values = {1489269000000001LL, 1489270000000001LL, |
| 1489271000000001LL, 1489272000000001LL, |
| 1489272000000001LL, 1489273000000001LL}; |
| |
| std::shared_ptr<Array> a_s, a_ms, a_us, a_ns; |
| ArrayFromVector<::arrow::TimestampType, int64_t>(t_s, is_valid, s_values, &a_s); |
| ArrayFromVector<::arrow::TimestampType, int64_t>(t_ms, is_valid, ms_values, &a_ms); |
| ArrayFromVector<::arrow::TimestampType, int64_t>(t_us, is_valid, us_values, &a_us); |
| ArrayFromVector<::arrow::TimestampType, int64_t>(t_ns, is_valid, ns_values, &a_ns); |
| |
| auto s1 = std::shared_ptr<::arrow::Schema>(new ::arrow::Schema({field("f_s", t_s)})); |
| auto s2 = std::shared_ptr<::arrow::Schema>(new ::arrow::Schema({field("f_ms", t_ms)})); |
| auto s3 = std::shared_ptr<::arrow::Schema>(new ::arrow::Schema({field("f_us", t_us)})); |
| auto s4 = std::shared_ptr<::arrow::Schema>(new ::arrow::Schema({field("f_ns", t_ns)})); |
| |
| auto c1 = std::make_shared<Column>("f_s", a_s); |
| auto c2 = std::make_shared<Column>("f_ms", a_ms); |
| auto c3 = std::make_shared<Column>("f_us", a_us); |
| auto c4 = std::make_shared<Column>("f_ns", a_ns); |
| |
| auto t1 = Table::Make(s1, {c1}); |
| auto t2 = Table::Make(s2, {c2}); |
| auto t3 = Table::Make(s3, {c3}); |
| auto t4 = Table::Make(s4, {c4}); |
| |
| auto sink = std::make_shared<InMemoryOutputStream>(); |
| |
| // OK to write to millis |
| auto coerce_millis = |
| (ArrowWriterProperties::Builder().coerce_timestamps(TimeUnit::MILLI)->build()); |
| ASSERT_OK_NO_THROW(WriteTable(*t1, ::arrow::default_memory_pool(), sink, 10, |
| default_writer_properties(), coerce_millis)); |
| ASSERT_OK_NO_THROW(WriteTable(*t2, ::arrow::default_memory_pool(), sink, 10, |
| default_writer_properties(), coerce_millis)); |
| |
| // Loss of precision |
| ASSERT_RAISES(Invalid, WriteTable(*t3, ::arrow::default_memory_pool(), sink, 10, |
| default_writer_properties(), coerce_millis)); |
| ASSERT_RAISES(Invalid, WriteTable(*t4, ::arrow::default_memory_pool(), sink, 10, |
| default_writer_properties(), coerce_millis)); |
| |
| // OK to write micros to micros |
| auto coerce_micros = |
| (ArrowWriterProperties::Builder().coerce_timestamps(TimeUnit::MICRO)->build()); |
| ASSERT_OK_NO_THROW(WriteTable(*t3, ::arrow::default_memory_pool(), sink, 10, |
| default_writer_properties(), coerce_micros)); |
| |
| // Loss of precision |
| ASSERT_RAISES(Invalid, WriteTable(*t4, ::arrow::default_memory_pool(), sink, 10, |
| default_writer_properties(), coerce_micros)); |
| } |
| |
| TEST(TestArrowReadWrite, ConvertedDateTimeTypes) { |
| using ::arrow::ArrayFromVector; |
| |
| std::vector<bool> is_valid = {true, true, true, false, true, true}; |
| |
| auto f0 = field("f0", ::arrow::date64()); |
| auto f1 = field("f1", ::arrow::time32(TimeUnit::SECOND)); |
| auto f2 = field("f2", ::arrow::date64()); |
| auto f3 = field("f3", ::arrow::time32(TimeUnit::SECOND)); |
| |
| auto schema = ::arrow::schema({f0, f1, f2, f3}); |
| |
| std::vector<int64_t> a0_values = {1489190400000, 1489276800000, 1489363200000, |
| 1489449600000, 1489536000000, 1489622400000}; |
| std::vector<int32_t> a1_values = {0, 1, 2, 3, 4, 5}; |
| |
| std::shared_ptr<Array> a0, a1, a0_nonnull, a1_nonnull, x0, x1, x0_nonnull, x1_nonnull; |
| |
| ArrayFromVector<::arrow::Date64Type, int64_t>(f0->type(), is_valid, a0_values, &a0); |
| ArrayFromVector<::arrow::Date64Type, int64_t>(f0->type(), a0_values, &a0_nonnull); |
| |
| ArrayFromVector<::arrow::Time32Type, int32_t>(f1->type(), is_valid, a1_values, &a1); |
| ArrayFromVector<::arrow::Time32Type, int32_t>(f1->type(), a1_values, &a1_nonnull); |
| |
| std::vector<std::shared_ptr<::arrow::Column>> columns = { |
| std::make_shared<Column>("f0", a0), std::make_shared<Column>("f1", a1), |
| std::make_shared<Column>("f2", a0_nonnull), |
| std::make_shared<Column>("f3", a1_nonnull)}; |
| auto table = Table::Make(schema, columns); |
| |
| // Expected schema and values |
| auto e0 = field("f0", ::arrow::date32()); |
| auto e1 = field("f1", ::arrow::time32(TimeUnit::MILLI)); |
| auto e2 = field("f2", ::arrow::date32()); |
| auto e3 = field("f3", ::arrow::time32(TimeUnit::MILLI)); |
| auto ex_schema = ::arrow::schema({e0, e1, e2, e3}); |
| |
| std::vector<int32_t> x0_values = {17236, 17237, 17238, 17239, 17240, 17241}; |
| std::vector<int32_t> x1_values = {0, 1000, 2000, 3000, 4000, 5000}; |
| ArrayFromVector<::arrow::Date32Type, int32_t>(e0->type(), is_valid, x0_values, &x0); |
| ArrayFromVector<::arrow::Date32Type, int32_t>(e0->type(), x0_values, &x0_nonnull); |
| |
| ArrayFromVector<::arrow::Time32Type, int32_t>(e1->type(), is_valid, x1_values, &x1); |
| ArrayFromVector<::arrow::Time32Type, int32_t>(e1->type(), x1_values, &x1_nonnull); |
| |
| std::vector<std::shared_ptr<::arrow::Column>> ex_columns = { |
| std::make_shared<Column>("f0", x0), std::make_shared<Column>("f1", x1), |
| std::make_shared<Column>("f2", x0_nonnull), |
| std::make_shared<Column>("f3", x1_nonnull)}; |
| auto ex_table = Table::Make(ex_schema, ex_columns); |
| |
| std::shared_ptr<Table> result; |
| ASSERT_NO_FATAL_FAILURE(DoSimpleRoundtrip(table, 1, table->num_rows(), {}, &result)); |
| |
| ASSERT_NO_FATAL_FAILURE(AssertTablesEqual(*ex_table, *result)); |
| } |
| |
| // Regression for ARROW-2802 |
| TEST(TestArrowReadWrite, CoerceTimestampsAndSupportDeprecatedInt96) { |
| using ::arrow::Column; |
| using ::arrow::default_memory_pool; |
| using ::arrow::Field; |
| using ::arrow::Schema; |
| using ::arrow::Table; |
| using ::arrow::TimestampBuilder; |
| using ::arrow::TimestampType; |
| using ::arrow::TimeUnit; |
| |
| auto timestamp_type = std::make_shared<TimestampType>(TimeUnit::NANO); |
| |
| TimestampBuilder builder(timestamp_type, default_memory_pool()); |
| for (std::int64_t ii = 0; ii < 10; ++ii) { |
| ASSERT_OK(builder.Append(1000000000L * ii)); |
| } |
| std::shared_ptr<Array> values; |
| ASSERT_OK(builder.Finish(&values)); |
| |
| std::vector<std::shared_ptr<Field>> fields; |
| auto field = std::make_shared<Field>("nanos", timestamp_type); |
| fields.emplace_back(field); |
| |
| auto schema = std::make_shared<Schema>(fields); |
| |
| std::vector<std::shared_ptr<Column>> columns; |
| auto column = std::make_shared<Column>("nanos", values); |
| columns.emplace_back(column); |
| |
| auto table = Table::Make(schema, columns); |
| |
| auto arrow_writer_properties = ArrowWriterProperties::Builder() |
| .coerce_timestamps(TimeUnit::MICRO) |
| ->enable_deprecated_int96_timestamps() |
| ->build(); |
| |
| std::shared_ptr<Table> result; |
| DoSimpleRoundtrip(table, 1, table->num_rows(), {}, &result, arrow_writer_properties); |
| |
| ASSERT_EQ(table->num_columns(), result->num_columns()); |
| ASSERT_EQ(table->num_rows(), result->num_rows()); |
| |
| auto actual_column = result->column(0); |
| auto data = actual_column->data(); |
| auto expected_values = |
| static_cast<::arrow::NumericArray<TimestampType>*>(values.get())->raw_values(); |
| for (int ii = 0; ii < data->num_chunks(); ++ii) { |
| auto chunk = |
| static_cast<::arrow::NumericArray<TimestampType>*>(data->chunk(ii).get()); |
| auto values = chunk->raw_values(); |
| for (int64_t jj = 0; jj < chunk->length(); ++jj, ++expected_values) { |
| // Check that the nanos have been converted to micros |
| ASSERT_EQ(*expected_values / 1000, values[jj]); |
| } |
| } |
| } |
| |
| void MakeDoubleTable(int num_columns, int num_rows, int nchunks, |
| std::shared_ptr<Table>* out) { |
| std::shared_ptr<::arrow::Column> column; |
| std::vector<std::shared_ptr<::arrow::Column>> columns(num_columns); |
| std::vector<std::shared_ptr<::arrow::Field>> fields(num_columns); |
| |
| for (int i = 0; i < num_columns; ++i) { |
| std::vector<std::shared_ptr<Array>> arrays; |
| std::shared_ptr<Array> values; |
| ASSERT_OK(NullableArray<::arrow::DoubleType>(num_rows, num_rows / 10, |
| static_cast<uint32_t>(i), &values)); |
| std::stringstream ss; |
| ss << "col" << i; |
| |
| for (int j = 0; j < nchunks; ++j) { |
| arrays.push_back(values); |
| } |
| column = MakeColumn(ss.str(), arrays, true); |
| |
| columns[i] = column; |
| fields[i] = column->field(); |
| } |
| auto schema = std::make_shared<::arrow::Schema>(fields); |
| *out = Table::Make(schema, columns); |
| } |
| |
| void MakeListArray(int num_rows, std::shared_ptr<::DataType>* out_type, |
| std::shared_ptr<Array>* out_array) { |
| std::vector<int32_t> length_draws; |
| randint(num_rows, 0, 100, &length_draws); |
| |
| std::vector<int32_t> offset_values; |
| |
| // Make sure some of them are length 0 |
| int32_t total_elements = 0; |
| for (size_t i = 0; i < length_draws.size(); ++i) { |
| if (length_draws[i] < 10) { |
| length_draws[i] = 0; |
| } |
| offset_values.push_back(total_elements); |
| total_elements += length_draws[i]; |
| } |
| offset_values.push_back(total_elements); |
| |
| std::vector<int8_t> value_draws; |
| randint(total_elements, 0, 100, &value_draws); |
| |
| std::vector<bool> is_valid; |
| random_is_valid(total_elements, 0.1, &is_valid); |
| |
| std::shared_ptr<Array> values, offsets; |
| ::arrow::ArrayFromVector<::arrow::Int8Type, int8_t>(::arrow::int8(), is_valid, |
| value_draws, &values); |
| ::arrow::ArrayFromVector<::arrow::Int32Type, int32_t>(offset_values, &offsets); |
| |
| ASSERT_OK(::arrow::ListArray::FromArrays(*offsets, *values, default_memory_pool(), |
| out_array)); |
| |
| *out_type = ::arrow::list(::arrow::int8()); |
| } |
| |
| TEST(TestArrowReadWrite, MultithreadedRead) { |
| const int num_columns = 20; |
| const int num_rows = 1000; |
| const int num_threads = 4; |
| |
| std::shared_ptr<Table> table; |
| ASSERT_NO_FATAL_FAILURE(MakeDoubleTable(num_columns, num_rows, 1, &table)); |
| |
| std::shared_ptr<Table> result; |
| ASSERT_NO_FATAL_FAILURE( |
| DoSimpleRoundtrip(table, num_threads, table->num_rows(), {}, &result)); |
| |
| ASSERT_NO_FATAL_FAILURE(AssertTablesEqual(*table, *result)); |
| } |
| |
| TEST(TestArrowReadWrite, ReadSingleRowGroup) { |
| const int num_columns = 20; |
| const int num_rows = 1000; |
| |
| std::shared_ptr<Table> table; |
| ASSERT_NO_FATAL_FAILURE(MakeDoubleTable(num_columns, num_rows, 1, &table)); |
| |
| std::shared_ptr<Buffer> buffer; |
| ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, 1, num_rows / 2, |
| default_arrow_writer_properties(), &buffer)); |
| |
| std::unique_ptr<FileReader> reader; |
| ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer), |
| ::arrow::default_memory_pool(), |
| ::parquet::default_reader_properties(), nullptr, &reader)); |
| |
| ASSERT_EQ(2, reader->num_row_groups()); |
| |
| std::shared_ptr<Table> r1, r2; |
| // Read everything |
| ASSERT_OK_NO_THROW(reader->ReadRowGroup(0, &r1)); |
| ASSERT_OK_NO_THROW(reader->RowGroup(1)->ReadTable(&r2)); |
| |
| std::shared_ptr<Table> concatenated; |
| ASSERT_OK(ConcatenateTables({r1, r2}, &concatenated)); |
| |
| ASSERT_TRUE(table->Equals(*concatenated)); |
| } |
| |
| TEST(TestArrowReadWrite, GetRecordBatchReader) { |
| const int num_columns = 20; |
| const int num_rows = 1000; |
| |
| std::shared_ptr<Table> table; |
| ASSERT_NO_FATAL_FAILURE(MakeDoubleTable(num_columns, num_rows, 1, &table)); |
| |
| std::shared_ptr<Buffer> buffer; |
| ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, 1, num_rows / 2, |
| default_arrow_writer_properties(), &buffer)); |
| |
| std::unique_ptr<FileReader> reader; |
| ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer), |
| ::arrow::default_memory_pool(), |
| ::parquet::default_reader_properties(), nullptr, &reader)); |
| |
| std::shared_ptr<::arrow::RecordBatchReader> rb_reader; |
| ASSERT_OK_NO_THROW(reader->GetRecordBatchReader({0, 1}, &rb_reader)); |
| |
| std::shared_ptr<::arrow::RecordBatch> batch; |
| |
| ASSERT_OK(rb_reader->ReadNext(&batch)); |
| ASSERT_EQ(500, batch->num_rows()); |
| ASSERT_EQ(20, batch->num_columns()); |
| |
| ASSERT_OK(rb_reader->ReadNext(&batch)); |
| ASSERT_EQ(500, batch->num_rows()); |
| ASSERT_EQ(20, batch->num_columns()); |
| |
| ASSERT_OK(rb_reader->ReadNext(&batch)); |
| ASSERT_EQ(nullptr, batch); |
| } |
| |
| TEST(TestArrowReadWrite, ScanContents) { |
| const int num_columns = 20; |
| const int num_rows = 1000; |
| |
| std::shared_ptr<Table> table; |
| ASSERT_NO_FATAL_FAILURE(MakeDoubleTable(num_columns, num_rows, 1, &table)); |
| |
| std::shared_ptr<Buffer> buffer; |
| ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, 1, num_rows / 2, |
| default_arrow_writer_properties(), &buffer)); |
| |
| std::unique_ptr<FileReader> reader; |
| ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer), |
| ::arrow::default_memory_pool(), |
| ::parquet::default_reader_properties(), nullptr, &reader)); |
| |
| int64_t num_rows_returned = 0; |
| ASSERT_OK_NO_THROW(reader->ScanContents({}, 256, &num_rows_returned)); |
| ASSERT_EQ(num_rows, num_rows_returned); |
| |
| ASSERT_OK_NO_THROW(reader->ScanContents({0, 1, 2}, 256, &num_rows_returned)); |
| ASSERT_EQ(num_rows, num_rows_returned); |
| } |
| |
| TEST(TestArrowReadWrite, ReadColumnSubset) { |
| const int num_columns = 20; |
| const int num_rows = 1000; |
| const int num_threads = 4; |
| |
| std::shared_ptr<Table> table; |
| ASSERT_NO_FATAL_FAILURE(MakeDoubleTable(num_columns, num_rows, 1, &table)); |
| |
| std::shared_ptr<Table> result; |
| std::vector<int> column_subset = {0, 4, 8, 10}; |
| ASSERT_NO_FATAL_FAILURE( |
| DoSimpleRoundtrip(table, num_threads, table->num_rows(), column_subset, &result)); |
| |
| std::vector<std::shared_ptr<::arrow::Column>> ex_columns; |
| std::vector<std::shared_ptr<::arrow::Field>> ex_fields; |
| for (int i : column_subset) { |
| ex_columns.push_back(table->column(i)); |
| ex_fields.push_back(table->column(i)->field()); |
| } |
| |
| auto ex_schema = ::arrow::schema(ex_fields); |
| auto expected = Table::Make(ex_schema, ex_columns); |
| ASSERT_NO_FATAL_FAILURE(AssertTablesEqual(*expected, *result)); |
| } |
| |
| TEST(TestArrowReadWrite, ListLargeRecords) { |
| const int num_rows = 50; |
| |
| std::shared_ptr<Array> list_array; |
| std::shared_ptr<::DataType> list_type; |
| |
| MakeListArray(num_rows, &list_type, &list_array); |
| |
| auto schema = ::arrow::schema({::arrow::field("a", list_type)}); |
| std::shared_ptr<Table> table = Table::Make(schema, {list_array}); |
| |
| std::shared_ptr<Buffer> buffer; |
| ASSERT_NO_FATAL_FAILURE( |
| WriteTableToBuffer(table, 1, 100, default_arrow_writer_properties(), &buffer)); |
| |
| std::unique_ptr<FileReader> reader; |
| ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer), |
| ::arrow::default_memory_pool(), |
| ::parquet::default_reader_properties(), nullptr, &reader)); |
| |
| // Read everything |
| std::shared_ptr<Table> result; |
| ASSERT_OK_NO_THROW(reader->ReadTable(&result)); |
| ASSERT_NO_FATAL_FAILURE(AssertTablesEqual(*table, *result)); |
| |
| ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer), |
| ::arrow::default_memory_pool(), |
| ::parquet::default_reader_properties(), nullptr, &reader)); |
| |
| std::unique_ptr<ColumnReader> col_reader; |
| ASSERT_OK(reader->GetColumn(0, &col_reader)); |
| |
| auto expected = table->column(0)->data()->chunk(0); |
| |
| std::vector<std::shared_ptr<Array>> pieces; |
| for (int i = 0; i < num_rows; ++i) { |
| std::shared_ptr<Array> piece; |
| ASSERT_OK(col_reader->NextBatch(1, &piece)); |
| ASSERT_EQ(1, piece->length()); |
| pieces.push_back(piece); |
| } |
| auto chunked = std::make_shared<::arrow::ChunkedArray>(pieces); |
| |
| auto chunked_col = |
| std::make_shared<::arrow::Column>(table->schema()->field(0), chunked); |
| std::vector<std::shared_ptr<::arrow::Column>> columns = {chunked_col}; |
| auto chunked_table = Table::Make(table->schema(), columns); |
| |
| ASSERT_TRUE(table->Equals(*chunked_table)); |
| } |
| |
| typedef std::function<void(int, std::shared_ptr<::DataType>*, std::shared_ptr<Array>*)> |
| ArrayFactory; |
| |
| template <typename ArrowType> |
| struct GenerateArrayFunctor { |
| explicit GenerateArrayFunctor(double pct_null = 0.1) : pct_null(pct_null) {} |
| |
| void operator()(int length, std::shared_ptr<::DataType>* type, |
| std::shared_ptr<Array>* array) { |
| using T = typename ArrowType::c_type; |
| |
| // TODO(wesm): generate things other than integers |
| std::vector<T> draws; |
| randint(length, 0, 100, &draws); |
| |
| std::vector<bool> is_valid; |
| random_is_valid(length, this->pct_null, &is_valid); |
| |
| *type = ::arrow::TypeTraits<ArrowType>::type_singleton(); |
| ::arrow::ArrayFromVector<ArrowType, T>(*type, is_valid, draws, array); |
| } |
| |
| double pct_null; |
| }; |
| |
| typedef std::function<void(int, std::shared_ptr<::DataType>*, std::shared_ptr<Array>*)> |
| ArrayFactory; |
| |
| auto GenerateInt32 = [](int length, std::shared_ptr<::DataType>* type, |
| std::shared_ptr<Array>* array) { |
| GenerateArrayFunctor<::arrow::Int32Type> func; |
| func(length, type, array); |
| }; |
| |
| auto GenerateList = [](int length, std::shared_ptr<::DataType>* type, |
| std::shared_ptr<Array>* array) { |
| MakeListArray(length, type, array); |
| }; |
| |
| TEST(TestArrowReadWrite, TableWithChunkedColumns) { |
| std::vector<ArrayFactory> functions = {GenerateInt32, GenerateList}; |
| |
| std::vector<int> chunk_sizes = {2, 4, 10, 2}; |
| const int64_t total_length = 18; |
| |
| for (const auto& datagen_func : functions) { |
| ::arrow::ArrayVector arrays; |
| std::shared_ptr<Array> arr; |
| std::shared_ptr<::DataType> type; |
| datagen_func(total_length, &type, &arr); |
| |
| int64_t offset = 0; |
| for (int chunk_size : chunk_sizes) { |
| arrays.push_back(arr->Slice(offset, chunk_size)); |
| offset += chunk_size; |
| } |
| |
| auto field = ::arrow::field("fname", type); |
| auto schema = ::arrow::schema({field}); |
| auto col = std::make_shared<::arrow::Column>(field, arrays); |
| auto table = Table::Make(schema, {col}); |
| |
| ASSERT_NO_FATAL_FAILURE(CheckSimpleRoundtrip(table, 2)); |
| ASSERT_NO_FATAL_FAILURE(CheckSimpleRoundtrip(table, 3)); |
| ASSERT_NO_FATAL_FAILURE(CheckSimpleRoundtrip(table, 10)); |
| } |
| } |
| |
| TEST(TestArrowReadWrite, TableWithDuplicateColumns) { |
| // See ARROW-1974 |
| using ::arrow::ArrayFromVector; |
| |
| auto f0 = field("duplicate", ::arrow::int8()); |
| auto f1 = field("duplicate", ::arrow::int16()); |
| auto schema = ::arrow::schema({f0, f1}); |
| |
| std::vector<int8_t> a0_values = {1, 2, 3}; |
| std::vector<int16_t> a1_values = {14, 15, 16}; |
| |
| std::shared_ptr<Array> a0, a1; |
| |
| ArrayFromVector<::arrow::Int8Type, int8_t>(a0_values, &a0); |
| ArrayFromVector<::arrow::Int16Type, int16_t>(a1_values, &a1); |
| |
| auto table = Table::Make(schema, {std::make_shared<Column>(f0->name(), a0), |
| std::make_shared<Column>(f1->name(), a1)}); |
| ASSERT_NO_FATAL_FAILURE(CheckSimpleRoundtrip(table, table->num_rows())); |
| } |
| |
| TEST(TestArrowReadWrite, DictionaryColumnChunkedWrite) { |
| // This is a regression test for this: |
| // |
| // https://issues.apache.org/jira/browse/ARROW-1938 |
| // |
| // As of the writing of this test, columns of type |
| // dictionary are written as their raw/expanded values. |
| // The regression was that the whole column was being |
| // written for each chunk. |
| using ::arrow::ArrayFromVector; |
| |
| std::vector<std::string> values = {"first", "second", "third"}; |
| auto type = ::arrow::utf8(); |
| std::shared_ptr<Array> dict_values; |
| ArrayFromVector<::arrow::StringType, std::string>(values, &dict_values); |
| |
| auto dict_type = ::arrow::dictionary(::arrow::int32(), dict_values); |
| auto f0 = field("dictionary", dict_type); |
| std::vector<std::shared_ptr<::arrow::Field>> fields; |
| fields.emplace_back(f0); |
| auto schema = ::arrow::schema(fields); |
| |
| std::shared_ptr<Array> f0_values, f1_values; |
| ArrayFromVector<::arrow::Int32Type, int32_t>({0, 1, 0, 2, 1}, &f0_values); |
| ArrayFromVector<::arrow::Int32Type, int32_t>({2, 0, 1, 0, 2}, &f1_values); |
| ::arrow::ArrayVector dict_arrays = { |
| std::make_shared<::arrow::DictionaryArray>(dict_type, f0_values), |
| std::make_shared<::arrow::DictionaryArray>(dict_type, f1_values)}; |
| |
| std::vector<std::shared_ptr<::arrow::Column>> columns; |
| auto column = MakeColumn("column", dict_arrays, false); |
| columns.emplace_back(column); |
| |
| auto table = Table::Make(schema, columns); |
| |
| std::shared_ptr<Table> result; |
| DoSimpleRoundtrip(table, 1, |
| // Just need to make sure that we make |
| // a chunk size that is smaller than the |
| // total number of values |
| 2, {}, &result); |
| |
| std::vector<std::string> expected_values = {"first", "second", "first", "third", |
| "second", "third", "first", "second", |
| "first", "third"}; |
| columns.clear(); |
| |
| std::shared_ptr<Array> expected_array; |
| ArrayFromVector<::arrow::StringType, std::string>(expected_values, &expected_array); |
| |
| // The column name gets changed on output to the name of the |
| // field, and it also turns into a nullable column |
| columns.emplace_back(MakeColumn("dictionary", expected_array, true)); |
| |
| fields.clear(); |
| fields.emplace_back(::arrow::field("dictionary", ::arrow::utf8())); |
| schema = ::arrow::schema(fields); |
| |
| auto expected_table = Table::Make(schema, columns); |
| |
| AssertTablesEqual(*expected_table, *result, false); |
| } |
| |
| TEST(TestArrowWrite, CheckChunkSize) { |
| const int num_columns = 2; |
| const int num_rows = 128; |
| const int64_t chunk_size = 0; // note the chunk_size is 0 |
| std::shared_ptr<Table> table; |
| ASSERT_NO_FATAL_FAILURE(MakeDoubleTable(num_columns, num_rows, 1, &table)); |
| |
| auto sink = std::make_shared<InMemoryOutputStream>(); |
| |
| ASSERT_RAISES(Invalid, |
| WriteTable(*table, ::arrow::default_memory_pool(), sink, chunk_size)); |
| } |
| |
| class TestNestedSchemaRead : public ::testing::TestWithParam<Repetition::type> { |
| protected: |
| // make it *3 to make it easily divisible by 3 |
| const int NUM_SIMPLE_TEST_ROWS = SMALL_SIZE * 3; |
| std::shared_ptr<::arrow::Int32Array> values_array_ = nullptr; |
| |
| void InitReader() { |
| std::shared_ptr<Buffer> buffer = nested_parquet_->GetBuffer(); |
| ASSERT_OK_NO_THROW( |
| OpenFile(std::make_shared<BufferReader>(buffer), ::arrow::default_memory_pool(), |
| ::parquet::default_reader_properties(), nullptr, &reader_)); |
| } |
| |
| void InitNewParquetFile(const std::shared_ptr<GroupNode>& schema, int num_rows) { |
| nested_parquet_ = std::make_shared<InMemoryOutputStream>(); |
| |
| writer_ = parquet::ParquetFileWriter::Open(nested_parquet_, schema, |
| default_writer_properties()); |
| row_group_writer_ = writer_->AppendRowGroup(); |
| } |
| |
| void FinalizeParquetFile() { |
| row_group_writer_->Close(); |
| writer_->Close(); |
| } |
| |
| void MakeValues(int num_rows) { |
| std::shared_ptr<Array> arr; |
| ASSERT_OK(NullableArray<::arrow::Int32Type>(num_rows, 0, kDefaultSeed, &arr)); |
| values_array_ = std::dynamic_pointer_cast<::arrow::Int32Array>(arr); |
| } |
| |
| void WriteColumnData(size_t num_rows, int16_t* def_levels, int16_t* rep_levels, |
| int32_t* values) { |
| auto typed_writer = |
| static_cast<TypedColumnWriter<Int32Type>*>(row_group_writer_->NextColumn()); |
| typed_writer->WriteBatch(num_rows, def_levels, rep_levels, values); |
| } |
| |
| void ValidateArray(const Array& array, size_t expected_nulls) { |
| ASSERT_EQ(array.length(), values_array_->length()); |
| ASSERT_EQ(array.null_count(), expected_nulls); |
| // Also independently count the nulls |
| auto local_null_count = 0; |
| for (int i = 0; i < array.length(); i++) { |
| if (array.IsNull(i)) { |
| local_null_count++; |
| } |
| } |
| ASSERT_EQ(local_null_count, expected_nulls); |
| } |
| |
| void ValidateColumnArray(const ::arrow::Int32Array& array, size_t expected_nulls) { |
| ValidateArray(array, expected_nulls); |
| int j = 0; |
| for (int i = 0; i < values_array_->length(); i++) { |
| if (array.IsNull(i)) { |
| continue; |
| } |
| ASSERT_EQ(array.Value(i), values_array_->Value(j)); |
| j++; |
| } |
| } |
| |
| void ValidateTableArrayTypes(const Table& table) { |
| for (int i = 0; i < table.num_columns(); i++) { |
| const std::shared_ptr<::arrow::Field> schema_field = table.schema()->field(i); |
| const std::shared_ptr<Column> column = table.column(i); |
| // Compare with the column field |
| ASSERT_TRUE(schema_field->Equals(column->field())); |
| // Compare with the array type |
| ASSERT_TRUE(schema_field->type()->Equals(column->data()->chunk(0)->type())); |
| } |
| } |
| |
| // A parquet with a simple nested schema |
| void CreateSimpleNestedParquet(Repetition::type struct_repetition) { |
| std::vector<NodePtr> parquet_fields; |
| // TODO(itaiin): We are using parquet low-level file api to create the nested parquet |
| // this needs to change when a nested writes are implemented |
| |
| // create the schema: |
| // <struct_repetition> group group1 { |
| // required int32 leaf1; |
| // optional int32 leaf2; |
| // } |
| // required int32 leaf3; |
| |
| parquet_fields.push_back(GroupNode::Make( |
| "group1", struct_repetition, |
| {PrimitiveNode::Make("leaf1", Repetition::REQUIRED, ParquetType::INT32), |
| PrimitiveNode::Make("leaf2", Repetition::OPTIONAL, ParquetType::INT32)})); |
| parquet_fields.push_back( |
| PrimitiveNode::Make("leaf3", Repetition::REQUIRED, ParquetType::INT32)); |
| |
| auto schema_node = GroupNode::Make("schema", Repetition::REQUIRED, parquet_fields); |
| |
| // Create definition levels for the different columns that contain interleaved |
| // nulls and values at all nesting levels |
| |
| // definition levels for optional fields |
| std::vector<int16_t> leaf1_def_levels(NUM_SIMPLE_TEST_ROWS); |
| std::vector<int16_t> leaf2_def_levels(NUM_SIMPLE_TEST_ROWS); |
| std::vector<int16_t> leaf3_def_levels(NUM_SIMPLE_TEST_ROWS); |
| for (int i = 0; i < NUM_SIMPLE_TEST_ROWS; i++) { |
| // leaf1 is required within the optional group1, so it is only null |
| // when the group is null |
| leaf1_def_levels[i] = (i % 3 == 0) ? 0 : 1; |
| // leaf2 is optional, can be null in the primitive (def-level 1) or |
| // struct level (def-level 0) |
| leaf2_def_levels[i] = static_cast<int16_t>(i % 3); |
| // leaf3 is required |
| leaf3_def_levels[i] = 0; |
| } |
| |
| std::vector<int16_t> rep_levels(NUM_SIMPLE_TEST_ROWS, 0); |
| |
| // Produce values for the columns |
| MakeValues(NUM_SIMPLE_TEST_ROWS); |
| int32_t* values = reinterpret_cast<int32_t*>(values_array_->values()->mutable_data()); |
| |
| // Create the actual parquet file |
| InitNewParquetFile(std::static_pointer_cast<GroupNode>(schema_node), |
| NUM_SIMPLE_TEST_ROWS); |
| |
| // leaf1 column |
| WriteColumnData(NUM_SIMPLE_TEST_ROWS, leaf1_def_levels.data(), rep_levels.data(), |
| values); |
| // leaf2 column |
| WriteColumnData(NUM_SIMPLE_TEST_ROWS, leaf2_def_levels.data(), rep_levels.data(), |
| values); |
| // leaf3 column |
| WriteColumnData(NUM_SIMPLE_TEST_ROWS, leaf3_def_levels.data(), rep_levels.data(), |
| values); |
| |
| FinalizeParquetFile(); |
| InitReader(); |
| } |
| |
| NodePtr CreateSingleTypedNestedGroup(int index, int depth, int num_children, |
| Repetition::type node_repetition, |
| ParquetType::type leaf_type) { |
| std::vector<NodePtr> children; |
| |
| for (int i = 0; i < num_children; i++) { |
| if (depth <= 1) { |
| children.push_back(PrimitiveNode::Make("leaf", node_repetition, leaf_type)); |
| } else { |
| children.push_back(CreateSingleTypedNestedGroup(i, depth - 1, num_children, |
| node_repetition, leaf_type)); |
| } |
| } |
| |
| std::stringstream ss; |
| ss << "group-" << depth << "-" << index; |
| return NodePtr(GroupNode::Make(ss.str(), node_repetition, children)); |
| } |
| |
| // A deeply nested schema |
| void CreateMultiLevelNestedParquet(int num_trees, int tree_depth, int num_children, |
| int num_rows, Repetition::type node_repetition) { |
| // Create the schema |
| std::vector<NodePtr> parquet_fields; |
| for (int i = 0; i < num_trees; i++) { |
| parquet_fields.push_back(CreateSingleTypedNestedGroup( |
| i, tree_depth, num_children, node_repetition, ParquetType::INT32)); |
| } |
| auto schema_node = GroupNode::Make("schema", Repetition::REQUIRED, parquet_fields); |
| |
| int num_columns = num_trees * static_cast<int>((std::pow(num_children, tree_depth))); |
| |
| std::vector<int16_t> def_levels; |
| std::vector<int16_t> rep_levels; |
| |
| int num_levels = 0; |
| while (num_levels < num_rows) { |
| if (node_repetition == Repetition::REQUIRED) { |
| def_levels.push_back(0); // all are required |
| } else { |
| int16_t level = static_cast<int16_t>(num_levels % (tree_depth + 2)); |
| def_levels.push_back(level); // all are optional |
| } |
| rep_levels.push_back(0); // none is repeated |
| ++num_levels; |
| } |
| |
| // Produce values for the columns |
| MakeValues(num_rows); |
| int32_t* values = reinterpret_cast<int32_t*>(values_array_->values()->mutable_data()); |
| |
| // Create the actual parquet file |
| InitNewParquetFile(std::static_pointer_cast<GroupNode>(schema_node), num_rows); |
| |
| for (int i = 0; i < num_columns; i++) { |
| WriteColumnData(num_rows, def_levels.data(), rep_levels.data(), values); |
| } |
| FinalizeParquetFile(); |
| InitReader(); |
| } |
| |
| class DeepParquetTestVisitor : public ArrayVisitor { |
| public: |
| DeepParquetTestVisitor(Repetition::type node_repetition, |
| std::shared_ptr<::arrow::Int32Array> expected) |
| : node_repetition_(node_repetition), expected_(expected) {} |
| |
| Status Validate(std::shared_ptr<Array> tree) { return tree->Accept(this); } |
| |
| virtual Status Visit(const ::arrow::Int32Array& array) { |
| if (node_repetition_ == Repetition::REQUIRED) { |
| if (!array.Equals(expected_)) { |
| return Status::Invalid("leaf array data mismatch"); |
| } |
| } else if (node_repetition_ == Repetition::OPTIONAL) { |
| if (array.length() != expected_->length()) { |
| return Status::Invalid("Bad leaf array length"); |
| } |
| // expect only 1 value every `depth` row |
| if (array.null_count() != SMALL_SIZE) { |
| return Status::Invalid("Unexpected null count"); |
| } |
| } else { |
| return Status::NotImplemented("Unsupported repetition"); |
| } |
| return Status::OK(); |
| } |
| |
| virtual Status Visit(const ::arrow::StructArray& array) { |
| for (int32_t i = 0; i < array.num_fields(); ++i) { |
| auto child = array.field(i); |
| if (node_repetition_ == Repetition::REQUIRED) { |
| RETURN_NOT_OK(child->Accept(this)); |
| } else if (node_repetition_ == Repetition::OPTIONAL) { |
| // Null count Must be a multiple of SMALL_SIZE |
| if (array.null_count() % SMALL_SIZE != 0) { |
| return Status::Invalid("Unexpected struct null count"); |
| } |
| } else { |
| return Status::NotImplemented("Unsupported repetition"); |
| } |
| } |
| return Status::OK(); |
| } |
| |
| private: |
| Repetition::type node_repetition_; |
| std::shared_ptr<::arrow::Int32Array> expected_; |
| }; |
| |
| std::shared_ptr<InMemoryOutputStream> nested_parquet_; |
| std::unique_ptr<FileReader> reader_; |
| std::unique_ptr<ParquetFileWriter> writer_; |
| RowGroupWriter* row_group_writer_; |
| }; |
| |
| TEST_F(TestNestedSchemaRead, ReadIntoTableFull) { |
| ASSERT_NO_FATAL_FAILURE(CreateSimpleNestedParquet(Repetition::OPTIONAL)); |
| |
| std::shared_ptr<Table> table; |
| ASSERT_OK_NO_THROW(reader_->ReadTable(&table)); |
| ASSERT_EQ(table->num_rows(), NUM_SIMPLE_TEST_ROWS); |
| ASSERT_EQ(table->num_columns(), 2); |
| ASSERT_EQ(table->schema()->field(0)->type()->num_children(), 2); |
| ASSERT_NO_FATAL_FAILURE(ValidateTableArrayTypes(*table)); |
| |
| auto struct_field_array = |
| std::static_pointer_cast<::arrow::StructArray>(table->column(0)->data()->chunk(0)); |
| auto leaf1_array = |
| std::static_pointer_cast<::arrow::Int32Array>(struct_field_array->field(0)); |
| auto leaf2_array = |
| std::static_pointer_cast<::arrow::Int32Array>(struct_field_array->field(1)); |
| auto leaf3_array = |
| std::static_pointer_cast<::arrow::Int32Array>(table->column(1)->data()->chunk(0)); |
| |
| // validate struct and leaf arrays |
| |
| // validate struct array |
| ASSERT_NO_FATAL_FAILURE(ValidateArray(*struct_field_array, NUM_SIMPLE_TEST_ROWS / 3)); |
| // validate leaf1 |
| ASSERT_NO_FATAL_FAILURE(ValidateColumnArray(*leaf1_array, NUM_SIMPLE_TEST_ROWS / 3)); |
| // validate leaf2 |
| ASSERT_NO_FATAL_FAILURE( |
| ValidateColumnArray(*leaf2_array, NUM_SIMPLE_TEST_ROWS * 2 / 3)); |
| // validate leaf3 |
| ASSERT_NO_FATAL_FAILURE(ValidateColumnArray(*leaf3_array, 0)); |
| } |
| |
| TEST_F(TestNestedSchemaRead, ReadTablePartial) { |
| ASSERT_NO_FATAL_FAILURE(CreateSimpleNestedParquet(Repetition::OPTIONAL)); |
| std::shared_ptr<Table> table; |
| |
| // columns: {group1.leaf1, leaf3} |
| ASSERT_OK_NO_THROW(reader_->ReadTable({0, 2}, &table)); |
| ASSERT_EQ(table->num_rows(), NUM_SIMPLE_TEST_ROWS); |
| ASSERT_EQ(table->num_columns(), 2); |
| ASSERT_EQ(table->schema()->field(0)->name(), "group1"); |
| ASSERT_EQ(table->schema()->field(1)->name(), "leaf3"); |
| ASSERT_EQ(table->schema()->field(0)->type()->num_children(), 1); |
| ASSERT_NO_FATAL_FAILURE(ValidateTableArrayTypes(*table)); |
| |
| // columns: {group1.leaf1, group1.leaf2} |
| ASSERT_OK_NO_THROW(reader_->ReadTable({0, 1}, &table)); |
| ASSERT_EQ(table->num_rows(), NUM_SIMPLE_TEST_ROWS); |
| ASSERT_EQ(table->num_columns(), 1); |
| ASSERT_EQ(table->schema()->field(0)->name(), "group1"); |
| ASSERT_EQ(table->schema()->field(0)->type()->num_children(), 2); |
| ASSERT_NO_FATAL_FAILURE(ValidateTableArrayTypes(*table)); |
| |
| // columns: {leaf3} |
| ASSERT_OK_NO_THROW(reader_->ReadTable({2}, &table)); |
| ASSERT_EQ(table->num_rows(), NUM_SIMPLE_TEST_ROWS); |
| ASSERT_EQ(table->num_columns(), 1); |
| ASSERT_EQ(table->schema()->field(0)->name(), "leaf3"); |
| ASSERT_EQ(table->schema()->field(0)->type()->num_children(), 0); |
| ASSERT_NO_FATAL_FAILURE(ValidateTableArrayTypes(*table)); |
| |
| // Test with different ordering |
| ASSERT_OK_NO_THROW(reader_->ReadTable({2, 0}, &table)); |
| ASSERT_EQ(table->num_rows(), NUM_SIMPLE_TEST_ROWS); |
| ASSERT_EQ(table->num_columns(), 2); |
| ASSERT_EQ(table->schema()->field(0)->name(), "leaf3"); |
| ASSERT_EQ(table->schema()->field(1)->name(), "group1"); |
| ASSERT_EQ(table->schema()->field(1)->type()->num_children(), 1); |
| ASSERT_NO_FATAL_FAILURE(ValidateTableArrayTypes(*table)); |
| } |
| |
| TEST_F(TestNestedSchemaRead, StructAndListTogetherUnsupported) { |
| ASSERT_NO_FATAL_FAILURE(CreateSimpleNestedParquet(Repetition::REPEATED)); |
| std::shared_ptr<Table> table; |
| ASSERT_RAISES(NotImplemented, reader_->ReadTable(&table)); |
| } |
| |
| TEST_P(TestNestedSchemaRead, DeepNestedSchemaRead) { |
| const int num_trees = 10; |
| const int depth = 5; |
| const int num_children = 3; |
| int num_rows = SMALL_SIZE * (depth + 2); |
| ASSERT_NO_FATAL_FAILURE(CreateMultiLevelNestedParquet(num_trees, depth, num_children, |
| num_rows, GetParam())); |
| std::shared_ptr<Table> table; |
| ASSERT_OK_NO_THROW(reader_->ReadTable(&table)); |
| ASSERT_EQ(table->num_columns(), num_trees); |
| ASSERT_EQ(table->num_rows(), num_rows); |
| |
| DeepParquetTestVisitor visitor(GetParam(), values_array_); |
| for (int i = 0; i < table->num_columns(); i++) { |
| auto tree = table->column(i)->data()->chunk(0); |
| ASSERT_OK_NO_THROW(visitor.Validate(tree)); |
| } |
| } |
| |
| INSTANTIATE_TEST_CASE_P(Repetition_type, TestNestedSchemaRead, |
| ::testing::Values(Repetition::REQUIRED, Repetition::OPTIONAL)); |
| |
| TEST(TestImpalaConversion, NanosecondToImpala) { |
| // June 20, 2017 16:32:56 and 123456789 nanoseconds |
| int64_t nanoseconds = INT64_C(1497976376123456789); |
| Int96 expected = {{UINT32_C(632093973), UINT32_C(13871), UINT32_C(2457925)}}; |
| Int96 calculated; |
| internal::NanosecondsToImpalaTimestamp(nanoseconds, &calculated); |
| ASSERT_EQ(expected, calculated); |
| } |
| |
| TEST(TestArrowReaderAdHoc, Int96BadMemoryAccess) { |
| // PARQUET-995 |
| std::string dir_string(test::get_data_dir()); |
| std::stringstream ss; |
| ss << dir_string << "/" |
| << "alltypes_plain.parquet"; |
| auto path = ss.str(); |
| |
| auto pool = ::arrow::default_memory_pool(); |
| |
| std::unique_ptr<FileReader> arrow_reader; |
| ASSERT_NO_THROW( |
| arrow_reader.reset(new FileReader(pool, ParquetFileReader::OpenFile(path, false)))); |
| std::shared_ptr<::arrow::Table> table; |
| ASSERT_OK_NO_THROW(arrow_reader->ReadTable(&table)); |
| } |
| |
| class TestArrowReaderAdHocSpark |
| : public ::testing::TestWithParam< |
| std::tuple<std::string, std::shared_ptr<::DataType>>> {}; |
| |
| TEST_P(TestArrowReaderAdHocSpark, ReadDecimals) { |
| std::string path(test::get_data_dir()); |
| |
| std::string filename; |
| std::shared_ptr<::DataType> decimal_type; |
| std::tie(filename, decimal_type) = GetParam(); |
| |
| path += "/" + filename; |
| ASSERT_GT(path.size(), 0); |
| |
| auto pool = ::arrow::default_memory_pool(); |
| |
| std::unique_ptr<FileReader> arrow_reader; |
| ASSERT_NO_THROW( |
| arrow_reader.reset(new FileReader(pool, ParquetFileReader::OpenFile(path, false)))); |
| std::shared_ptr<::arrow::Table> table; |
| ASSERT_OK_NO_THROW(arrow_reader->ReadTable(&table)); |
| |
| ASSERT_EQ(1, table->num_columns()); |
| |
| constexpr int32_t expected_length = 24; |
| |
| auto value_column = table->column(0); |
| ASSERT_EQ(expected_length, value_column->length()); |
| |
| auto raw_array = value_column->data(); |
| ASSERT_EQ(1, raw_array->num_chunks()); |
| |
| auto chunk = raw_array->chunk(0); |
| |
| std::shared_ptr<Array> expected_array; |
| |
| ::arrow::Decimal128Builder builder(decimal_type, pool); |
| |
| for (int32_t i = 0; i < expected_length; ++i) { |
| ::arrow::Decimal128 value((i + 1) * 100); |
| ASSERT_OK(builder.Append(value)); |
| } |
| ASSERT_OK(builder.Finish(&expected_array)); |
| |
| internal::AssertArraysEqual(*expected_array, *chunk); |
| } |
| |
| INSTANTIATE_TEST_CASE_P( |
| ReadDecimals, TestArrowReaderAdHocSpark, |
| ::testing::Values( |
| std::make_tuple("int32_decimal.parquet", ::arrow::decimal(4, 2)), |
| std::make_tuple("int64_decimal.parquet", ::arrow::decimal(10, 2)), |
| std::make_tuple("fixed_length_decimal.parquet", ::arrow::decimal(25, 2)), |
| std::make_tuple("fixed_length_decimal_legacy.parquet", ::arrow::decimal(13, 2)))); |
| |
| } // namespace arrow |
| |
| } // namespace parquet |