GH-43944: [C++][Parquet] Add support for arrow::ArrayStatistics: non zero-copy int based types (#43945)

### Rationale for this change

Statistics is useful for fast processing.

Target types:

* `UInt8`
* `Int8`
* `UInt16`
* `Int16`
* `UInt32`
* `UInt64`
* `Date32`
* `Time32`
* `Time64`
* `Duration`

### What changes are included in this PR?

Map `ColumnChunkMetaData` information to `arrow::ArrayStatistics`.

### Are these changes tested?

Yes.

### Are there any user-facing changes?

Yes.

* GitHub Issue: #43944

Authored-by: Sutou Kouhei <kou@clear-code.com>
Signed-off-by: Sutou Kouhei <kou@clear-code.com>
diff --git a/cpp/src/parquet/arrow/arrow_statistics_test.cc b/cpp/src/parquet/arrow/arrow_statistics_test.cc
index a19303c..2638358 100644
--- a/cpp/src/parquet/arrow/arrow_statistics_test.cc
+++ b/cpp/src/parquet/arrow/arrow_statistics_test.cc
@@ -17,12 +17,14 @@
 
 #include "gtest/gtest.h"
 
+#include "arrow/array.h"
 #include "arrow/table.h"
 #include "arrow/testing/gtest_util.h"
 
 #include "parquet/api/reader.h"
 #include "parquet/api/writer.h"
 
+#include "parquet/arrow/reader.h"
 #include "parquet/arrow/schema.h"
 #include "parquet/arrow/writer.h"
 #include "parquet/file_writer.h"
@@ -179,4 +181,107 @@
   ASSERT_FALSE(stats->HasMinMax());
 }
 
