PARQUET-1041: Support Arrow's NullArray
Closes #358, which only included an Arrow version bump to pick up ARROW-1143.
Author: Uwe L. Korn <uwe@apache.org>
Author: Wes McKinney <wes.mckinney@twosigma.com>
Closes #360 from wesm/PARQUET-1041 and squashes the following commits:
3f8f0bc [Wes McKinney] Bump Arrow version to master
c134bd6 [Uwe L. Korn] Fix int conversion
1def8a4 [Uwe L. Korn] PARQUET-1041: Support Arrow's NullArray
diff --git a/cmake_modules/ThirdpartyToolchain.cmake b/cmake_modules/ThirdpartyToolchain.cmake
index f958620..2b24e93 100644
--- a/cmake_modules/ThirdpartyToolchain.cmake
+++ b/cmake_modules/ThirdpartyToolchain.cmake
@@ -520,7 +520,7 @@
endif()
if ("$ENV{PARQUET_ARROW_VERSION}" STREQUAL "")
- set(ARROW_VERSION "a8f8ba0cbcf5f596f042e90b7a208e7a0c3925b7")
+ set(ARROW_VERSION "e209e5865ea58e57925cae24d4bf3f63d58ee21d")
else()
set(ARROW_VERSION "$ENV{PARQUET_ARROW_VERSION}")
endif()
diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc
index 97bb19b..3beca35 100644
--- a/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -802,6 +802,27 @@
ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
}
+using TestNullParquetIO = TestParquetIO<::arrow::NullType>;
+
+// Round-trip an all-null column: write a NullArray table, read it back,
+// and verify the reconstructed chunk equals the input.
+TEST_F(TestNullParquetIO, NullColumn) {
+ std::shared_ptr<Array> values = std::make_shared<::arrow::NullArray>(SMALL_SIZE);
+ std::shared_ptr<Table> table = MakeSimpleTable(values, true);
+ this->sink_ = std::make_shared<InMemoryOutputStream>();
+ ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), this->sink_,
+ values->length(), default_writer_properties()));
+
+ std::shared_ptr<Table> out;
+ std::unique_ptr<FileReader> reader;
+ this->ReaderFromSink(&reader);
+ this->ReadTableFromFile(std::move(reader), &out);
+ ASSERT_EQ(1, out->num_columns());
+ ASSERT_EQ(SMALL_SIZE, out->num_rows());
+
+ std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
+ ASSERT_EQ(1, chunked_array->num_chunks());
+ ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
+}
+
+
template <typename T>
using ParquetCDataType = typename ParquetDataType<T>::c_type;
diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc
index 7c1b381..ef9ac34 100644
--- a/src/parquet/arrow/reader.cc
+++ b/src/parquet/arrow/reader.cc
@@ -495,10 +495,6 @@
std::shared_ptr<::arrow::Schema> schema;
RETURN_NOT_OK(GetSchema(indices, &schema));
- int num_fields = static_cast<int>(schema->num_fields());
- int nthreads = std::min<int>(num_threads_, num_fields);
- std::vector<std::shared_ptr<Column>> columns(num_fields);
-
// We only need to read schema fields which have columns indicated
// in the indices vector
std::vector<int> field_indices;
@@ -507,6 +503,7 @@
return Status::Invalid("Invalid column index");
}
+ std::vector<std::shared_ptr<Column>> columns(field_indices.size());
auto ReadColumnFunc = [&indices, &field_indices, &schema, &columns, this](int i) {
std::shared_ptr<Array> array;
RETURN_NOT_OK(ReadSchemaField(field_indices[i], indices, &array));
@@ -514,6 +511,8 @@
return Status::OK();
};
+ int num_fields = static_cast<int>(field_indices.size());
+ int nthreads = std::min<int>(num_threads_, num_fields);
if (nthreads == 1) {
for (int i = 0; i < num_fields; i++) {
RETURN_NOT_OK(ReadColumnFunc(i));
@@ -1262,6 +1261,10 @@
}
switch (field_->type()->id()) {
+ case ::arrow::Type::NA:
+ *out = std::make_shared<::arrow::NullArray>(batch_size);
+ return Status::OK();
+ break;
TYPED_BATCH_CASE(BOOL, ::arrow::BooleanType, BooleanType)
TYPED_BATCH_CASE(UINT8, ::arrow::UInt8Type, Int32Type)
TYPED_BATCH_CASE(INT8, ::arrow::Int8Type, Int32Type)
diff --git a/src/parquet/arrow/schema.cc b/src/parquet/arrow/schema.cc
index a78a23b..2a4ddcd 100644
--- a/src/parquet/arrow/schema.cc
+++ b/src/parquet/arrow/schema.cc
@@ -166,6 +166,11 @@
}
Status FromPrimitive(const PrimitiveNode* primitive, TypePtr* out) {
+ if (primitive->logical_type() == LogicalType::NA) {
+ *out = ::arrow::null();
+ return Status::OK();
+ }
+
switch (primitive->physical_type()) {
case ParquetType::BOOLEAN:
*out = ::arrow::boolean();
@@ -410,9 +415,10 @@
int length = -1;
switch (field->type()->id()) {
- // TODO:
- // case ArrowType::NA:
- // break;
+ case ArrowType::NA:
+ type = ParquetType::INT32;
+ logical_type = LogicalType::NA;
+ break;
case ArrowType::BOOL:
type = ParquetType::BOOLEAN;
break;
diff --git a/src/parquet/arrow/writer.cc b/src/parquet/arrow/writer.cc
index 3344d1b..af4f754 100644
--- a/src/parquet/arrow/writer.cc
+++ b/src/parquet/arrow/writer.cc
@@ -62,6 +62,15 @@
Status VisitInline(const Array& array);
+ Status Visit(const ::arrow::NullArray& array) {
+ array_offsets_.push_back(static_cast<int32_t>(array.offset()));
+ valid_bitmaps_.push_back(array.null_bitmap_data());
+ null_counts_.push_back(array.length());
+ values_type_ = array.type_id();
+ values_array_ = &array;
+ return Status::OK();
+ }
+
Status Visit(const ::arrow::PrimitiveArray& array) {
array_offsets_.push_back(static_cast<int32_t>(array.offset()));
valid_bitmaps_.push_back(array.null_bitmap_data());
@@ -98,7 +107,6 @@
"Level generation for ArrowTypePrefix not supported yet"); \
}
- NOT_IMPLEMENTED_VISIT(Null)
NOT_IMPLEMENTED_VISIT(Struct)
NOT_IMPLEMENTED_VISIT(Union)
NOT_IMPLEMENTED_VISIT(Decimal)
@@ -141,6 +149,8 @@
reinterpret_cast<int16_t*>(def_levels_buffer_->mutable_data());
if (array.null_count() == 0) {
std::fill(def_levels_ptr, def_levels_ptr + array.length(), 1);
+ } else if (array.null_count() == array.length()) {
+ std::fill(def_levels_ptr, def_levels_ptr + array.length(), 0);
} else {
const uint8_t* valid_bits = array.null_bitmap_data();
INIT_BITSET(valid_bits, static_cast<int>(array.offset()));
@@ -510,6 +520,18 @@
}
template <>
+Status FileWriter::Impl::TypedWriteBatch<Int32Type, ::arrow::NullType>(
+ ColumnWriter* column_writer, const std::shared_ptr<Array>& array, int64_t num_levels,
+ const int16_t* def_levels, const int16_t* rep_levels) {
+ auto writer = reinterpret_cast<TypedColumnWriter<Int32Type>*>(column_writer);
+
+ PARQUET_CATCH_NOT_OK(
+ writer->WriteBatch(num_levels, def_levels, rep_levels, nullptr));
+ PARQUET_CATCH_NOT_OK(writer->Close());
+ return Status::OK();
+}
+
+template <>
Status FileWriter::Impl::TypedWriteBatch<ByteArrayType, ::arrow::BinaryType>(
ColumnWriter* column_writer, const std::shared_ptr<Array>& array, int64_t num_levels,
const int16_t* def_levels, const int16_t* rep_levels) {
@@ -639,6 +661,7 @@
column_writer, values_array, num_levels, def_levels, rep_levels);
}
}
+ WRITE_BATCH_CASE(NA, NullType, Int32Type)
WRITE_BATCH_CASE(BOOL, BooleanType, BooleanType)
WRITE_BATCH_CASE(INT8, Int8Type, Int32Type)
WRITE_BATCH_CASE(UINT8, UInt8Type, Int32Type)
diff --git a/src/parquet/file/metadata.cc b/src/parquet/file/metadata.cc
index aea7a74..b37ef4f 100644
--- a/src/parquet/file/metadata.cc
+++ b/src/parquet/file/metadata.cc
@@ -76,6 +76,7 @@
case LogicalType::BSON:
case LogicalType::JSON:
return SortOrder::UNSIGNED;
+ case LogicalType::NA:
case LogicalType::DECIMAL:
case LogicalType::LIST:
case LogicalType::MAP:
diff --git a/src/parquet/schema.cc b/src/parquet/schema.cc
index 1209ad1..4efa0b2 100644
--- a/src/parquet/schema.cc
+++ b/src/parquet/schema.cc
@@ -190,6 +190,9 @@
throw ParquetException(ss.str());
}
break;
+ case LogicalType::NA:
+ // NA can annotate any type
+ break;
default:
ss << LogicalTypeToString(logical_type);
ss << " can not be applied to a primitive type";
diff --git a/src/parquet/types.h b/src/parquet/types.h
index 2b9b11f..8504f5d 100644
--- a/src/parquet/types.h
+++ b/src/parquet/types.h
@@ -81,7 +81,8 @@
INT_64,
JSON,
BSON,
- INTERVAL
+ INTERVAL,
+ NA = 25
};
};