PARQUET-1655: [C++] Fix comparison of Decimal values in statistics

The prior logic, I don't think is ever correct for signed
comparison.  Signed comparison of bytes as far as I can
tell from the specification is only used by Decimal
encoded values.  Decimals are always encoded as big-endian
two's complement integers.

The new logic reflects this by doing sign extension when
necessary for comparisons, and only using signed byte comparison
for the very first value when appropriate.

This PR also eliminates what appears to be a some dead code.

Closes #9582 from emkornfield/parquet_stats

Lead-authored-by: Micah Kornfield <micahk@google.com>
Co-authored-by: Antoine Pitrou <antoine@python.org>
Signed-off-by: Antoine Pitrou <antoine@python.org>
diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
index ca70215..986f3b0 100644
--- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
+++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
@@ -540,6 +540,23 @@
   return std::static_pointer_cast<GroupNode>(node_);
 }
 
+void ReadSingleColumnFileStatistics(std::unique_ptr<FileReader> file_reader,
+                                    std::shared_ptr<Scalar>* min,
+                                    std::shared_ptr<Scalar>* max) {
+  auto metadata = file_reader->parquet_reader()->metadata();
+  ASSERT_EQ(1, metadata->num_row_groups());
+  ASSERT_EQ(1, metadata->num_columns());
+
+  auto row_group = metadata->RowGroup(0);
+  ASSERT_EQ(1, row_group->num_columns());
+
+  auto column = row_group->ColumnChunk(0);
+  ASSERT_TRUE(column->is_stats_set());
+  auto statistics = column->statistics();
+
+  ASSERT_OK(StatisticsAsScalars(*statistics, min, max));
+}
+
 // Non-template base class for TestParquetIO, to avoid code duplication
 class ParquetIOTestBase : public ::testing::Test {
  public:
@@ -572,23 +589,6 @@
     ASSERT_OK((*out)->ValidateFull());
   }
 