+namespace {
+::arrow::Result<std::shared_ptr<::arrow::Array>> StatisticsReadArray(
+    std::shared_ptr<::arrow::DataType> data_type, const std::string& json) {
+  auto schema = ::arrow::schema({::arrow::field("column", data_type)});
+  auto array = ::arrow::ArrayFromJSON(data_type, json);
+  auto record_batch = ::arrow::RecordBatch::Make(schema, array->length(), {array});
+  ARROW_ASSIGN_OR_RAISE(auto sink, ::arrow::io::BufferOutputStream::Create());
+  const auto arrow_writer_properties =
+      parquet::ArrowWriterProperties::Builder().store_schema()->build();
+  ARROW_ASSIGN_OR_RAISE(
+      auto writer,
+      FileWriter::Open(*schema, ::arrow::default_memory_pool(), sink,
+                       default_writer_properties(), arrow_writer_properties));
+  ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*record_batch));
+  ARROW_RETURN_NOT_OK(writer->Close());
+  ARROW_ASSIGN_OR_RAISE(auto buffer, sink->Finish());
+
+  auto reader =
+      ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(buffer));
+  std::unique_ptr<FileReader> file_reader;
+  ARROW_RETURN_NOT_OK(
+      FileReader::Make(::arrow::default_memory_pool(), std::move(reader), &file_reader));
+  std::shared_ptr<::arrow::ChunkedArray> chunked_array;
+  ARROW_RETURN_NOT_OK(file_reader->ReadColumn(0, &chunked_array));
+  return chunked_array->chunk(0);
+}
+
+template <typename ArrowType, typename MinMaxType>
+void TestStatisticsReadArray(std::shared_ptr<::arrow::DataType> arrow_type) {
+  using ArrowArrayType = typename ::arrow::TypeTraits<ArrowType>::ArrayType;
+  using ArrowCType = typename ArrowType::c_type;
+  constexpr auto min = std::numeric_limits<ArrowCType>::min();
+  constexpr auto max = std::numeric_limits<ArrowCType>::max();
+
+  std::string json;
+  json += "[";
+  json += std::to_string(max);
+  json += ", null, ";
+  json += std::to_string(min);
+  json += ", ";
+  json += std::to_string(max);
+  json += "]";
+  ASSERT_OK_AND_ASSIGN(auto array, StatisticsReadArray(arrow_type, json));
+  auto typed_array = std::static_pointer_cast<ArrowArrayType>(array);
+  auto statistics = typed_array->statistics();
+  ASSERT_NE(nullptr, statistics);
+  ASSERT_EQ(true, statistics->null_count.has_value());
+  ASSERT_EQ(1, statistics->null_count.value());
+  ASSERT_EQ(false, statistics->distinct_count.has_value());
+  ASSERT_EQ(true, statistics->min.has_value());
+  ASSERT_EQ(true, std::holds_alternative<MinMaxType>(*statistics->min));
+  ASSERT_EQ(min, std::get<MinMaxType>(*statistics->min));
+  ASSERT_EQ(true, statistics->is_min_exact);
+  ASSERT_EQ(true, statistics->max.has_value());
+  ASSERT_EQ(true, std::holds_alternative<MinMaxType>(*statistics->max));
+  ASSERT_EQ(max, std::get<MinMaxType>(*statistics->max));
+  ASSERT_EQ(true, statistics->is_min_exact);
+}
+}  // namespace
+
+TEST(TestStatisticsRead, Int8) {
+  TestStatisticsReadArray<::arrow::Int8Type, int64_t>(::arrow::int8());
+}
+
+TEST(TestStatisticsRead, UInt8) {
+  TestStatisticsReadArray<::arrow::UInt8Type, uint64_t>(::arrow::uint8());
+}
+
+TEST(TestStatisticsRead, Int16) {
+  TestStatisticsReadArray<::arrow::Int16Type, int64_t>(::arrow::int16());
+}
+
+TEST(TestStatisticsRead, UInt16) {
+  TestStatisticsReadArray<::arrow::UInt16Type, uint64_t>(::arrow::uint16());
+}
+
+TEST(TestStatisticsRead, UInt32) {
+  TestStatisticsReadArray<::arrow::UInt32Type, uint64_t>(::arrow::uint32());
+}
+
+TEST(TestStatisticsRead, UInt64) {
+  TestStatisticsReadArray<::arrow::UInt64Type, uint64_t>(::arrow::uint64());
+}
+
+TEST(TestStatisticsRead, Date32) {
+  TestStatisticsReadArray<::arrow::Date32Type, int64_t>(::arrow::date32());
+}
+
+TEST(TestStatisticsRead, Time32) {
+  TestStatisticsReadArray<::arrow::Time32Type, int64_t>(
+      ::arrow::time32(::arrow::TimeUnit::MILLI));
+}
+
+TEST(TestStatisticsRead, Time64) {
+  TestStatisticsReadArray<::arrow::Time64Type, int64_t>(
+      ::arrow::time64(::arrow::TimeUnit::MICRO));
+}
+
+TEST(TestStatisticsRead, Duration) {
+  TestStatisticsReadArray<::arrow::DurationType, int64_t>(
+      ::arrow::duration(::arrow::TimeUnit::NANO));
+}
+
 }  // namespace parquet::arrow
diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc
index 285e2a5..4f57c3f 100644
--- a/cpp/src/parquet/arrow/reader.cc
+++ b/cpp/src/parquet/arrow/reader.cc
@@ -485,8 +485,9 @@
         NextRowGroup();
       }
     }
-    RETURN_NOT_OK(
-        TransferColumnData(record_reader_.get(), field_, descr_, ctx_->pool, &out_));
+    RETURN_NOT_OK(TransferColumnData(record_reader_.get(),
+                                     input_->column_chunk_metadata(), field_, descr_,
+                                     ctx_.get(), &out_));
     return Status::OK();
     END_PARQUET_CATCH_EXCEPTIONS
   }
diff --git a/cpp/src/parquet/arrow/reader_internal.cc b/cpp/src/parquet/arrow/reader_internal.cc
index e5aef5a..e6c2d95 100644
--- a/cpp/src/parquet/arrow/reader_internal.cc
+++ b/cpp/src/parquet/arrow/reader_internal.cc
@@ -319,26 +319,59 @@
 }
 
 template <typename ArrowType, typename ParquetType>
