PARQUET-1225: NaN values may lead to incorrect filtering under certaiā€¦

1) `parquet-cpp` does not implement filtering (predicate pushdown). Clients such as Vertica, read the statistics from the metadata and implement their own filtering based on these stats.
Therefore, the read path does not require any changes. We should document that the min/max value can potentially contain NaNs.

2) I made changes to the write path to ignore the NaNs.

Author: Deepak Majeti <deepak.majeti@hpe.com>

Closes #444 from majetideepak/PARQUET-1225 and squashes the following commits:

c29ede2 [Deepak Majeti] refactor code
c02adb2 [Deepak Majeti] fix compiler error
1c229e2 [Deepak Majeti] fix logic for UpdateSpaced
baf6f50 [Deepak Majeti] change api from NotNaN to IsNaN
3144a6a [Deepak Majeti] clang format
ba44611 [Deepak Majeti] review comments and add tests
63e889b [Deepak Majeti] PARQUET-1225: NaN values may lead to incorrect filtering under certain circumstances
diff --git a/src/parquet/statistics-test.cc b/src/parquet/statistics-test.cc
index 1bbef26..5474016 100644
--- a/src/parquet/statistics-test.cc
+++ b/src/parquet/statistics-test.cc
@@ -659,5 +659,124 @@
   ASSERT_FALSE(cc_metadata->is_stats_set());
 }
 
+// PARQUET-1225: Float NaN values may lead to incorrect filtering under certain
+// circumstances
+TEST(TestStatisticsFloatNaN, NaNValues) {
+  constexpr int NUM_VALUES = 10;
+  NodePtr node = PrimitiveNode::Make("nan_float", Repetition::OPTIONAL, Type::FLOAT);
+  ColumnDescriptor descr(node, 1, 1);
+  float values[NUM_VALUES] = {std::nanf(""), -4.0f, -3.0f, -2.0f, -1.0f,
+                              std::nanf(""), 1.0f,  2.0f,  3.0f,  std::nanf("")};
+  float nan_values[NUM_VALUES];
+  for (int i = 0; i < NUM_VALUES; i++) {
+    nan_values[i] = std::nanf("");
+  }
+
+  // Test values
+  TypedRowGroupStatistics<FloatType> nan_stats(&descr);
+  nan_stats.Update(&values[0], NUM_VALUES, 0);
+  float min = nan_stats.min();
+  float max = nan_stats.max();
+  ASSERT_EQ(min, -4.0f);
+  ASSERT_EQ(max, 3.0f);
+
+  // Test all NaNs
+  TypedRowGroupStatistics<FloatType> all_nan_stats(&descr);
+  all_nan_stats.Update(&nan_values[0], NUM_VALUES, 0);
+  min = all_nan_stats.min();
+  max = all_nan_stats.max();
+  ASSERT_TRUE(std::isnan(min));
+  ASSERT_TRUE(std::isnan(max));
+
+  // Test values followed by all NaNs
+  nan_stats.Update(&nan_values[0], NUM_VALUES, 0);
+  min = nan_stats.min();
+  max = nan_stats.max();
+  ASSERT_EQ(min, -4.0f);
+  ASSERT_EQ(max, 3.0f);
+
+  // Test all NaNs followed by values
+  all_nan_stats.Update(&values[0], NUM_VALUES, 0);
+  min = all_nan_stats.min();
+  max = all_nan_stats.max();
+  ASSERT_EQ(min, -4.0f);
+  ASSERT_EQ(max, 3.0f);
+
+  // Test values followed by all NaNs followed by values
+  nan_stats.Update(&values[0], NUM_VALUES, 0);
+  min = nan_stats.min();
+  max = nan_stats.max();
+  ASSERT_EQ(min, -4.0f);
+  ASSERT_EQ(max, 3.0f);
+}
+
+// PARQUET-1225: Float NaN values may lead to incorrect filtering under certain
+// circumstances
+TEST(TestStatisticsFloatNaN, NaNValuesSpaced) {
+  constexpr int NUM_VALUES = 10;
+  NodePtr node = PrimitiveNode::Make("nan_float", Repetition::OPTIONAL, Type::FLOAT);
+  ColumnDescriptor descr(node, 1, 1);
+  float values[NUM_VALUES] = {std::nanf(""), -4.0f, -3.0f, -2.0f, -1.0f,
+                              std::nanf(""), 1.0f,  2.0f,  3.0f,  std::nanf("")};
+  float nan_values[NUM_VALUES];
+  for (int i = 0; i < NUM_VALUES; i++) {
+    nan_values[i] = std::nanf("");
+  }
+  std::vector<uint8_t> valid_bits(BitUtil::RoundUpNumBytes(NUM_VALUES) + 1, 255);
+
+  // Test values
+  TypedRowGroupStatistics<FloatType> nan_stats(&descr);
+  nan_stats.UpdateSpaced(&values[0], valid_bits.data(), 0, NUM_VALUES, 0);
+  float min = nan_stats.min();
+  float max = nan_stats.max();
+  ASSERT_EQ(min, -4.0f);
+  ASSERT_EQ(max, 3.0f);
+
+  // Test all NaNs
+  TypedRowGroupStatistics<FloatType> all_nan_stats(&descr);
+  all_nan_stats.UpdateSpaced(&nan_values[0], valid_bits.data(), 0, NUM_VALUES, 0);
+  min = all_nan_stats.min();
+  max = all_nan_stats.max();
+  ASSERT_TRUE(std::isnan(min));
+  ASSERT_TRUE(std::isnan(max));
+
+  // Test values followed by all NaNs
+  nan_stats.UpdateSpaced(&nan_values[0], valid_bits.data(), 0, NUM_VALUES, 0);
+  min = nan_stats.min();
+  max = nan_stats.max();
+  ASSERT_EQ(min, -4.0f);
+  ASSERT_EQ(max, 3.0f);
+
+  // Test all NaNs followed by values
+  all_nan_stats.UpdateSpaced(&values[0], valid_bits.data(), 0, NUM_VALUES, 0);
+  min = all_nan_stats.min();
+  max = all_nan_stats.max();
+  ASSERT_EQ(min, -4.0f);
+  ASSERT_EQ(max, 3.0f);
+
+  // Test values followed by all NaNs followed by values
+  nan_stats.UpdateSpaced(&values[0], valid_bits.data(), 0, NUM_VALUES, 0);
+  min = nan_stats.min();
+  max = nan_stats.max();
+  ASSERT_EQ(min, -4.0f);
+  ASSERT_EQ(max, 3.0f);
+}
+
+// NaN double values may lead to incorrect filtering under certain circumstances
+TEST(TestStatisticsDoubleNaN, NaNValues) {
+  constexpr int NUM_VALUES = 10;
+  NodePtr node = PrimitiveNode::Make("nan_double", Repetition::OPTIONAL, Type::DOUBLE);
+  ColumnDescriptor descr(node, 1, 1);
+  TypedRowGroupStatistics<DoubleType> nan_stats(&descr);
+  double values[NUM_VALUES] = {std::nan(""), std::nan(""), -3.0, -2.0, -1.0,
+                               0.0,          1.0,          2.0,  3.0,  4.0};
+  double* values_ptr = &values[0];
+  nan_stats.Update(values_ptr, NUM_VALUES, 0);
+  double min = nan_stats.min();
+  double max = nan_stats.max();
+
+  ASSERT_EQ(min, -3.0);
+  ASSERT_EQ(max, 4.0);
+}
 }  // namespace test
 }  // namespace parquet