-  void ReadSingleColumnFileStatistics(std::unique_ptr<FileReader> file_reader,
-                                      std::shared_ptr<Scalar>* min,
-                                      std::shared_ptr<Scalar>* max) {
-    auto metadata = file_reader->parquet_reader()->metadata();
-    ASSERT_EQ(1, metadata->num_row_groups());
-    ASSERT_EQ(1, metadata->num_columns());
-
-    auto row_group = metadata->RowGroup(0);
-    ASSERT_EQ(1, row_group->num_columns());
-
-    auto column = row_group->ColumnChunk(0);
-    ASSERT_TRUE(column->is_stats_set());
-    auto statistics = column->statistics();
-
-    ASSERT_OK(StatisticsAsScalars(*statistics, min, max));
-  }
-
   void ReadAndCheckSingleColumnFile(const Array& values) {
     std::shared_ptr<Array> out;
 
@@ -1511,7 +1511,7 @@
     ASSERT_NO_FATAL_FAILURE(MakeTestFile(values, 1, &file_reader));
 
     std::shared_ptr<Scalar> min, max;
-    this->ReadSingleColumnFileStatistics(std::move(file_reader), &min, &max);
+    ReadSingleColumnFileStatistics(std::move(file_reader), &min, &max);
 
     ASSERT_OK_AND_ASSIGN(
         auto value, ::arrow::MakeScalar(::arrow::TypeTraits<TestType>::type_singleton(),
@@ -2673,6 +2673,34 @@
   CheckSimpleRoundtrip(table, 2, props_store_schema);
 }
 
+TEST(ArrowReadWrite, DecimalStats) {
+  using ::arrow::Decimal128;
+  using ::arrow::field;
+
+  auto type = ::arrow::decimal128(/*precision=*/8, /*scale=*/0);
+
+  const char* json = R"(["255", "128", null, "0", "1", "-127", "-128", "-129", "-255"])";
+  auto array = ::arrow::ArrayFromJSON(type, json);
+  auto table = ::arrow::Table::Make(::arrow::schema({field("root", type)}), {array});
+
+  std::shared_ptr<Buffer> buffer;
+  ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, /*row_grop_size=*/100,
+                                             default_arrow_writer_properties(), &buffer));
+
+  std::unique_ptr<FileReader> reader;
+  ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
+                              ::arrow::default_memory_pool(), &reader));
+
+  std::shared_ptr<Scalar> min, max;
+  ReadSingleColumnFileStatistics(std::move(reader), &min, &max);
+
+  std::shared_ptr<Scalar> expected_min, expected_max;
+  ASSERT_OK_AND_ASSIGN(expected_min, array->GetScalar(array->length() - 1));
+  ASSERT_OK_AND_ASSIGN(expected_max, array->GetScalar(0));
+  ::arrow::AssertScalarsEqual(*expected_min, *min, /*verbose=*/true);
+  ::arrow::AssertScalarsEqual(*expected_max, *max, /*verbose=*/true);
+}
+
 TEST(ArrowReadWrite, NestedNullableField) {
   auto int_field = ::arrow::field("int_array", ::arrow::int32());
   auto int_array =
diff --git a/cpp/src/parquet/arrow/reader_internal.cc b/cpp/src/parquet/arrow/reader_internal.cc
index 360078f..1410a5f 100644
--- a/cpp/src/parquet/arrow/reader_internal.cc
+++ b/cpp/src/parquet/arrow/reader_internal.cc
@@ -63,8 +63,12 @@
 using arrow::ChunkedArray;
 using arrow::DataType;
 using arrow::Datum;
+using arrow::Decimal128;
 using arrow::Decimal128Array;
+using arrow::Decimal128Type;
+using arrow::Decimal256;
 using arrow::Decimal256Array;
+using arrow::Decimal256Type;
 using arrow::Field;
 using arrow::Int32Array;
 using arrow::ListArray;
@@ -93,6 +97,7 @@
 
 namespace parquet {
 namespace arrow {
+namespace {
 
 template <typename ArrowType>
 using ArrayType = typename ::arrow::TypeTraits<ArrowType>::ArrayType;
@@ -188,13 +193,61 @@
   return Status::NotImplemented("Cannot extract statistics for type ");
 }
 
-static inline Status ByteArrayStatisticsAsScalars(const Statistics& statistics,
-                                                  std::shared_ptr<::arrow::Scalar>* min,
-                                                  std::shared_ptr<::arrow::Scalar>* max) {
-  auto logical_type = statistics.descr()->logical_type();
-  auto type = logical_type->type() == LogicalType::Type::STRING ? ::arrow::utf8()
-                                                                : ::arrow::binary();
+template <typename DecimalType>
+Result<std::shared_ptr<::arrow::Scalar>> FromBigEndianString(
+    const std::string& data, std::shared_ptr<DataType> arrow_type) {
+  ARROW_ASSIGN_OR_RAISE(
+      DecimalType decimal,
+      DecimalType::FromBigEndian(reinterpret_cast<const uint8_t*>(data.data()),
+                                 static_cast<int32_t>(data.size())));
+  return ::arrow::MakeScalar(std::move(arrow_type), decimal);
+}
 
+// Extracts Min and Max scalar from bytes like types (i.e. types where
+// decimal is encoded as little endian.
+Status ExtractDecimalMinMaxFromBytesType(const Statistics& statistics,
+                                         const LogicalType& logical_type,
+                                         std::shared_ptr<::arrow::Scalar>* min,
+                                         std::shared_ptr<::arrow::Scalar>* max) {
+  const DecimalLogicalType& decimal_type =
+      checked_cast<const DecimalLogicalType&>(logical_type);
+
+  Result<std::shared_ptr<DataType>> maybe_type =
+      Decimal128Type::Make(decimal_type.precision(), decimal_type.scale());
+  std::shared_ptr<DataType> arrow_type;
+  if (maybe_type.ok()) {
+    arrow_type = maybe_type.ValueOrDie();
+    ARROW_ASSIGN_OR_RAISE(
+        *min, FromBigEndianString<Decimal128>(statistics.EncodeMin(), arrow_type));
+    ARROW_ASSIGN_OR_RAISE(*max, FromBigEndianString<Decimal128>(statistics.EncodeMax(),
+                                                                std::move(arrow_type)));
+    return Status::OK();
+  }
+  // Fallback to see if Decimal256 can represent the type.
+  ARROW_ASSIGN_OR_RAISE(
+      arrow_type, Decimal256Type::Make(decimal_type.precision(), decimal_type.scale()));
+  ARROW_ASSIGN_OR_RAISE(
+      *min, FromBigEndianString<Decimal256>(statistics.EncodeMin(), arrow_type));
+  ARROW_ASSIGN_OR_RAISE(*max, FromBigEndianString<Decimal256>(statistics.EncodeMax(),
+                                                              std::move(arrow_type)));
+
+  return Status::OK();
+}
+
+Status ByteArrayStatisticsAsScalars(const Statistics& statistics,
+                                    std::shared_ptr<::arrow::Scalar>* min,
+                                    std::shared_ptr<::arrow::Scalar>* max) {
+  auto logical_type = statistics.descr()->logical_type();
+  if (logical_type->type() == LogicalType::Type::DECIMAL) {
+    return ExtractDecimalMinMaxFromBytesType(statistics, *logical_type, min, max);
+  }
+  std::shared_ptr<::arrow::DataType> type;
+  if (statistics.descr()->physical_type() == Type::FIXED_LEN_BYTE_ARRAY) {
+    type = ::arrow::fixed_size_binary(statistics.descr()->type_length());
+  } else {
+    type = logical_type->type() == LogicalType::Type::STRING ? ::arrow::utf8()
+                                                             : ::arrow::binary();
+  }
   ARROW_ASSIGN_OR_RAISE(
       *min, ::arrow::MakeScalar(type, Buffer::FromString(statistics.EncodeMin())));
   ARROW_ASSIGN_OR_RAISE(
@@ -203,6 +256,8 @@
   return Status::OK();
 }
 
+}  // namespace
+
 Status StatisticsAsScalars(const Statistics& statistics,
                            std::shared_ptr<::arrow::Scalar>* min,
                            std::shared_ptr<::arrow::Scalar>* max) {
@@ -234,6 +289,7 @@
       return FromInt64Statistics(checked_cast<const Int64Statistics&>(statistics),
                                  *logical_type, min, max);
     case Type::BYTE_ARRAY:
+    case Type::FIXED_LEN_BYTE_ARRAY:
       return ByteArrayStatisticsAsScalars(statistics, min, max);
     default:
       return Status::NotImplemented("Extract statistics unsupported for physical_type ",
@@ -246,6 +302,8 @@
 // ----------------------------------------------------------------------
 // Primitive types
 
+namespace {
+
 template <typename ArrowType, typename ParquetType>
 Status TransferInt(RecordReader* reader, MemoryPool* pool,
                    const std::shared_ptr<DataType>& type, Datum* out) {
@@ -584,6 +642,8 @@
   return Status::OK();
 }
 
+}  // namespace
+
 #define TRANSFER_INT32(ENUM, ArrowType)                                              \
   case ::arrow::Type::ENUM: {                                                        \
     Status s = TransferInt<ArrowType, Int32Type>(reader, pool, value_type, &result); \
diff --git a/cpp/src/parquet/statistics.cc b/cpp/src/parquet/statistics.cc
index d261cb5..bc474e9 100644
--- a/cpp/src/parquet/statistics.cc
+++ b/cpp/src/parquet/statistics.cc
@@ -48,6 +48,9 @@
 // ----------------------------------------------------------------------
 // Comparator implementations
 
+constexpr int value_length(int value_length, const ByteArray& value) { return value.len; }
+constexpr int value_length(int type_length, const FLBA& value) { return type_length; }
+
 template <typename DType, bool is_signed>
 struct CompareHelper {
   using T = typename DType::c_type;
@@ -133,56 +136,110 @@
   static T Max(int type_length, const T& a, const T& b) {
     return Compare(0, a, b) ? b : a;
   }
-};  // namespace parquet
+};
 
-template <typename DType, bool is_signed>
-struct ByteLikeCompareHelperBase {
-  using T = ByteArrayType::c_type;
-  using PtrType = typename std::conditional<is_signed, int8_t, uint8_t>::type;
+template <typename T, bool is_signed>
+struct BinaryLikeComparer {};
 
-  static T DefaultMin() { return {}; }
-  static T DefaultMax() { return {}; }
-  static T Coalesce(T val, T fallback) { return val; }
-
-  static inline bool Compare(int type_length, const T& a, const T& b) {
-    const auto* aptr = reinterpret_cast<const PtrType*>(a.ptr);
-    const auto* bptr = reinterpret_cast<const PtrType*>(b.ptr);
-    return std::lexicographical_compare(aptr, aptr + a.len, bptr, bptr + b.len);
+template <typename T>
+struct BinaryLikeComparer<T, /*is_signed=*/false> {
+  static bool Compare(int type_length, const T& a, const T& b) {
+    int a_length = value_length(type_length, a);
+    int b_length = value_length(type_length, b);
+    // Unsigned comparison is used for non-numeric types so straight
+    // lexiographic comparison makes sense. (a.ptr is always unsigned)....
+    return std::lexicographical_compare(a.ptr, a.ptr + a_length, b.ptr, b.ptr + b_length);
   }
+};
 
-  static T Min(int type_length, const T& a, const T& b) {
-    if (a.ptr == nullptr) return b;
-    if (b.ptr == nullptr) return a;
-    return Compare(type_length, a, b) ? a : b;
-  }
+template <typename T>
+struct BinaryLikeComparer<T, /*is_signed=*/true> {
+  static bool Compare(int type_length, const T& a, const T& b) {
+    // Is signed is used for integers encoded as big-endian twos
+    // complement integers. (e.g. decimals).
+    int a_length = value_length(type_length, a);
+    int b_length = value_length(type_length, b);
 
-  static T Max(int type_length, const T& a, const T& b) {
-    if (a.ptr == nullptr) return b;
-    if (b.ptr == nullptr) return a;
-    return Compare(type_length, a, b) ? b : a;
+    // At least of the lengths is zero.
+    if (a_length == 0 || b_length == 0) {
+      return a_length == 0 && b_length > 0;
+    }
+
+    int8_t first_a = *a.ptr;
+    int8_t first_b = *b.ptr;
+    // We can short circuit for different signed numbers or
+    // for equal length bytes arrays that have different first bytes.
+    // The equality requirement is necessary for sign extension cases.
+    // 0xFF10 should be eqaul to 0x10 (due to big endian sign extension).
+    if ((0x80 & first_a) != (0x80 & first_b) ||
+        (a_length == b_length && first_a != first_b)) {
+      return first_a < first_b;
+    }
+    // When the lengths are unequal and the numbers are of the same
+    // sign we need to do comparison by sign extending the shorter
+    // value first, and once we get to equal sized arrays, lexicographical
+    // unsigned comparison of everything but the first byte is sufficient.
+    const uint8_t* a_start = a.ptr;
+    const uint8_t* b_start = b.ptr;
+    if (a_length != b_length) {
+      const uint8_t* lead_start = nullptr;
+      const uint8_t* lead_end = nullptr;
+      if (a_length > b_length) {
+        int lead_length = a_length - b_length;
+        lead_start = a.ptr;
+        lead_end = a.ptr + lead_length;
+        a_start += lead_length;
+      } else {
+        DCHECK_LT(a_length, b_length);
+        int lead_length = b_length - a_length;
+        lead_start = b.ptr;
+        lead_end = b.ptr + lead_length;
+        b_start += lead_length;
+      }
+      // Compare extra bytes to the sign extension of the first
+      // byte of the other number.
+      uint8_t extension = first_a < 0 ? 0xFF : 0;
+      bool not_equal = std::any_of(lead_start, lead_end,
+                                   [extension](uint8_t a) { return extension != a; });
+      if (not_equal) {
+        // Since sign extension are extrema values for unsigned bytes:
+        //
+        // Four cases exist:
+        //    negative values:
+        //      b is the longer value.
+        //        b must be the lesser value: return false
+        //      else:
+        //        a must be the lesser value: return true
+        //
+        //    positive values:
+        //      b  is the longer value.
+        //        values in b must be greater than a: return true
+        //      else:
+        //        values in a must be greater than b: return false
+        bool negative_values = first_a < 0;
+        bool b_longer = a_length < b_length;
+        return negative_values != b_longer;
+      }
+    } else {
+      a_start++;
+      b_start++;
+    }
+    return std::lexicographical_compare(a_start, a.ptr + a_length, b_start,
+                                        b.ptr + b_length);
   }
 };
 
 template <typename DType, bool is_signed>
 struct BinaryLikeCompareHelperBase {
   using T = typename DType::c_type;
-  using PtrType = typename std::conditional<is_signed, int8_t, uint8_t>::type;
 
   static T DefaultMin() { return {}; }
   static T DefaultMax() { return {}; }
   static T Coalesce(T val, T fallback) { return val; }
 
-  static int value_length(int value_length, const ByteArray& value) { return value.len; }
-
-  static int value_length(int type_length, const FLBA& value) { return type_length; }
-
   static inline bool Compare(int type_length, const T& a, const T& b) {
-    const auto* aptr = reinterpret_cast<const PtrType*>(a.ptr);
-    const auto* bptr = reinterpret_cast<const PtrType*>(b.ptr);
-    return std::lexicographical_compare(aptr, aptr + value_length(type_length, a), bptr,
-                                        bptr + value_length(type_length, b));
+    return BinaryLikeComparer<T, is_signed>::Compare(type_length, a, b);
   }
-
   static T Min(int type_length, const T& a, const T& b) {
     if (a.ptr == nullptr) return b;
     if (b.ptr == nullptr) return a;
diff --git a/cpp/src/parquet/statistics_test.cc b/cpp/src/parquet/statistics_test.cc
index e1c78c7..8a275fd 100644
--- a/cpp/src/parquet/statistics_test.cc
+++ b/cpp/src/parquet/statistics_test.cc
@@ -69,21 +69,51 @@
 }
 
 TEST(Comparison, SignedByteArray) {
+  // Signed byte array comparison is only used for Decimal comparison. When
+  // decimals are encoded as byte arrays they use twos complement big-endian
+  // encoded values. Comparisons of byte arrays of unequal types need to handle
+  // sign extension.
   auto comparator = MakeComparator<ByteArrayType>(Type::BYTE_ARRAY, SortOrder::SIGNED);
+  struct Case {
+    std::vector<uint8_t> bytes;
+    int order;
+    ByteArray ToByteArray() const {
+      return ByteArray(static_cast<int>(bytes.size()), bytes.data());
+    }
+  };
 
-  std::string s1 = "12345";
-  std::string s2 = "12345678";
-  ByteArray s1ba = ByteArrayFromString(s1);
-  ByteArray s2ba = ByteArrayFromString(s2);
-  ASSERT_TRUE(comparator->Compare(s1ba, s2ba));
+  // Test a mix of big-endian comparison values that are both equal and
+  // unequal after sign extension.
+  std::vector<Case> cases = {
+      {{0x80, 0x80, 0, 0}, 0},           {{/*0xFF,*/ 0x80, 0, 0}, 1},
+      {{0xFF, 0x80, 0, 0}, 1},           {{/*0xFF,*/ 0xFF, 0x01, 0}, 2},
+      {{/*0xFF,  0xFF,*/ 0x80, 0}, 3},   {{/*0xFF,*/ 0xFF, 0x80, 0}, 3},
+      {{0xFF, 0xFF, 0x80, 0}, 3},        {{/*0xFF,0xFF,0xFF,*/ 0x80}, 4},
+      {{/*0xFF, 0xFF, 0xFF,*/ 0xFF}, 5}, {{/*0, 0,*/ 0x01, 0x01}, 6},
+      {{/*0,*/ 0, 0x01, 0x01}, 6},       {{0, 0, 0x01, 0x01}, 6},
+      {{/*0,*/ 0x01, 0x01, 0}, 7},       {{0x01, 0x01, 0, 0}, 8}};
 
-  // This is case where signed comparison UTF-8 (PARQUET-686) is incorrect
-  // This example is to only check signed comparison and not UTF-8.
-  s1 = u8"bügeln";
-  s2 = u8"braten";
-  s1ba = ByteArrayFromString(s1);
-  s2ba = ByteArrayFromString(s2);
-  ASSERT_TRUE(comparator->Compare(s1ba, s2ba));
+  for (size_t x = 0; x < cases.size(); x++) {
+    const auto& case1 = cases[x];
+    // Empty array is always the smallest values
+    EXPECT_TRUE(comparator->Compare(ByteArray(), case1.ToByteArray())) << x;
+    EXPECT_FALSE(comparator->Compare(case1.ToByteArray(), ByteArray())) << x;
+    // Equals is always false.
+    EXPECT_FALSE(comparator->Compare(case1.ToByteArray(), case1.ToByteArray())) << x;
+
+    for (size_t y = 0; y < cases.size(); y++) {
+      const auto& case2 = cases[y];
+      if (case1.order < case2.order) {
+        EXPECT_TRUE(comparator->Compare(case1.ToByteArray(), case2.ToByteArray()))
+            << x << " (order: " << case1.order << ") " << y << " (order: " << case2.order
+            << ")";
+      } else {
+        EXPECT_FALSE(comparator->Compare(case1.ToByteArray(), case2.ToByteArray()))
+            << x << " (order: " << case1.order << ") " << y << " (order: " << case2.order
+            << ")";
+      }
+    }
+  }
 }
 
 TEST(Comparison, UnsignedByteArray) {
@@ -111,21 +141,28 @@
 }
 
 TEST(Comparison, SignedFLBA) {
-  int size = 10;
+  int size = 4;
   auto comparator =
       MakeComparator<FLBAType>(Type::FIXED_LEN_BYTE_ARRAY, SortOrder::SIGNED, size);
 
-  std::string s1 = "Anti123456";
-  std::string s2 = "Bunkd123456";
-  FLBA s1flba = FLBAFromString(s1);
-  FLBA s2flba = FLBAFromString(s2);
-  ASSERT_TRUE(comparator->Compare(s1flba, s2flba));
+  std::vector<uint8_t> byte_values[] = {
+      {0x80, 0, 0, 0},          {0xFF, 0xFF, 0x01, 0},    {0xFF, 0xFF, 0x80, 0},
+      {0xFF, 0xFF, 0xFF, 0x80}, {0xFF, 0xFF, 0xFF, 0xFF}, {0, 0, 0x01, 0x01},
+      {0, 0x01, 0x01, 0},       {0x01, 0x01, 0, 0}};
+  std::vector<FLBA> values_to_compare;
+  for (auto& bytes : byte_values) {
+    values_to_compare.emplace_back(FLBA(bytes.data()));
+  }
 
-  s1 = "Bünk123456";
-  s2 = "Bunk123456";
-  s1flba = FLBAFromString(s1);
-  s2flba = FLBAFromString(s2);
-  ASSERT_TRUE(comparator->Compare(s1flba, s2flba));
+  for (size_t x = 0; x < values_to_compare.size(); x++) {
+    EXPECT_FALSE(comparator->Compare(values_to_compare[x], values_to_compare[x])) << x;
+    for (size_t y = x + 1; y < values_to_compare.size(); y++) {
+      EXPECT_TRUE(comparator->Compare(values_to_compare[x], values_to_compare[y]))
+          << x << " " << y;
+      EXPECT_FALSE(comparator->Compare(values_to_compare[y], values_to_compare[x]))
+          << y << " " << x;
+    }
+  }
 }
 
 TEST(Comparison, UnsignedFLBA) {