-Status TransferInt(RecordReader* reader, MemoryPool* pool,
-                   const std::shared_ptr<Field>& field, Datum* out) {
+Status TransferInt(RecordReader* reader,
+                   std::unique_ptr<::parquet::ColumnChunkMetaData> metadata,
+                   const ReaderContext* ctx, const std::shared_ptr<Field>& field,
+                   Datum* out) {
   using ArrowCType = typename ArrowType::c_type;
   using ParquetCType = typename ParquetType::c_type;
   int64_t length = reader->values_written();
   ARROW_ASSIGN_OR_RAISE(auto data,
-                        ::arrow::AllocateBuffer(length * sizeof(ArrowCType), pool));
+                        ::arrow::AllocateBuffer(length * sizeof(ArrowCType), ctx->pool));
 
   auto values = reinterpret_cast<const ParquetCType*>(reader->values());
   auto out_ptr = reinterpret_cast<ArrowCType*>(data->mutable_data());
   std::copy(values, values + length, out_ptr);
+  int64_t null_count = 0;
+  std::vector<std::shared_ptr<Buffer>> buffers = {nullptr, std::move(data)};
   if (field->nullable()) {
-    *out = std::make_shared<ArrayType<ArrowType>>(field->type(), length, std::move(data),
-                                                  reader->ReleaseIsValid(),
-                                                  reader->null_count());
-  } else {
-    *out =
-        std::make_shared<ArrayType<ArrowType>>(field->type(), length, std::move(data),
-                                               /*null_bitmap=*/nullptr, /*null_count=*/0);
+    null_count = reader->null_count();
+    buffers[0] = reader->ReleaseIsValid();
   }
+  auto array_data =
+      ::arrow::ArrayData::Make(field->type(), length, std::move(buffers), null_count);
+  auto array_statistics = std::make_shared<::arrow::ArrayStatistics>();
+  array_statistics->null_count = null_count;
+  auto statistics = metadata->statistics().get();
+  if (statistics) {
+    if (statistics->HasDistinctCount()) {
+      array_statistics->distinct_count = statistics->distinct_count();
+    }
+    if (statistics->HasMinMax()) {
+      auto typed_statistics =
+          static_cast<::parquet::TypedStatistics<ParquetType>*>(statistics);
+      const ArrowCType min = typed_statistics->min();
+      const ArrowCType max = typed_statistics->max();
+      if (std::is_signed<ArrowCType>::value) {
+        array_statistics->min = static_cast<int64_t>(min);
+        array_statistics->max = static_cast<int64_t>(max);
+      } else {
+        array_statistics->min = static_cast<uint64_t>(min);
+        array_statistics->max = static_cast<uint64_t>(max);
+      }
+      // We can assume that integer based min/max are always exact if
+      // they exist. Apache Parquet's "Statistics" has
+      // "is_min_value_exact" and "is_max_value_exact" but we can
+      // ignore them for integer based min/max.
+      //
+      // See also the discussion at dev@parquet.apache.org:
+      // https://lists.apache.org/thread/zfnmg5p51b7oylft5w5k4670wgkd4zv4
+      array_statistics->is_min_exact = true;
+      array_statistics->is_max_exact = true;
+    }
+  }
+  array_data->statistics = std::move(array_statistics);
+  *out = std::make_shared<ArrayType<ArrowType>>(std::move(array_data));
   return Status::OK();
 }
 
@@ -728,21 +761,26 @@
 
 }  // namespace
 
