GH-43944: [C++][Parquet] Add support for arrow::ArrayStatistics: non zero-copy int based types (#43945)
### Rationale for this change
Statistics are useful for fast processing. Parquet column chunk metadata already stores statistics such as min/max and the null count; exposing them via `arrow::ArrayStatistics` lets readers use them without recomputing them from the data (see the sketch after the type list below).
Target types:
* `UInt8`
* `Int8`
* `UInt16`
* `Int16`
* `UInt32`
* `UInt64`
* `Date32`
* `Time32`
* `Time64`
* `Duration`
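
The sketch below illustrates the user-facing effect, mirroring the new test: after this change, arrays read through `parquet::arrow::FileReader` carry an `::arrow::ArrayStatistics` whose min/max hold the `int64_t` alternative for signed types and `uint64_t` for unsigned types. It is a minimal sketch, not part of this PR; the helper name `InspectStatistics` and the assumption of an `int8` first column are made up for illustration.

```cpp
#include <iostream>
#include <variant>

#include "arrow/api.h"
#include "arrow/io/api.h"
#include "parquet/arrow/reader.h"
#include "parquet/exception.h"

// Hypothetical helper: read the first column of a Parquet file and print
// its minimum, assuming an int8 column written with store_schema() enabled.
void InspectStatistics(std::shared_ptr<::arrow::io::RandomAccessFile> input) {
  std::unique_ptr<parquet::arrow::FileReader> reader;
  PARQUET_THROW_NOT_OK(parquet::arrow::OpenFile(
      std::move(input), ::arrow::default_memory_pool(), &reader));
  std::shared_ptr<::arrow::ChunkedArray> column;
  PARQUET_THROW_NOT_OK(reader->ReadColumn(0, &column));
  auto array = std::static_pointer_cast<::arrow::Int8Array>(column->chunk(0));
  auto statistics = array->statistics();  // may be null if none were read
  if (statistics && statistics->min.has_value()) {
    // Signed integer min/max are exposed as the int64_t alternative.
    std::cout << "min: " << std::get<int64_t>(*statistics->min) << std::endl;
  }
}
```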
### What changes are included in this PR?
Map statistics stored in `ColumnChunkMetaData` to `arrow::ArrayStatistics` when transferring column data to Arrow arrays.
### Are these changes tested?
Yes. Tests covering all target types are added to `cpp/src/parquet/arrow/arrow_statistics_test.cc`.
### Are there any user-facing changes?
Yes. Arrays read for the target types now carry `arrow::ArrayStatistics` populated from Parquet metadata (see the example above).
* GitHub Issue: #43944
Authored-by: Sutou Kouhei <kou@clear-code.com>
Signed-off-by: Sutou Kouhei <kou@clear-code.com>
diff --git a/cpp/src/parquet/arrow/arrow_statistics_test.cc b/cpp/src/parquet/arrow/arrow_statistics_test.cc
index a19303c..2638358 100644
--- a/cpp/src/parquet/arrow/arrow_statistics_test.cc
+++ b/cpp/src/parquet/arrow/arrow_statistics_test.cc
@@ -17,12 +17,14 @@
#include "gtest/gtest.h"
+#include "arrow/array.h"
#include "arrow/table.h"
#include "arrow/testing/gtest_util.h"
#include "parquet/api/reader.h"
#include "parquet/api/writer.h"
+#include "parquet/arrow/reader.h"
#include "parquet/arrow/schema.h"
#include "parquet/arrow/writer.h"
#include "parquet/file_writer.h"
@@ -179,4 +181,107 @@
ASSERT_FALSE(stats->HasMinMax());
}
+namespace {
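+// Write a single-column Parquet file with store_schema() enabled, read
+// it back through parquet::arrow::FileReader and return the first chunk
+// so that callers can inspect the statistics attached to it.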
+::arrow::Result<std::shared_ptr<::arrow::Array>> StatisticsReadArray(
+ std::shared_ptr<::arrow::DataType> data_type, const std::string& json) {
+ auto schema = ::arrow::schema({::arrow::field("column", data_type)});
+ auto array = ::arrow::ArrayFromJSON(data_type, json);
+ auto record_batch = ::arrow::RecordBatch::Make(schema, array->length(), {array});
+ ARROW_ASSIGN_OR_RAISE(auto sink, ::arrow::io::BufferOutputStream::Create());
+ const auto arrow_writer_properties =
+ parquet::ArrowWriterProperties::Builder().store_schema()->build();
+ ARROW_ASSIGN_OR_RAISE(
+ auto writer,
+ FileWriter::Open(*schema, ::arrow::default_memory_pool(), sink,
+ default_writer_properties(), arrow_writer_properties));
+ ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*record_batch));
+ ARROW_RETURN_NOT_OK(writer->Close());
+ ARROW_ASSIGN_OR_RAISE(auto buffer, sink->Finish());
+
+ auto reader =
+ ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(buffer));
+ std::unique_ptr<FileReader> file_reader;
+ ARROW_RETURN_NOT_OK(
+ FileReader::Make(::arrow::default_memory_pool(), std::move(reader), &file_reader));
+ std::shared_ptr<::arrow::ChunkedArray> chunked_array;
+ ARROW_RETURN_NOT_OK(file_reader->ReadColumn(0, &chunked_array));
+ return chunked_array->chunk(0);
+}
+
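+// Write [max, null, min, max] for the given Arrow type and check that
+// the statistics read back from the Parquet metadata match. MinMaxType
+// is the std::variant alternative used by ArrayStatistics (int64_t for
+// signed types, uint64_t for unsigned types).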
+template <typename ArrowType, typename MinMaxType>
+void TestStatisticsReadArray(std::shared_ptr<::arrow::DataType> arrow_type) {
+ using ArrowArrayType = typename ::arrow::TypeTraits<ArrowType>::ArrayType;
+ using ArrowCType = typename ArrowType::c_type;
+ constexpr auto min = std::numeric_limits<ArrowCType>::min();
+ constexpr auto max = std::numeric_limits<ArrowCType>::max();
+
+ std::string json;
+ json += "[";
+ json += std::to_string(max);
+ json += ", null, ";
+ json += std::to_string(min);
+ json += ", ";
+ json += std::to_string(max);
+ json += "]";
+ ASSERT_OK_AND_ASSIGN(auto array, StatisticsReadArray(arrow_type, json));
+ auto typed_array = std::static_pointer_cast<ArrowArrayType>(array);
+ auto statistics = typed_array->statistics();
+ ASSERT_NE(nullptr, statistics);
+  ASSERT_TRUE(statistics->null_count.has_value());
+  ASSERT_EQ(1, statistics->null_count.value());
+  ASSERT_FALSE(statistics->distinct_count.has_value());
+  ASSERT_TRUE(statistics->min.has_value());
+  ASSERT_TRUE(std::holds_alternative<MinMaxType>(*statistics->min));
+  ASSERT_EQ(min, std::get<MinMaxType>(*statistics->min));
+  ASSERT_TRUE(statistics->is_min_exact);
+  ASSERT_TRUE(statistics->max.has_value());
+  ASSERT_TRUE(std::holds_alternative<MinMaxType>(*statistics->max));
+  ASSERT_EQ(max, std::get<MinMaxType>(*statistics->max));
+  ASSERT_TRUE(statistics->is_max_exact);
+}
+} // namespace
+
+TEST(TestStatisticsRead, Int8) {
+ TestStatisticsReadArray<::arrow::Int8Type, int64_t>(::arrow::int8());
+}
+
+TEST(TestStatisticsRead, UInt8) {
+ TestStatisticsReadArray<::arrow::UInt8Type, uint64_t>(::arrow::uint8());
+}
+
+TEST(TestStatisticsRead, Int16) {
+ TestStatisticsReadArray<::arrow::Int16Type, int64_t>(::arrow::int16());
+}
+
+TEST(TestStatisticsRead, UInt16) {
+ TestStatisticsReadArray<::arrow::UInt16Type, uint64_t>(::arrow::uint16());
+}
+
+TEST(TestStatisticsRead, UInt32) {
+ TestStatisticsReadArray<::arrow::UInt32Type, uint64_t>(::arrow::uint32());
+}
+
+TEST(TestStatisticsRead, UInt64) {
+ TestStatisticsReadArray<::arrow::UInt64Type, uint64_t>(::arrow::uint64());
+}
+
+TEST(TestStatisticsRead, Date32) {
+ TestStatisticsReadArray<::arrow::Date32Type, int64_t>(::arrow::date32());
+}
+
+TEST(TestStatisticsRead, Time32) {
+ TestStatisticsReadArray<::arrow::Time32Type, int64_t>(
+ ::arrow::time32(::arrow::TimeUnit::MILLI));
+}
+
+TEST(TestStatisticsRead, Time64) {
+ TestStatisticsReadArray<::arrow::Time64Type, int64_t>(
+ ::arrow::time64(::arrow::TimeUnit::MICRO));
+}
+
+TEST(TestStatisticsRead, Duration) {
+ TestStatisticsReadArray<::arrow::DurationType, int64_t>(
+ ::arrow::duration(::arrow::TimeUnit::NANO));
+}
+
} // namespace parquet::arrow
diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc
index 285e2a5..4f57c3f 100644
--- a/cpp/src/parquet/arrow/reader.cc
+++ b/cpp/src/parquet/arrow/reader.cc
@@ -485,8 +485,9 @@
NextRowGroup();
}
}
- RETURN_NOT_OK(
- TransferColumnData(record_reader_.get(), field_, descr_, ctx_->pool, &out_));
+ RETURN_NOT_OK(TransferColumnData(record_reader_.get(),
+ input_->column_chunk_metadata(), field_, descr_,
+ ctx_.get(), &out_));
return Status::OK();
END_PARQUET_CATCH_EXCEPTIONS
}
diff --git a/cpp/src/parquet/arrow/reader_internal.cc b/cpp/src/parquet/arrow/reader_internal.cc
index e5aef5a..e6c2d95 100644
--- a/cpp/src/parquet/arrow/reader_internal.cc
+++ b/cpp/src/parquet/arrow/reader_internal.cc
@@ -319,26 +319,59 @@
}
template <typename ArrowType, typename ParquetType>
-Status TransferInt(RecordReader* reader, MemoryPool* pool,
- const std::shared_ptr<Field>& field, Datum* out) {
+Status TransferInt(RecordReader* reader,
+ std::unique_ptr<::parquet::ColumnChunkMetaData> metadata,
+ const ReaderContext* ctx, const std::shared_ptr<Field>& field,
+ Datum* out) {
using ArrowCType = typename ArrowType::c_type;
using ParquetCType = typename ParquetType::c_type;
int64_t length = reader->values_written();
ARROW_ASSIGN_OR_RAISE(auto data,
- ::arrow::AllocateBuffer(length * sizeof(ArrowCType), pool));
+ ::arrow::AllocateBuffer(length * sizeof(ArrowCType), ctx->pool));
auto values = reinterpret_cast<const ParquetCType*>(reader->values());
auto out_ptr = reinterpret_cast<ArrowCType*>(data->mutable_data());
std::copy(values, values + length, out_ptr);
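+  // Build ArrayData directly so that statistics can be attached below;
+  // buffers[0] is the validity bitmap and stays null for non-nullable
+  // fields.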
+ int64_t null_count = 0;
+ std::vector<std::shared_ptr<Buffer>> buffers = {nullptr, std::move(data)};
if (field->nullable()) {
- *out = std::make_shared<ArrayType<ArrowType>>(field->type(), length, std::move(data),
- reader->ReleaseIsValid(),
- reader->null_count());
- } else {
- *out =
- std::make_shared<ArrayType<ArrowType>>(field->type(), length, std::move(data),
- /*null_bitmap=*/nullptr, /*null_count=*/0);
+ null_count = reader->null_count();
+ buffers[0] = reader->ReleaseIsValid();
}
+ auto array_data =
+ ::arrow::ArrayData::Make(field->type(), length, std::move(buffers), null_count);
+ auto array_statistics = std::make_shared<::arrow::ArrayStatistics>();
+ array_statistics->null_count = null_count;
+  auto statistics = metadata->statistics();
+ if (statistics) {
+ if (statistics->HasDistinctCount()) {
+ array_statistics->distinct_count = statistics->distinct_count();
+ }
+ if (statistics->HasMinMax()) {
+      auto typed_statistics =
+          static_cast<::parquet::TypedStatistics<ParquetType>*>(statistics.get());
+ const ArrowCType min = typed_statistics->min();
+ const ArrowCType max = typed_statistics->max();
+ if (std::is_signed<ArrowCType>::value) {
+ array_statistics->min = static_cast<int64_t>(min);
+ array_statistics->max = static_cast<int64_t>(max);
+ } else {
+ array_statistics->min = static_cast<uint64_t>(min);
+ array_statistics->max = static_cast<uint64_t>(max);
+ }
+ // We can assume that integer based min/max are always exact if
+ // they exist. Apache Parquet's "Statistics" has
+ // "is_min_value_exact" and "is_max_value_exact" but we can
+ // ignore them for integer based min/max.
+ //
+ // See also the discussion at dev@parquet.apache.org:
+ // https://lists.apache.org/thread/zfnmg5p51b7oylft5w5k4670wgkd4zv4
+ array_statistics->is_min_exact = true;
+ array_statistics->is_max_exact = true;
+ }
+ }
+ array_data->statistics = std::move(array_statistics);
+ *out = std::make_shared<ArrayType<ArrowType>>(std::move(array_data));
return Status::OK();
}
@@ -728,21 +761,26 @@
} // namespace
-#define TRANSFER_INT32(ENUM, ArrowType) \
- case ::arrow::Type::ENUM: { \
- Status s = TransferInt<ArrowType, Int32Type>(reader, pool, value_field, &result); \
- RETURN_NOT_OK(s); \
+#define TRANSFER_INT32(ENUM, ArrowType) \
+ case ::arrow::Type::ENUM: { \
+ Status s = TransferInt<ArrowType, Int32Type>(reader, std::move(metadata), ctx, \
+ value_field, &result); \
+ RETURN_NOT_OK(s); \
} break;
-#define TRANSFER_INT64(ENUM, ArrowType) \
- case ::arrow::Type::ENUM: { \
- Status s = TransferInt<ArrowType, Int64Type>(reader, pool, value_field, &result); \
- RETURN_NOT_OK(s); \
+#define TRANSFER_INT64(ENUM, ArrowType) \
+ case ::arrow::Type::ENUM: { \
+ Status s = TransferInt<ArrowType, Int64Type>(reader, std::move(metadata), ctx, \
+ value_field, &result); \
+ RETURN_NOT_OK(s); \
} break;
-Status TransferColumnData(RecordReader* reader, const std::shared_ptr<Field>& value_field,
- const ColumnDescriptor* descr, MemoryPool* pool,
+Status TransferColumnData(RecordReader* reader,
+ std::unique_ptr<::parquet::ColumnChunkMetaData> metadata,
+ const std::shared_ptr<Field>& value_field,
+ const ColumnDescriptor* descr, const ReaderContext* ctx,
std::shared_ptr<ChunkedArray>* out) {
+ auto pool = ctx->pool;
Datum result;
std::shared_ptr<ChunkedArray> chunked_result;
switch (value_field->type()->id()) {
diff --git a/cpp/src/parquet/arrow/reader_internal.h b/cpp/src/parquet/arrow/reader_internal.h
index cf9dbb8..fab56c8 100644
--- a/cpp/src/parquet/arrow/reader_internal.h
+++ b/cpp/src/parquet/arrow/reader_internal.h
@@ -66,7 +66,8 @@
: column_index_(column_index),
reader_(reader),
schema_(reader->metadata()->schema()),
- row_groups_(row_groups.begin(), row_groups.end()) {}
+ row_groups_(row_groups.begin(), row_groups.end()),
+ row_group_index_(-1) {}
virtual ~FileColumnIterator() {}
@@ -75,7 +76,8 @@
return nullptr;
}
- auto row_group_reader = reader_->RowGroup(row_groups_.front());
+ row_group_index_ = row_groups_.front();
+ auto row_group_reader = reader_->RowGroup(row_group_index_);
row_groups_.pop_front();
return row_group_reader->GetColumnPageReader(column_index_);
}
@@ -86,23 +88,29 @@
std::shared_ptr<FileMetaData> metadata() const { return reader_->metadata(); }
+ std::unique_ptr<RowGroupMetaData> row_group_metadata() const {
+ return metadata()->RowGroup(row_group_index_);
+ }
+
+ std::unique_ptr<ColumnChunkMetaData> column_chunk_metadata() const {
+ return row_group_metadata()->ColumnChunk(column_index_);
+ }
+
int column_index() const { return column_index_; }
+ int row_group_index() const { return row_group_index_; }
+
protected:
int column_index_;
ParquetFileReader* reader_;
const SchemaDescriptor* schema_;
std::deque<int> row_groups_;
+ int row_group_index_;
};
using FileColumnIteratorFactory =
std::function<FileColumnIterator*(int, ParquetFileReader*)>;
-Status TransferColumnData(::parquet::internal::RecordReader* reader,
- const std::shared_ptr<::arrow::Field>& value_field,
- const ColumnDescriptor* descr, ::arrow::MemoryPool* pool,
- std::shared_ptr<::arrow::ChunkedArray>* out);
-
struct ReaderContext {
ParquetFileReader* reader;
::arrow::MemoryPool* pool;
@@ -118,5 +126,11 @@
}
};
+Status TransferColumnData(::parquet::internal::RecordReader* reader,
+ std::unique_ptr<::parquet::ColumnChunkMetaData> metadata,
+ const std::shared_ptr<::arrow::Field>& value_field,
+ const ColumnDescriptor* descr, const ReaderContext* ctx,
+ std::shared_ptr<::arrow::ChunkedArray>* out);
+
} // namespace arrow
} // namespace parquet