diff --git a/src/parquet/statistics.cc b/src/parquet/statistics.cc
index 416557c..5b014ed 100644
--- a/src/parquet/statistics.cc
+++ b/src/parquet/statistics.cc
@@ -17,6 +17,7 @@
 
 #include <algorithm>
 #include <cstring>
+#include <type_traits>
 
 #include "parquet/encoding-internal.h"
 #include "parquet/exception.h"
@@ -97,6 +98,67 @@
 }
 
 template <typename DType>
+inline void TypedRowGroupStatistics<DType>::SetMinMax(const T& min, const T& max) {
+  if (!has_min_max_) {
+    has_min_max_ = true;
+    Copy(min, &min_, min_buffer_.get());
+    Copy(max, &max_, max_buffer_.get());
+  } else {
+    Copy(std::min(min_, min, std::ref(*(this->comparator_))), &min_, min_buffer_.get());
+    Copy(std::max(max_, max, std::ref(*(this->comparator_))), &max_, max_buffer_.get());
+  }
+}
+
+template <typename T, typename Enable = void>
+struct StatsHelper {
+  inline int64_t GetValueBeginOffset(const T* values, int64_t count) { return 0; }
+
+  inline int64_t GetValueEndOffset(const T* values, int64_t count) { return count; }
+
+  inline bool IsNaN(const T value) { return false; }
+};
+
+template <typename T>
+struct StatsHelper<T, typename std::enable_if<std::is_floating_point<T>::value>::type> {
+  inline int64_t GetValueBeginOffset(const T* values, int64_t count) {
+    // Skip NaNs
+    for (int64_t i = 0; i < count; i++) {
+      if (!std::isnan(values[i])) {
+        return i;
+      }
+    }
+    return count;
+  }
+
+  inline int64_t GetValueEndOffset(const T* values, int64_t count) {
+    // Skip NaNs
+    for (int64_t i = (count - 1); i >= 0; i--) {
+      if (!std::isnan(values[i])) {
+        return (i + 1);
+      }
+    }
+    return 0;
+  }
+
+  inline bool IsNaN(const T value) { return std::isnan(value); }
+};
+
+template <typename T>
+void SetNaN(T* value) {
+  // no-op
+}
+
+template <>
+void SetNaN<float>(float* value) {
+  *value = std::nanf("");
+}
+
+template <>
+void SetNaN<double>(double* value) {
+  *value = std::nan("");
+}
+
+template <typename DType>
 void TypedRowGroupStatistics<DType>::Update(const T* values, int64_t num_not_null,
                                             int64_t num_null) {
   DCHECK(num_not_null >= 0);
@@ -107,18 +169,29 @@
   // TODO: support distinct count?
   if (num_not_null == 0) return;
 
-  auto batch_minmax =
-      std::minmax_element(values, values + num_not_null, std::ref(*(this->comparator_)));
-  if (!has_min_max_) {
-    has_min_max_ = true;
-    Copy(*batch_minmax.first, &min_, min_buffer_.get());
-    Copy(*batch_minmax.second, &max_, max_buffer_.get());
-  } else {
-    Copy(std::min(min_, *batch_minmax.first, std::ref(*(this->comparator_))), &min_,
-         min_buffer_.get());
-    Copy(std::max(max_, *batch_minmax.second, std::ref(*(this->comparator_))), &max_,
-         max_buffer_.get());
+  // PARQUET-1225: Handle NaNs
+  // The problem arises only if the starting/ending value(s)
+  // of the values-buffer contain NaN
+  StatsHelper<T> helper;
+  int64_t begin_offset = helper.GetValueBeginOffset(values, num_not_null);
+  int64_t end_offset = helper.GetValueEndOffset(values, num_not_null);
+
+  // All values are NaN
+  if (end_offset < begin_offset) {
+    // Set min/max to NaNs in this case.
+    // Don't set has_min_max flag since
+    // these values must be over-written by valid stats later
+    if (!has_min_max_) {
+      SetNaN(&min_);
+      SetNaN(&max_);
+    }
+    return;
   }
+
+  auto batch_minmax = std::minmax_element(values + begin_offset, values + end_offset,
+                                          std::ref(*(this->comparator_)));
+
+  SetMinMax(*batch_minmax.first, *batch_minmax.second);
 }
 
 template <typename DType>
@@ -141,12 +214,26 @@
   int64_t i = 0;
   ::arrow::internal::BitmapReader valid_bits_reader(valid_bits, valid_bits_offset,
                                                     length);
+  StatsHelper<T> helper;
   for (; i < length; i++) {
-    if (valid_bits_reader.IsSet()) {
+    // PARQUET-1225: Handle NaNs
+    if (valid_bits_reader.IsSet() && !helper.IsNaN(values[i])) {
       break;
     }
     valid_bits_reader.Next();
   }
+
+  // All are NaNs and stats are not set yet
+  if ((i == length) && helper.IsNaN(values[i - 1])) {
+    // Don't set has_min_max flag since
+    // these values must be over-written by valid stats later
+    if (!has_min_max_) {
+      SetNaN(&min_);
+      SetNaN(&max_);
+    }
+    return;
+  }
+
   T min = values[i];
   T max = values[i];
   for (; i < length; i++) {
@@ -159,14 +246,8 @@
     }
     valid_bits_reader.Next();
   }
-  if (!has_min_max_) {
-    has_min_max_ = true;
-    Copy(min, &min_, min_buffer_.get());
-    Copy(max, &max_, max_buffer_.get());
-  } else {
-    Copy(std::min(min_, min, std::ref(*(this->comparator_))), &min_, min_buffer_.get());
-    Copy(std::max(max_, max, std::ref(*(this->comparator_))), &max_, max_buffer_.get());
-  }
+
+  SetMinMax(min, max);
 }
 
 template <typename DType>
@@ -185,17 +266,7 @@
 
   if (!other.HasMinMax()) return;
 
-  if (!has_min_max_) {
-    Copy(other.min_, &this->min_, min_buffer_.get());
-    Copy(other.max_, &this->max_, max_buffer_.get());
-    has_min_max_ = true;
-    return;
-  }
-
-  Copy(std::min(this->min_, other.min_, std::ref(*(this->comparator_))), &this->min_,
-       min_buffer_.get());
-  Copy(std::max(this->max_, other.max_, std::ref(*(this->comparator_))), &this->max_,
-       max_buffer_.get());
+  SetMinMax(other.min_, other.max_);
 }
 
 template <typename DType>
diff --git a/src/parquet/statistics.h b/src/parquet/statistics.h
index b5466c0..36f9e44 100644
--- a/src/parquet/statistics.h
+++ b/src/parquet/statistics.h
@@ -159,6 +159,7 @@
   void Update(const T* values, int64_t num_not_null, int64_t num_null);
   void UpdateSpaced(const T* values, const uint8_t* valid_bits, int64_t valid_bits_spaced,
                     int64_t num_not_null, int64_t num_null);
+  void SetMinMax(const T& min, const T& max);
 
   const T& min() const;
   const T& max() const;