-#define TRANSFER_INT32(ENUM, ArrowType)                                               \
-  case ::arrow::Type::ENUM: {                                                         \
-    Status s = TransferInt<ArrowType, Int32Type>(reader, pool, value_field, &result); \
-    RETURN_NOT_OK(s);                                                                 \
+#define TRANSFER_INT32(ENUM, ArrowType)                                            \
+  case ::arrow::Type::ENUM: {                                                      \
+    Status s = TransferInt<ArrowType, Int32Type>(reader, std::move(metadata), ctx, \
+                                                 value_field, &result);            \
+    RETURN_NOT_OK(s);                                                              \
   } break;
 
-#define TRANSFER_INT64(ENUM, ArrowType)                                               \
-  case ::arrow::Type::ENUM: {                                                         \
-    Status s = TransferInt<ArrowType, Int64Type>(reader, pool, value_field, &result); \
-    RETURN_NOT_OK(s);                                                                 \
+#define TRANSFER_INT64(ENUM, ArrowType)                                            \
+  case ::arrow::Type::ENUM: {                                                      \
+    Status s = TransferInt<ArrowType, Int64Type>(reader, std::move(metadata), ctx, \
+                                                 value_field, &result);            \
+    RETURN_NOT_OK(s);                                                              \
   } break;
 
-Status TransferColumnData(RecordReader* reader, const std::shared_ptr<Field>& value_field,
-                          const ColumnDescriptor* descr, MemoryPool* pool,
+Status TransferColumnData(RecordReader* reader,
+                          std::unique_ptr<::parquet::ColumnChunkMetaData> metadata,
+                          const std::shared_ptr<Field>& value_field,
+                          const ColumnDescriptor* descr, const ReaderContext* ctx,
                           std::shared_ptr<ChunkedArray>* out) {
+  auto pool = ctx->pool;
   Datum result;
   std::shared_ptr<ChunkedArray> chunked_result;
   switch (value_field->type()->id()) {
diff --git a/cpp/src/parquet/arrow/reader_internal.h b/cpp/src/parquet/arrow/reader_internal.h
index cf9dbb8..fab56c8 100644
--- a/cpp/src/parquet/arrow/reader_internal.h
+++ b/cpp/src/parquet/arrow/reader_internal.h
@@ -66,7 +66,8 @@
       : column_index_(column_index),
         reader_(reader),
         schema_(reader->metadata()->schema()),
-        row_groups_(row_groups.begin(), row_groups.end()) {}
+        row_groups_(row_groups.begin(), row_groups.end()),
+        row_group_index_(-1) {}
 
   virtual ~FileColumnIterator() {}
 
@@ -75,7 +76,8 @@
       return nullptr;
     }
 
-    auto row_group_reader = reader_->RowGroup(row_groups_.front());
+    row_group_index_ = row_groups_.front();
+    auto row_group_reader = reader_->RowGroup(row_group_index_);
     row_groups_.pop_front();
     return row_group_reader->GetColumnPageReader(column_index_);
   }
@@ -86,23 +88,29 @@
 
   std::shared_ptr<FileMetaData> metadata() const { return reader_->metadata(); }
 
+  std::unique_ptr<RowGroupMetaData> row_group_metadata() const {
+    return metadata()->RowGroup(row_group_index_);
+  }
+
+  std::unique_ptr<ColumnChunkMetaData> column_chunk_metadata() const {
+    return row_group_metadata()->ColumnChunk(column_index_);
+  }
+
   int column_index() const { return column_index_; }
 
+  int row_group_index() const { return row_group_index_; }
+
  protected:
   int column_index_;
   ParquetFileReader* reader_;
   const SchemaDescriptor* schema_;
   std::deque<int> row_groups_;
+  int row_group_index_;
 };
 
 using FileColumnIteratorFactory =
     std::function<FileColumnIterator*(int, ParquetFileReader*)>;
 
-Status TransferColumnData(::parquet::internal::RecordReader* reader,
-                          const std::shared_ptr<::arrow::Field>& value_field,
-                          const ColumnDescriptor* descr, ::arrow::MemoryPool* pool,
-                          std::shared_ptr<::arrow::ChunkedArray>* out);
-
 struct ReaderContext {
   ParquetFileReader* reader;
   ::arrow::MemoryPool* pool;
@@ -118,5 +126,11 @@
   }
 };
 
+Status TransferColumnData(::parquet::internal::RecordReader* reader,
+                          std::unique_ptr<::parquet::ColumnChunkMetaData> metadata,
+                          const std::shared_ptr<::arrow::Field>& value_field,
+                          const ColumnDescriptor* descr, const ReaderContext* ctx,
+                          std::shared_ptr<::arrow::ChunkedArray>* out);
+
 }  // namespace arrow
 }  // namespace parquet