PARQUET-1002: Compute statistics based on Sort Order

@lomereiter,  You might also want to take a look since you previously implemented the Statistics API.

Author: Deepak Majeti <deepak.majeti@hpe.com>

Closes #383 from majetideepak/PARQUET-1002 and squashes the following commits:

5a93fe3 [Deepak Majeti] fix error
48dd22d [Deepak Majeti] change fix version from 1.2.1 to 1.3.0
712ea90 [Deepak Majeti] Move test to statistics-test
75ea475 [Deepak Majeti] Fix formatting
c5d9610 [Deepak Majeti] Rename compare to comparator
ba18ae6 [Deepak Majeti] Review comments
f87bda6 [Deepak Majeti] Avoid UTF-8 file encoding mismatch between windows and linux
3cb8a3e [Deepak Majeti] Review comments
17a79f3 [Deepak Majeti] fix failures
85a9052 [Deepak Majeti] fix Clang failure and improve test Fix Warnings on Windows
bdd8d37 [Deepak Majeti] Add another test
6a985de [Deepak Majeti] Comments
808e764 [Deepak Majeti] fix failure
55960fe [Deepak Majeti] format
b4fba8b [Deepak Majeti] Add test for Unknown sort order
d58658d [Deepak Majeti] make format
c1abb69 [Deepak Majeti] rename to Make
3df2686 [Deepak Majeti] Add Reader Writer Statistics Test
c4b1827 [Deepak Majeti] extend testing fix tests
4287565 [Deepak Majeti] Fix reader read new max and min values
046f8d3 [Deepak Majeti] Move SortOrder to types.h add int32 comparison INT96 fix statistics in metadata Use templates
diff --git a/CMakeLists.txt b/CMakeLists.txt
index b7a41d8..1ecf877 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -646,6 +646,7 @@
 
   src/parquet/parquet_constants.cpp
   src/parquet/parquet_types.cpp
+  src/parquet/util/comparison.cc
   src/parquet/util/memory.cc
 )
 
diff --git a/src/parquet/column_writer-test.cc b/src/parquet/column_writer-test.cc
index 3ec3663..48f243e 100644
--- a/src/parquet/column_writer-test.cc
+++ b/src/parquet/column_writer-test.cc
@@ -151,14 +151,16 @@
   void ReadAndCompare(Compression::type compression, int64_t num_rows) {
     this->SetupValuesOut(num_rows);
     this->ReadColumnFully(compression);
-    Compare<T> compare(this->descr_);
+    std::shared_ptr<CompareDefault<TestType>> compare;
+    compare = std::static_pointer_cast<CompareDefault<TestType>>(
+        Comparator::Make(this->descr_));
     for (size_t i = 0; i < this->values_.size(); i++) {
-      if (compare(this->values_[i], this->values_out_[i]) ||
-          compare(this->values_out_[i], this->values_[i])) {
+      if ((*compare)(this->values_[i], this->values_out_[i]) ||
+          (*compare)(this->values_out_[i], this->values_[i])) {
         std::cout << "Failed at " << i << std::endl;
       }
-      ASSERT_FALSE(compare(this->values_[i], this->values_out_[i]));
-      ASSERT_FALSE(compare(this->values_out_[i], this->values_[i]));
+      ASSERT_FALSE((*compare)(this->values_[i], this->values_out_[i]));
+      ASSERT_FALSE((*compare)(this->values_out_[i], this->values_[i]));
     }
     ASSERT_EQ(this->values_, this->values_out_);
   }
diff --git a/src/parquet/column_writer.cc b/src/parquet/column_writer.cc
index b36f395..ac7e2ba 100644
--- a/src/parquet/column_writer.cc
+++ b/src/parquet/column_writer.cc
@@ -267,7 +267,10 @@
     FlushBufferedDataPages();
 
     EncodedStatistics chunk_statistics = GetChunkStatistics();
-    if (chunk_statistics.is_set()) metadata_->SetStatistics(chunk_statistics);
+    if (chunk_statistics.is_set()) {
+      metadata_->SetStatistics(SortOrder::SIGNED == descr_->sort_order(),
+                               chunk_statistics);
+    }
     pager_->Close(has_dictionary_, fallback_);
   }
 
@@ -317,7 +320,8 @@
       ParquetException::NYI("Selected encoding is not supported");
   }
 
-  if (properties->statistics_enabled(descr_->path())) {
+  if (properties->statistics_enabled(descr_->path()) &&
+      (SortOrder::UNKNOWN != descr_->sort_order())) {
     page_statistics_ = std::unique_ptr<TypedStats>(new TypedStats(descr_, allocator_));
     chunk_statistics_ = std::unique_ptr<TypedStats>(new TypedStats(descr_, allocator_));
   }
diff --git a/src/parquet/file/file-metadata-test.cc b/src/parquet/file/file-metadata-test.cc
index a7c438c..5bd8419 100644
--- a/src/parquet/file/file-metadata-test.cc
+++ b/src/parquet/file/file-metadata-test.cc
@@ -63,8 +63,8 @@
   auto col1_builder = rg1_builder->NextColumnChunk();
   auto col2_builder = rg1_builder->NextColumnChunk();
   // column metadata
-  col1_builder->SetStatistics(stats_int);
-  col2_builder->SetStatistics(stats_float);
+  col1_builder->SetStatistics(true, stats_int);
+  col2_builder->SetStatistics(true, stats_float);
   col1_builder->Finish(nrows / 2, 4, 0, 10, 512, 600, true, false);
   col2_builder->Finish(nrows / 2, 24, 0, 30, 512, 600, true, false);
   rg1_builder->Finish(1024);
@@ -73,8 +73,8 @@
   col1_builder = rg2_builder->NextColumnChunk();
   col2_builder = rg2_builder->NextColumnChunk();
   // column metadata
-  col1_builder->SetStatistics(stats_int);
-  col2_builder->SetStatistics(stats_float);
+  col1_builder->SetStatistics(true, stats_int);
+  col2_builder->SetStatistics(true, stats_float);
   col1_builder->Finish(nrows / 2, 6, 0, 10, 512, 600, true, false);
   col2_builder->Finish(nrows / 2, 16, 0, 26, 512, 600, true, false);
   rg2_builder->Finish(1024);
@@ -215,11 +215,12 @@
 
   ASSERT_EQ(true, version.VersionLt(version1));
 
-  ASSERT_FALSE(version1.HasCorrectStatistics(Type::INT96));
-  ASSERT_TRUE(version.HasCorrectStatistics(Type::INT32));
-  ASSERT_FALSE(version.HasCorrectStatistics(Type::BYTE_ARRAY));
-  ASSERT_TRUE(version1.HasCorrectStatistics(Type::BYTE_ARRAY));
-  ASSERT_TRUE(version3.HasCorrectStatistics(Type::FIXED_LEN_BYTE_ARRAY));
+  ASSERT_FALSE(version1.HasCorrectStatistics(Type::INT96, SortOrder::SIGNED));
+  ASSERT_TRUE(version.HasCorrectStatistics(Type::INT32, SortOrder::SIGNED));
+  ASSERT_FALSE(version.HasCorrectStatistics(Type::BYTE_ARRAY, SortOrder::SIGNED));
+  ASSERT_TRUE(version1.HasCorrectStatistics(Type::BYTE_ARRAY, SortOrder::SIGNED));
+  ASSERT_TRUE(
+      version3.HasCorrectStatistics(Type::FIXED_LEN_BYTE_ARRAY, SortOrder::SIGNED));
 }
 
 }  // namespace metadata
diff --git a/src/parquet/file/metadata.cc b/src/parquet/file/metadata.cc
index d5a96f3..6e7fc3b 100644
--- a/src/parquet/file/metadata.cc
+++ b/src/parquet/file/metadata.cc
@@ -35,62 +35,20 @@
     ApplicationVersion("parquet-mr version 1.8.0");
 const ApplicationVersion ApplicationVersion::PARQUET_816_FIXED_VERSION =
     ApplicationVersion("parquet-mr version 1.2.9");
-
-// Return the Sort Order of the Parquet Physical Types
-SortOrder default_sort_order(Type::type primitive) {
-  switch (primitive) {
-    case Type::BOOLEAN:
-    case Type::INT32:
-    case Type::INT64:
-    case Type::FLOAT:
-    case Type::DOUBLE:
-      return SortOrder::SIGNED;
-    case Type::BYTE_ARRAY:
-    case Type::FIXED_LEN_BYTE_ARRAY:
-    case Type::INT96:  // only used for timestamp, which uses unsigned values
-      return SortOrder::UNSIGNED;
-  }
-  return SortOrder::UNKNOWN;
-}
-
-// Return the SortOrder of the Parquet Types using Logical or Physical Types
-SortOrder get_sort_order(LogicalType::type converted, Type::type primitive) {
-  if (converted == LogicalType::NONE) return default_sort_order(primitive);
-  switch (converted) {
-    case LogicalType::INT_8:
-    case LogicalType::INT_16:
-    case LogicalType::INT_32:
-    case LogicalType::INT_64:
-    case LogicalType::DATE:
-    case LogicalType::TIME_MICROS:
-    case LogicalType::TIME_MILLIS:
-    case LogicalType::TIMESTAMP_MICROS:
-    case LogicalType::TIMESTAMP_MILLIS:
-      return SortOrder::SIGNED;
-    case LogicalType::UINT_8:
-    case LogicalType::UINT_16:
-    case LogicalType::UINT_32:
-    case LogicalType::UINT_64:
-    case LogicalType::ENUM:
-    case LogicalType::UTF8:
-    case LogicalType::BSON:
-    case LogicalType::JSON:
-      return SortOrder::UNSIGNED;
-    case LogicalType::NA:
-    case LogicalType::DECIMAL:
-    case LogicalType::LIST:
-    case LogicalType::MAP:
-    case LogicalType::MAP_KEY_VALUE:
-    case LogicalType::INTERVAL:
-    case LogicalType::NONE:  // required instead of default
-      return SortOrder::UNKNOWN;
-  }
-  return SortOrder::UNKNOWN;
-}
+const ApplicationVersion ApplicationVersion::PARQUET_CPP_FIXED_STATS_VERSION =
+    ApplicationVersion("parquet-cpp version 1.3.0");
 
 template <typename DType>
 static std::shared_ptr<RowGroupStatistics> MakeTypedColumnStats(
     const format::ColumnMetaData& metadata, const ColumnDescriptor* descr) {
+  // If new fields max_value/min_value are set, then return them.
+  if (metadata.statistics.__isset.max_value || metadata.statistics.__isset.min_value) {
+    return std::make_shared<TypedRowGroupStatistics<DType>>(
+        descr, metadata.statistics.min_value, metadata.statistics.max_value,
+        metadata.num_values - metadata.statistics.null_count,
+        metadata.statistics.null_count, metadata.statistics.distinct_count, true);
+  }
+  // Default behavior
   return std::make_shared<TypedRowGroupStatistics<DType>>(
       descr, metadata.statistics.min, metadata.statistics.max,
       metadata.num_values - metadata.statistics.null_count,
@@ -159,9 +117,7 @@
   inline bool is_stats_set() const {
     DCHECK(writer_version_ != nullptr);
     return column_->meta_data.__isset.statistics &&
-           writer_version_->HasCorrectStatistics(type()) &&
-           SortOrder::SIGNED ==
-               get_sort_order(descr_->logical_type(), descr_->physical_type());
+           writer_version_->HasCorrectStatistics(type(), descr_->sort_order());
   }
 
   inline std::shared_ptr<RowGroupStatistics> statistics() const {
@@ -534,15 +490,21 @@
 // Reference:
 // parquet-mr/parquet-column/src/main/java/org/apache/parquet/CorruptStatistics.java
 // PARQUET-686 has more disussion on statistics
-bool ApplicationVersion::HasCorrectStatistics(Type::type col_type) const {
-  // None of the current tools write INT96 Statistics correctly
-  if (col_type == Type::INT96) return false;
+bool ApplicationVersion::HasCorrectStatistics(Type::type col_type,
+                                              SortOrder::type sort_order) const {
+  // Parquet cpp version 1.3.0 onwards stats are computed correctly for all types
+  if ((application_ != "parquet-cpp") || (VersionLt(PARQUET_CPP_FIXED_STATS_VERSION))) {
+    // Only SIGNED are valid
+    if (SortOrder::SIGNED != sort_order) return false;
 
-  // Statistics of other types are OK
-  if (col_type != Type::FIXED_LEN_BYTE_ARRAY && col_type != Type::BYTE_ARRAY) {
-    return true;
+    // None of the current tools write INT96 Statistics correctly
+    if (col_type == Type::INT96) return false;
+
+    // Statistics of other types are OK
+    if (col_type != Type::FIXED_LEN_BYTE_ARRAY && col_type != Type::BYTE_ARRAY) {
+      return true;
+    }
   }
-
   // created_by is not populated, which could have been caused by
   // parquet-mr during the same time as PARQUET-251, see PARQUET-297
   if (application_ == "unknown") {
@@ -577,16 +539,24 @@
   void set_file_path(const std::string& val) { column_chunk_->__set_file_path(val); }
 
   // column metadata
-  void SetStatistics(const EncodedStatistics& val) {
+  void SetStatistics(bool is_signed, const EncodedStatistics& val) {
     format::Statistics stats;
     stats.null_count = val.null_count;
     stats.distinct_count = val.distinct_count;
-    stats.max = val.max();
-    stats.min = val.min();
-    stats.__isset.min = val.has_min;
-    stats.__isset.max = val.has_max;
+    stats.max_value = val.max();
+    stats.min_value = val.min();
+    stats.__isset.min_value = val.has_min;
+    stats.__isset.max_value = val.has_max;
     stats.__isset.null_count = val.has_null_count;
     stats.__isset.distinct_count = val.has_distinct_count;
+    // If the order is SIGNED, then the old min/max values must be set too.
+    // This for backward compatibility
+    if (is_signed) {
+      stats.max = val.max();
+      stats.min = val.min();
+      stats.__isset.min = val.has_min;
+      stats.__isset.max = val.has_max;
+    }
 
     column_chunk_->meta_data.__set_statistics(stats);
   }
@@ -674,8 +644,9 @@
   return impl_->descr();
 }
 
-void ColumnChunkMetaDataBuilder::SetStatistics(const EncodedStatistics& result) {
-  impl_->SetStatistics(result);
+void ColumnChunkMetaDataBuilder::SetStatistics(bool is_signed,
+                                               const EncodedStatistics& result) {
+  impl_->SetStatistics(is_signed, result);
 }
 
 class RowGroupMetaDataBuilder::RowGroupMetaDataBuilderImpl {
diff --git a/src/parquet/file/metadata.h b/src/parquet/file/metadata.h
index 4250f6b..0d8e10e 100644
--- a/src/parquet/file/metadata.h
+++ b/src/parquet/file/metadata.h
@@ -35,22 +35,12 @@
 
 using KeyValueMetadata = ::arrow::KeyValueMetadata;
 
-// Reference:
-// parquet-mr/parquet-hadoop/src/main/java/org/apache/parquet/
-//                            format/converter/ParquetMetadataConverter.java
-// Sort order for page and column statistics. Types are associated with sort
-// orders (e.g., UTF8 columns should use UNSIGNED) and column stats are
-// aggregated using a sort order. As of parquet-format version 2.3.1, the
-// order used to aggregate stats is always SIGNED and is not stored in the
-// Parquet file. These stats are discarded for types that need unsigned.
-// See PARQUET-686.
-enum SortOrder { SIGNED, UNSIGNED, UNKNOWN };
-
 class ApplicationVersion {
  public:
   // Known Versions with Issues
   static const ApplicationVersion PARQUET_251_FIXED_VERSION;
   static const ApplicationVersion PARQUET_816_FIXED_VERSION;
+  static const ApplicationVersion PARQUET_CPP_FIXED_STATS_VERSION;
   // Regular expression for the version format
   // major . minor . patch unknown - prerelease.x + build info
   // Eg: 1.5.0ab-cdh5.5.0+cd
@@ -92,7 +82,8 @@
   bool VersionEq(const ApplicationVersion& other_version) const;
 
   // Checks if the Version has the correct statistics for a given column
-  bool HasCorrectStatistics(Type::type primitive) const;
+  bool HasCorrectStatistics(Type::type primitive,
+                            SortOrder::type sort_order = SortOrder::SIGNED) const;
 };
 
 class PARQUET_EXPORT ColumnChunkMetaData {
@@ -209,7 +200,7 @@
   // Used when a dataset is spread across multiple files
   void set_file_path(const std::string& path);
   // column metadata
-  void SetStatistics(const EncodedStatistics& stats);
+  void SetStatistics(bool is_signed, const EncodedStatistics& stats);
   // get the column descriptor
   const ColumnDescriptor* descr() const;
   // commit the metadata
diff --git a/src/parquet/parquet_version.h.in b/src/parquet/parquet_version.h.in
index 7036d2f..db8f396 100644
--- a/src/parquet/parquet_version.h.in
+++ b/src/parquet/parquet_version.h.in
@@ -21,4 +21,4 @@
 // define the parquet created by version
 #define CREATED_BY_VERSION "parquet-cpp version @PARQUET_VERSION@"
 
-#endif // PARQUET_VERSION_H
+#endif  // PARQUET_VERSION_H
diff --git a/src/parquet/schema.h b/src/parquet/schema.h
index e240b82..c6b7fbe 100644
--- a/src/parquet/schema.h
+++ b/src/parquet/schema.h
@@ -332,6 +332,10 @@
 
   LogicalType::type logical_type() const { return primitive_node_->logical_type(); }
 
+  SortOrder::type sort_order() const {
+    return GetSortOrder(logical_type(), physical_type());
+  }
+
   const std::string& name() const { return primitive_node_->name(); }
 
   const std::shared_ptr<schema::ColumnPath> path() const;
diff --git a/src/parquet/statistics-test.cc b/src/parquet/statistics-test.cc
index 26352c1..d6c5205 100644
--- a/src/parquet/statistics-test.cc
+++ b/src/parquet/statistics-test.cc
@@ -194,9 +194,8 @@
 }
 
 template <typename TestType>
-typename std::vector<typename TestType::c_type>
-TestRowGroupStatistics<TestType>::GetDeepCopy(
-    const std::vector<typename TestType::c_type>& values) {
+typename std::vector<typename TestType::c_type> TestRowGroupStatistics<
+    TestType>::GetDeepCopy(const std::vector<typename TestType::c_type>& values) {
   return values;
 }
 
@@ -311,6 +310,7 @@
   this->TestMerge();
 }
 
+// Statistics are restricted for few types in older parquet version
 TEST(CorruptStatistics, Basics) {
   ApplicationVersion version("parquet-mr version 1.8.0");
   SchemaDescriptor schema;
@@ -356,5 +356,332 @@
   ASSERT_FALSE(column_chunk6->is_stats_set());
 }
 
+// Statistics for all types have no restrictions in newer parquet version
+TEST(CorrectStatistics, Basics) {
+  ApplicationVersion version("parquet-cpp version 1.3.0");
+  SchemaDescriptor schema;
+  schema::NodePtr node;
+  std::vector<schema::NodePtr> fields;
+  // Test Physical Types
+  fields.push_back(schema::PrimitiveNode::Make("col1", Repetition::OPTIONAL, Type::INT32,
+                                               LogicalType::NONE));
+  fields.push_back(schema::PrimitiveNode::Make("col2", Repetition::OPTIONAL,
+                                               Type::BYTE_ARRAY, LogicalType::NONE));
+  // Test Logical Types
+  fields.push_back(schema::PrimitiveNode::Make("col3", Repetition::OPTIONAL, Type::INT32,
+                                               LogicalType::DATE));
+  fields.push_back(schema::PrimitiveNode::Make("col4", Repetition::OPTIONAL, Type::INT32,
+                                               LogicalType::UINT_32));
+  fields.push_back(schema::PrimitiveNode::Make("col5", Repetition::OPTIONAL,
+                                               Type::FIXED_LEN_BYTE_ARRAY,
+                                               LogicalType::INTERVAL, 12));
+  fields.push_back(schema::PrimitiveNode::Make("col6", Repetition::OPTIONAL,
+                                               Type::BYTE_ARRAY, LogicalType::UTF8));
+  node = schema::GroupNode::Make("schema", Repetition::REQUIRED, fields);
+  schema.Init(node);
+
+  format::ColumnChunk col_chunk;
+  col_chunk.meta_data.__isset.statistics = true;
+  auto column_chunk1 = ColumnChunkMetaData::Make(
+      reinterpret_cast<const uint8_t*>(&col_chunk), schema.Column(0), &version);
+  ASSERT_TRUE(column_chunk1->is_stats_set());
+  auto column_chunk2 = ColumnChunkMetaData::Make(
+      reinterpret_cast<const uint8_t*>(&col_chunk), schema.Column(1), &version);
+  ASSERT_TRUE(column_chunk2->is_stats_set());
+  auto column_chunk3 = ColumnChunkMetaData::Make(
+      reinterpret_cast<const uint8_t*>(&col_chunk), schema.Column(2), &version);
+  ASSERT_TRUE(column_chunk3->is_stats_set());
+  auto column_chunk4 = ColumnChunkMetaData::Make(
+      reinterpret_cast<const uint8_t*>(&col_chunk), schema.Column(3), &version);
+  ASSERT_TRUE(column_chunk4->is_stats_set());
+  auto column_chunk5 = ColumnChunkMetaData::Make(
+      reinterpret_cast<const uint8_t*>(&col_chunk), schema.Column(4), &version);
+  ASSERT_TRUE(column_chunk5->is_stats_set());
+  auto column_chunk6 = ColumnChunkMetaData::Make(
+      reinterpret_cast<const uint8_t*>(&col_chunk), schema.Column(5), &version);
+  ASSERT_TRUE(column_chunk6->is_stats_set());
+}
+
+// Test SortOrder class
+static const int NUM_VALUES = 10;
+
+template <typename TestType>
+class TestStatistics : public ::testing::Test {
+ public:
+  typedef typename TestType::c_type T;
+
+  void AddNodes(std::string name) {
+    fields_.push_back(schema::PrimitiveNode::Make(name, Repetition::REQUIRED,
+                                                  TestType::type_num, LogicalType::NONE));
+  }
+
+  void SetUpSchema() {
+    stats_.resize(fields_.size());
+    values_.resize(NUM_VALUES);
+    schema_ = std::static_pointer_cast<GroupNode>(
+        GroupNode::Make("Schema", Repetition::REQUIRED, fields_));
+
+    parquet_sink_ = std::make_shared<InMemoryOutputStream>();
+  }
+
+  void SetValues();
+
+  void WriteParquet() {
+    // Add writer properties
+    parquet::WriterProperties::Builder builder;
+    builder.compression(parquet::Compression::SNAPPY);
+    builder.created_by("parquet-cpp version 1.3.0");
+    std::shared_ptr<parquet::WriterProperties> props = builder.build();
+
+    // Create a ParquetFileWriter instance
+    auto file_writer = parquet::ParquetFileWriter::Open(parquet_sink_, schema_, props);
+
+    // Append a RowGroup with a specific number of rows.
+    auto rg_writer = file_writer->AppendRowGroup(NUM_VALUES);
+
+    this->SetValues();
+
+    // Insert Values
+    for (int i = 0; i < static_cast<int>(fields_.size()); i++) {
+      auto column_writer =
+          static_cast<parquet::TypedColumnWriter<TestType>*>(rg_writer->NextColumn());
+      column_writer->WriteBatch(NUM_VALUES, nullptr, nullptr, values_.data());
+    }
+  }
+
+  void VerifyParquetStats() {
+    auto pbuffer = parquet_sink_->GetBuffer();
+
+    // Create a ParquetReader instance
+    std::unique_ptr<parquet::ParquetFileReader> parquet_reader =
+        parquet::ParquetFileReader::Open(
+            std::make_shared<arrow::io::BufferReader>(pbuffer));
+
+    // Get the File MetaData
+    std::shared_ptr<parquet::FileMetaData> file_metadata = parquet_reader->metadata();
+    std::shared_ptr<parquet::RowGroupMetaData> rg_metadata = file_metadata->RowGroup(0);
+    for (int i = 0; i < static_cast<int>(fields_.size()); i++) {
+      std::shared_ptr<parquet::ColumnChunkMetaData> cc_metadata =
+          rg_metadata->ColumnChunk(i);
+      ASSERT_EQ(stats_[i].min(), cc_metadata->statistics()->EncodeMin());
+      ASSERT_EQ(stats_[i].max(), cc_metadata->statistics()->EncodeMax());
+    }
+  }
+
+ protected:
+  std::vector<T> values_;
+  std::vector<uint8_t> values_buf_;
+  std::vector<schema::NodePtr> fields_;
+  std::shared_ptr<schema::GroupNode> schema_;
+  std::shared_ptr<InMemoryOutputStream> parquet_sink_;
+  std::vector<EncodedStatistics> stats_;
+};
+
+using CompareTestTypes = ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType,
+                                   DoubleType, ByteArrayType, FLBAType>;
+
+// TYPE::INT32
+template <>
+void TestStatistics<Int32Type>::AddNodes(std::string name) {
+  // UINT_32 logical type to set Unsigned Statistics
+  fields_.push_back(schema::PrimitiveNode::Make(name, Repetition::REQUIRED, Type::INT32,
+                                                LogicalType::UINT_32));
+  // INT_32 logical type to set Signed Statistics
+  fields_.push_back(schema::PrimitiveNode::Make(name, Repetition::REQUIRED, Type::INT32,
+                                                LogicalType::INT_32));
+}
+
+template <>
+void TestStatistics<Int32Type>::SetValues() {
+  for (int i = 0; i < NUM_VALUES; i++) {
+    values_[i] = i - 5;  // {-5, -4, -3, -2, -1, 0, 1, 2, 3, 4};
+  }
+
+  // Write UINT32 min/max values
+  stats_[0]
+      .set_min(std::string(reinterpret_cast<const char*>(&values_[5]), sizeof(T)))
+      .set_max(std::string(reinterpret_cast<const char*>(&values_[4]), sizeof(T)));
+
+  // Write INT32 min/max values
+  stats_[1]
+      .set_min(std::string(reinterpret_cast<const char*>(&values_[0]), sizeof(T)))
+      .set_max(std::string(reinterpret_cast<const char*>(&values_[9]), sizeof(T)));
+}
+
+// TYPE::INT64
+template <>
+void TestStatistics<Int64Type>::AddNodes(std::string name) {
+  // UINT_64 logical type to set Unsigned Statistics
+  fields_.push_back(schema::PrimitiveNode::Make(name, Repetition::REQUIRED, Type::INT64,
+                                                LogicalType::UINT_64));
+  // INT_64 logical type to set Signed Statistics
+  fields_.push_back(schema::PrimitiveNode::Make(name, Repetition::REQUIRED, Type::INT64,
+                                                LogicalType::INT_64));
+}
+
+template <>
+void TestStatistics<Int64Type>::SetValues() {
+  for (int i = 0; i < NUM_VALUES; i++) {
+    values_[i] = i - 5;  // {-5, -4, -3, -2, -1, 0, 1, 2, 3, 4};
+  }
+
+  // Write UINT64 min/max values
+  stats_[0]
+      .set_min(std::string(reinterpret_cast<const char*>(&values_[5]), sizeof(T)))
+      .set_max(std::string(reinterpret_cast<const char*>(&values_[4]), sizeof(T)));
+
+  // Write INT64 min/max values
+  stats_[1]
+      .set_min(std::string(reinterpret_cast<const char*>(&values_[0]), sizeof(T)))
+      .set_max(std::string(reinterpret_cast<const char*>(&values_[9]), sizeof(T)));
+}
+
+// TYPE::INT96
+template <>
+void TestStatistics<Int96Type>::AddNodes(std::string name) {
+  // INT96 physical type has only Unsigned Statistics
+  fields_.push_back(schema::PrimitiveNode::Make(name, Repetition::REQUIRED, Type::INT96,
+                                                LogicalType::NONE));
+}
+
+template <>
+void TestStatistics<Int96Type>::SetValues() {
+  for (int i = 0; i < NUM_VALUES; i++) {
+    values_[i].value[0] = i - 5;  // {-5, -4, -3, -2, -1, 0, 1, 2, 3, 4};
+    values_[i].value[1] = i - 5;  // {-5, -4, -3, -2, -1, 0, 1, 2, 3, 4};
+    values_[i].value[2] = i - 5;  // {-5, -4, -3, -2, -1, 0, 1, 2, 3, 4};
+  }
+
+  // Write Int96 min/max values
+  stats_[0]
+      .set_min(std::string(reinterpret_cast<const char*>(&values_[5]), sizeof(T)))
+      .set_max(std::string(reinterpret_cast<const char*>(&values_[4]), sizeof(T)));
+}
+
+// TYPE::FLOAT
+template <>
+void TestStatistics<FloatType>::SetValues() {
+  for (int i = 0; i < NUM_VALUES; i++) {
+    values_[i] =
+        (i * 1.0f) - 5;  // {-5.0, -4.0, -3.0, -2.0, -1.0, 0.0, 1.0, 2.0, 3.0, 4.0};
+  }
+
+  // Write Float min/max values
+  stats_[0]
+      .set_min(std::string(reinterpret_cast<const char*>(&values_[0]), sizeof(T)))
+      .set_max(std::string(reinterpret_cast<const char*>(&values_[9]), sizeof(T)));
+}
+
+// TYPE::DOUBLE
+template <>
+void TestStatistics<DoubleType>::SetValues() {
+  for (int i = 0; i < NUM_VALUES; i++) {
+    values_[i] =
+        (i * 1.0f) - 5;  // {-5.0, -4.0, -3.0, -2.0, -1.0, 0.0, 1.0, 2.0, 3.0, 4.0};
+  }
+
+  // Write Double min/max values
+  stats_[0]
+      .set_min(std::string(reinterpret_cast<const char*>(&values_[0]), sizeof(T)))
+      .set_max(std::string(reinterpret_cast<const char*>(&values_[9]), sizeof(T)));
+}
+
+// TYPE::ByteArray
+template <>
+void TestStatistics<ByteArrayType>::AddNodes(std::string name) {
+  // UTF8 logical type to set Unsigned Statistics
+  fields_.push_back(schema::PrimitiveNode::Make(name, Repetition::REQUIRED,
+                                                Type::BYTE_ARRAY, LogicalType::UTF8));
+}
+
+template <>
+void TestStatistics<ByteArrayType>::SetValues() {
+  int max_byte_array_len = 10;
+  size_t nbytes = NUM_VALUES * max_byte_array_len;
+  values_buf_.resize(nbytes);
+  std::vector<std::string> vals = {u8"c123", u8"b123", u8"a123", u8"d123", u8"e123",
+                                   u8"f123", u8"g123", u8"h123", u8"i123", u8"ü123"};
+
+  uint8_t* base = &values_buf_.data()[0];
+  for (int i = 0; i < NUM_VALUES; i++) {
+    memcpy(base, vals[i].c_str(), vals[i].length());
+    values_[i].ptr = base;
+    values_[i].len = static_cast<uint32_t>(vals[i].length());
+    base += vals[i].length();
+  }
+
+  // Write String min/max values
+  stats_[0]
+      .set_min(
+          std::string(reinterpret_cast<const char*>(vals[2].c_str()), vals[2].length()))
+      .set_max(
+          std::string(reinterpret_cast<const char*>(vals[9].c_str()), vals[9].length()));
+}
+
+// TYPE::FLBAArray
+template <>
+void TestStatistics<FLBAType>::AddNodes(std::string name) {
+  // FLBA has only Unsigned Statistics
+  fields_.push_back(schema::PrimitiveNode::Make(name, Repetition::REQUIRED,
+                                                Type::FIXED_LEN_BYTE_ARRAY,
+                                                LogicalType::NONE, FLBA_LENGTH));
+}
+
+template <>
+void TestStatistics<FLBAType>::SetValues() {
+  size_t nbytes = NUM_VALUES * FLBA_LENGTH;
+  values_buf_.resize(nbytes);
+  char vals[NUM_VALUES][FLBA_LENGTH] = {"b12345", "a12345", "c12345", "d12345", "e12345",
+                                        "f12345", "g12345", "h12345", "z12345", "a12345"};
+
+  uint8_t* base = &values_buf_.data()[0];
+  for (int i = 0; i < NUM_VALUES; i++) {
+    memcpy(base, &vals[i][0], FLBA_LENGTH);
+    values_[i].ptr = base;
+    base += FLBA_LENGTH;
+  }
+
+  // Write FLBA min,max values
+  stats_[0]
+      .set_min(std::string(reinterpret_cast<const char*>(&vals[1][0]), FLBA_LENGTH))
+      .set_max(std::string(reinterpret_cast<const char*>(&vals[8][0]), FLBA_LENGTH));
+}
+
+TYPED_TEST_CASE(TestStatistics, CompareTestTypes);
+
+TYPED_TEST(TestStatistics, MinMax) {
+  this->AddNodes("Column ");
+  this->SetUpSchema();
+  this->WriteParquet();
+  this->VerifyParquetStats();
+}
+
+// Ensure UNKNOWN sort order is handled properly
+using TestStatisticsFLBA = TestStatistics<FLBAType>;
+
+TEST_F(TestStatisticsFLBA, UnknownSortOrder) {
+  this->fields_.push_back(schema::PrimitiveNode::Make(
+      "Column 0", Repetition::REQUIRED, Type::FIXED_LEN_BYTE_ARRAY, LogicalType::INTERVAL,
+      FLBA_LENGTH));
+  this->SetUpSchema();
+  this->WriteParquet();
+
+  auto pbuffer = parquet_sink_->GetBuffer();
+  // Create a ParquetReader instance
+  std::unique_ptr<parquet::ParquetFileReader> parquet_reader =
+      parquet::ParquetFileReader::Open(
+          std::make_shared<arrow::io::BufferReader>(pbuffer));
+  // Get the File MetaData
+  std::shared_ptr<parquet::FileMetaData> file_metadata = parquet_reader->metadata();
+  std::shared_ptr<parquet::RowGroupMetaData> rg_metadata = file_metadata->RowGroup(0);
+  std::shared_ptr<parquet::ColumnChunkMetaData> cc_metadata = rg_metadata->ColumnChunk(0);
+
+  // stats should not be set for UNKNOWN sort order
+  ASSERT_FALSE(cc_metadata->is_stats_set());
+}
+
+
+
+
 }  // namespace test
 }  // namespace parquet
diff --git a/src/parquet/statistics.cc b/src/parquet/statistics.cc
index 12d1f5b..dad1a9b 100644
--- a/src/parquet/statistics.cc
+++ b/src/parquet/statistics.cc
@@ -21,7 +21,6 @@
 #include "parquet/encoding-internal.h"
 #include "parquet/exception.h"
 #include "parquet/statistics.h"
-#include "parquet/util/comparison.h"
 #include "parquet/util/memory.h"
 
 using arrow::default_memory_pool;
@@ -86,6 +85,12 @@
 }
 
 template <typename DType>
+void TypedRowGroupStatistics<DType>::SetComparator() {
+  comparator_ =
+      std::static_pointer_cast<CompareDefault<DType> >(Comparator::Make(descr_));
+}
+
+template <typename DType>
 void TypedRowGroupStatistics<DType>::Reset() {
   ResetCounts();
   has_min_max_ = false;
@@ -102,15 +107,17 @@
   // TODO: support distinct count?
   if (num_not_null == 0) return;
 
-  Compare<T> compare(descr_);
-  auto batch_minmax = std::minmax_element(values, values + num_not_null, compare);
+  auto batch_minmax =
+      std::minmax_element(values, values + num_not_null, std::ref(*(this->comparator_)));
   if (!has_min_max_) {
     has_min_max_ = true;
     Copy(*batch_minmax.first, &min_, min_buffer_.get());
     Copy(*batch_minmax.second, &max_, max_buffer_.get());
   } else {
-    Copy(std::min(min_, *batch_minmax.first, compare), &min_, min_buffer_.get());
-    Copy(std::max(max_, *batch_minmax.second, compare), &max_, max_buffer_.get());
+    Copy(std::min(min_, *batch_minmax.first, std::ref(*(this->comparator_))), &min_,
+         min_buffer_.get());
+    Copy(std::max(max_, *batch_minmax.second, std::ref(*(this->comparator_))), &max_,
+         max_buffer_.get());
   }
 }
 
@@ -128,7 +135,6 @@
   // TODO: support distinct count?
   if (num_not_null == 0) return;
 
-  Compare<T> compare(descr_);
   INIT_BITSET(valid_bits, static_cast<int>(valid_bits_offset));
   // Find first valid entry and use that for min/max
   // As (num_not_null != 0) there must be one
@@ -144,9 +150,9 @@
   T max = values[i];
   for (; i < length; i++) {
     if (bitset_valid_bits & (1 << bit_offset_valid_bits)) {
-      if (compare(values[i], min)) {
+      if ((std::ref(*(this->comparator_)))(values[i], min)) {
         min = values[i];
-      } else if (compare(max, values[i])) {
+      } else if ((std::ref(*(this->comparator_)))(max, values[i])) {
         max = values[i];
       }
     }
@@ -157,8 +163,8 @@
     Copy(min, &min_, min_buffer_.get());
     Copy(max, &max_, max_buffer_.get());
   } else {
-    Copy(std::min(min_, min, compare), &min_, min_buffer_.get());
-    Copy(std::max(max_, max, compare), &max_, max_buffer_.get());
+    Copy(std::min(min_, min, std::ref(*(this->comparator_))), &min_, min_buffer_.get());
+    Copy(std::max(max_, max, std::ref(*(this->comparator_))), &max_, max_buffer_.get());
   }
 }
 
@@ -185,9 +191,10 @@
     return;
   }
 
-  Compare<T> compare(descr_);
-  Copy(std::min(this->min_, other.min_, compare), &this->min_, min_buffer_.get());
-  Copy(std::max(this->max_, other.max_, compare), &this->max_, max_buffer_.get());
+  Copy(std::min(this->min_, other.min_, std::ref(*(this->comparator_))), &this->min_,
+       min_buffer_.get());
+  Copy(std::max(this->max_, other.max_, std::ref(*(this->comparator_))), &this->max_,
+       max_buffer_.get());
 }
 
 template <typename DType>
diff --git a/src/parquet/statistics.h b/src/parquet/statistics.h
index 12d0555..b5466c0 100644
--- a/src/parquet/statistics.h
+++ b/src/parquet/statistics.h
@@ -24,6 +24,7 @@
 
 #include "parquet/schema.h"
 #include "parquet/types.h"
+#include "parquet/util/comparison.h"
 #include "parquet/util/memory.h"
 #include "parquet/util/visibility.h"
 
@@ -97,13 +98,19 @@
 
   virtual EncodedStatistics Encode() = 0;
 
+  // Set the Corresponding Comparator
+  virtual void SetComparator() = 0;
+
   virtual ~RowGroupStatistics() {}
 
   Type::type physical_type() const { return descr_->physical_type(); }
 
  protected:
   const ColumnDescriptor* descr() const { return descr_; }
-  void SetDescr(const ColumnDescriptor* schema) { descr_ = schema; }
+  void SetDescr(const ColumnDescriptor* schema) {
+    descr_ = schema;
+    SetComparator();
+  }
 
   void IncrementNullCount(int64_t n) { statistics_.null_count += n; }
 
@@ -146,6 +153,7 @@
 
   bool HasMinMax() const override;
   void Reset() override;
+  void SetComparator() override;
   void Merge(const TypedRowGroupStatistics<DType>& other);
 
   void Update(const T* values, int64_t num_not_null, int64_t num_null);
@@ -164,6 +172,7 @@
   T min_;
   T max_;
   ::arrow::MemoryPool* pool_;
+  std::shared_ptr<CompareDefault<DType> > comparator_;
 
   void PlainEncode(const T& src, std::string* dst);
   void PlainDecode(const std::string& src, T* dst);
diff --git a/src/parquet/test-util.h b/src/parquet/test-util.h
index 356486b..3fd72f2 100644
--- a/src/parquet/test-util.h
+++ b/src/parquet/test-util.h
@@ -42,7 +42,7 @@
 
 namespace parquet {
 
-static int FLBA_LENGTH = 12;
+static constexpr int FLBA_LENGTH = 12;
 
 bool operator==(const FixedLenByteArray& a, const FixedLenByteArray& b) {
   return 0 == memcmp(a.ptr, b.ptr, FLBA_LENGTH);
diff --git a/src/parquet/types.cc b/src/parquet/types.cc
index 97c769b..0652c6a 100644
--- a/src/parquet/types.cc
+++ b/src/parquet/types.cc
@@ -221,4 +221,56 @@
   return 0;
 }
 
+// Return the Sort Order of the Parquet Physical Types
+SortOrder::type DefaultSortOrder(Type::type primitive) {
+  switch (primitive) {
+    case Type::BOOLEAN:
+    case Type::INT32:
+    case Type::INT64:
+    case Type::FLOAT:
+    case Type::DOUBLE:
+      return SortOrder::SIGNED;
+    case Type::BYTE_ARRAY:
+    case Type::FIXED_LEN_BYTE_ARRAY:
+    case Type::INT96:  // only used for timestamp, which uses unsigned values
+      return SortOrder::UNSIGNED;
+  }
+  return SortOrder::UNKNOWN;
+}
+
+// Return the SortOrder of the Parquet Types using Logical or Physical Types
+SortOrder::type GetSortOrder(LogicalType::type converted, Type::type primitive) {
+  if (converted == LogicalType::NONE) return DefaultSortOrder(primitive);
+  switch (converted) {
+    case LogicalType::INT_8:
+    case LogicalType::INT_16:
+    case LogicalType::INT_32:
+    case LogicalType::INT_64:
+    case LogicalType::DATE:
+    case LogicalType::TIME_MICROS:
+    case LogicalType::TIME_MILLIS:
+    case LogicalType::TIMESTAMP_MICROS:
+    case LogicalType::TIMESTAMP_MILLIS:
+      return SortOrder::SIGNED;
+    case LogicalType::UINT_8:
+    case LogicalType::UINT_16:
+    case LogicalType::UINT_32:
+    case LogicalType::UINT_64:
+    case LogicalType::ENUM:
+    case LogicalType::UTF8:
+    case LogicalType::BSON:
+    case LogicalType::JSON:
+      return SortOrder::UNSIGNED;
+    case LogicalType::DECIMAL:
+    case LogicalType::LIST:
+    case LogicalType::MAP:
+    case LogicalType::MAP_KEY_VALUE:
+    case LogicalType::INTERVAL:
+    case LogicalType::NONE:  // required instead of default
+    case LogicalType::NA:    // required instead of default
+      return SortOrder::UNKNOWN;
+  }
+  return SortOrder::UNKNOWN;
+}
+
 }  // namespace parquet
diff --git a/src/parquet/types.h b/src/parquet/types.h
index 38015c4..af3a58f 100644
--- a/src/parquet/types.h
+++ b/src/parquet/types.h
@@ -116,6 +116,19 @@
   enum type { DATA_PAGE, INDEX_PAGE, DICTIONARY_PAGE, DATA_PAGE_V2 };
 };
 
+// Reference:
+// parquet-mr/parquet-hadoop/src/main/java/org/apache/parquet/
+//                            format/converter/ParquetMetadataConverter.java
+// Sort order for page and column statistics. Types are associated with sort
+// orders (e.g., UTF8 columns should use UNSIGNED) and column stats are
+// aggregated using a sort order. As of parquet-format version 2.3.1, the
+// order used to aggregate stats is always SIGNED and is not stored in the
+// Parquet file. These stats are discarded for types that need unsigned.
+// See PARQUET-686.
+struct SortOrder {
+  enum type { SIGNED, UNSIGNED, UNKNOWN };
+};
+
 // ----------------------------------------------------------------------
 
 struct ByteArray {
@@ -283,6 +296,11 @@
 
 PARQUET_EXPORT int GetTypeByteSize(Type::type t);
 
+PARQUET_EXPORT SortOrder::type DefaultSortOrder(Type::type primitive);
+
+PARQUET_EXPORT SortOrder::type GetSortOrder(LogicalType::type converted,
+                                            Type::type primitive);
+
 }  // namespace parquet
 
 #endif  // PARQUET_TYPES_H
diff --git a/src/parquet/util/comparison-test.cc b/src/parquet/util/comparison-test.cc
index 8401983..dc915bf 100644
--- a/src/parquet/util/comparison-test.cc
+++ b/src/parquet/util/comparison-test.cc
@@ -42,48 +42,178 @@
   return FLBA(ptr);
 }
 
-TEST(Comparison, ByteArray) {
-  NodePtr node = PrimitiveNode::Make("bytearray", Repetition::REQUIRED, Type::BYTE_ARRAY);
-  ColumnDescriptor descr(node, 0, 0);
-  Compare<parquet::ByteArray> less(&descr);
-
-  std::string a = "arrange";
-  std::string b = "arrangement";
-  auto arr1 = ByteArrayFromString(a);
-  auto arr2 = ByteArrayFromString(b);
-  ASSERT_TRUE(less(arr1, arr2));
-
-  a = u8"braten";
-  b = u8"bügeln";
-  auto arr3 = ByteArrayFromString(a);
-  auto arr4 = ByteArrayFromString(b);
-  // see PARQUET-686 discussion about binary comparison
-  ASSERT_TRUE(!less(arr3, arr4));
-}
-
-TEST(Comparison, FLBA) {
-  std::string a = "Antidisestablishmentarianism";
-  std::string b = "Bundesgesundheitsministerium";
-  auto arr1 = FLBAFromString(a);
-  auto arr2 = FLBAFromString(b);
-
+TEST(Comparison, signedByteArray) {
   NodePtr node =
-      PrimitiveNode::Make("FLBA", Repetition::REQUIRED, Type::FIXED_LEN_BYTE_ARRAY,
-                          LogicalType::NONE, static_cast<int>(a.size()));
+      PrimitiveNode::Make("SignedByteArray", Repetition::REQUIRED, Type::BYTE_ARRAY);
   ColumnDescriptor descr(node, 0, 0);
-  Compare<parquet::FixedLenByteArray> less(&descr);
-  ASSERT_TRUE(less(arr1, arr2));
+
+  CompareDefaultByteArray less;
+
+  std::string s1 = "12345";
+  std::string s2 = "12345678";
+  ByteArray s1ba = ByteArrayFromString(s1);
+  ByteArray s2ba = ByteArrayFromString(s2);
+  ASSERT_TRUE(less(s1ba, s2ba));
+
+  // This is case where signed comparision UTF-8 (PARQUET-686) is incorrect
+  // This example is to only check signed comparison and not UTF-8.
+  s1 = u8"bügeln";
+  s2 = u8"braten";
+  s1ba = ByteArrayFromString(s1);
+  s2ba = ByteArrayFromString(s2);
+  ASSERT_TRUE(less(s1ba, s2ba));
 }
 
-TEST(Comparison, Int96) {
-  parquet::Int96 a{{1, 41, 14}}, b{{1, 41, 42}};
-
-  NodePtr node = PrimitiveNode::Make("int96", Repetition::REQUIRED, Type::INT96);
+TEST(Comparison, UnsignedByteArray) {
+  NodePtr node = PrimitiveNode::Make("UnsignedByteArray", Repetition::REQUIRED,
+                                     Type::BYTE_ARRAY, LogicalType::UTF8);
   ColumnDescriptor descr(node, 0, 0);
-  Compare<parquet::Int96> less(&descr);
+
+  // Check if UTF-8 is compared using unsigned correctly
+  CompareUnsignedByteArray uless;
+
+  std::string s1 = "arrange";
+  std::string s2 = "arrangement";
+  ByteArray s1ba = ByteArrayFromString(s1);
+  ByteArray s2ba = ByteArrayFromString(s2);
+  ASSERT_TRUE(uless(s1ba, s2ba));
+
+  // Multi-byte UTF-8 characters
+  s1 = u8"braten";
+  s2 = u8"bügeln";
+  s1ba = ByteArrayFromString(s1);
+  s2ba = ByteArrayFromString(s2);
+  ASSERT_TRUE(uless(s1ba, s2ba));
+
+  s1 = u8"ünk123456";  // ü = 252
+  s2 = u8"ănk123456";  // ă = 259
+  s1ba = ByteArrayFromString(s1);
+  s2ba = ByteArrayFromString(s2);
+  ASSERT_TRUE(uless(s1ba, s2ba));
+}
+
+TEST(Comparison, SignedFLBA) {
+  int size = 10;
+  NodePtr node = PrimitiveNode::Make("SignedFLBA", Repetition::REQUIRED,
+                                     Type::FIXED_LEN_BYTE_ARRAY, LogicalType::NONE, size);
+  ColumnDescriptor descr(node, 0, 0);
+
+  CompareDefaultFLBA less(descr.type_length());
+
+  std::string s1 = "Anti123456";
+  std::string s2 = "Bunkd123456";
+  FLBA s1flba = FLBAFromString(s1);
+  FLBA s2flba = FLBAFromString(s2);
+  ASSERT_TRUE(less(s1flba, s2flba));
+
+  s1 = "Bünk123456";
+  s2 = "Bunk123456";
+  s1flba = FLBAFromString(s1);
+  s2flba = FLBAFromString(s2);
+  ASSERT_TRUE(less(s1flba, s2flba));
+}
+
+TEST(Comparison, UnsignedFLBA) {
+  int size = 10;
+  NodePtr node = PrimitiveNode::Make("UnsignedFLBA", Repetition::REQUIRED,
+                                     Type::FIXED_LEN_BYTE_ARRAY, LogicalType::NONE, size);
+  ColumnDescriptor descr(node, 0, 0);
+
+  CompareUnsignedFLBA uless(descr.type_length());
+
+  std::string s1 = "Anti123456";
+  std::string s2 = "Bunkd123456";
+  FLBA s1flba = FLBAFromString(s1);
+  FLBA s2flba = FLBAFromString(s2);
+  ASSERT_TRUE(uless(s1flba, s2flba));
+
+  s1 = "Bunk123456";
+  s2 = "Bünk123456";
+  s1flba = FLBAFromString(s1);
+  s2flba = FLBAFromString(s2);
+  ASSERT_TRUE(uless(s1flba, s2flba));
+}
+
+TEST(Comparison, SignedInt96) {
+  parquet::Int96 a{{1, 41, 14}}, b{{1, 41, 42}};
+  parquet::Int96 aa{{1, 41, 14}}, bb{{1, 41, 14}};
+  parquet::Int96 aaa{{static_cast<uint32_t>(-1), 41, 14}}, bbb{{1, 41, 42}};
+
+  NodePtr node = PrimitiveNode::Make("SignedInt96", Repetition::REQUIRED, Type::INT96);
+  ColumnDescriptor descr(node, 0, 0);
+
+  CompareDefaultInt96 less;
+
   ASSERT_TRUE(less(a, b));
-  b.value[2] = 14;
-  ASSERT_TRUE(!less(a, b) && !less(b, a));
+  ASSERT_TRUE(!less(aa, bb) && !less(bb, aa));
+  ASSERT_TRUE(less(aaa, bbb));
+}
+
+TEST(Comparison, UnsignedInt96) {
+  parquet::Int96 a{{1, 41, 14}}, b{{1, static_cast<uint32_t>(-41), 42}};
+  parquet::Int96 aa{{1, 41, 14}}, bb{{static_cast<uint32_t>(-1), 41, 14}};
+
+  NodePtr node = PrimitiveNode::Make("UnsignedInt96", Repetition::REQUIRED, Type::INT96);
+  ColumnDescriptor descr(node, 0, 0);
+
+  CompareUnsignedInt96 uless;
+
+  ASSERT_TRUE(uless(a, b));
+  ASSERT_TRUE(uless(aa, bb));
+}
+
+TEST(Comparison, SignedInt64) {
+  int64_t a = 1, b = 4;
+  int64_t aa = 1, bb = 1;
+  int64_t aaa = -1, bbb = 1;
+
+  NodePtr node = PrimitiveNode::Make("SignedInt64", Repetition::REQUIRED, Type::INT64);
+  ColumnDescriptor descr(node, 0, 0);
+
+  CompareDefaultInt64 less;
+
+  ASSERT_TRUE(less(a, b));
+  ASSERT_TRUE(!less(aa, bb) && !less(bb, aa));
+  ASSERT_TRUE(less(aaa, bbb));
+}
+
+TEST(Comparison, UnsignedInt64) {
+  uint64_t a = 1, b = 4;
+  uint64_t aa = 1, bb = 1;
+  uint64_t aaa = 1, bbb = -1;
+
+  NodePtr node = PrimitiveNode::Make("UnsignedInt64", Repetition::REQUIRED, Type::INT64);
+  ColumnDescriptor descr(node, 0, 0);
+
+  CompareUnsignedInt64 less;
+
+  ASSERT_TRUE(less(a, b));
+  ASSERT_TRUE(!less(aa, bb) && !less(bb, aa));
+  ASSERT_TRUE(less(aaa, bbb));
+}
+
+TEST(Comparison, UnsignedInt32) {
+  uint32_t a = 1, b = 4;
+  uint32_t aa = 1, bb = 1;
+  uint32_t aaa = 1, bbb = -1;
+
+  NodePtr node = PrimitiveNode::Make("UnsignedInt32", Repetition::REQUIRED, Type::INT32);
+  ColumnDescriptor descr(node, 0, 0);
+
+  CompareUnsignedInt32 less;
+
+  ASSERT_TRUE(less(a, b));
+  ASSERT_TRUE(!less(aa, bb) && !less(bb, aa));
+  ASSERT_TRUE(less(aaa, bbb));
+}
+
+TEST(Comparison, UnknownSortOrder) {
+  NodePtr node =
+      PrimitiveNode::Make("Unknown", Repetition::REQUIRED, Type::FIXED_LEN_BYTE_ARRAY,
+                          LogicalType::INTERVAL, 12);
+  ColumnDescriptor descr(node, 0, 0);
+
+  ASSERT_THROW(Comparator::Make(&descr), ParquetException);
 }
 
 }  // namespace test
diff --git a/src/parquet/util/comparison.cc b/src/parquet/util/comparison.cc
new file mode 100644
index 0000000..1d7bb9d
--- /dev/null
+++ b/src/parquet/util/comparison.cc
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <algorithm>
+
+#include "parquet/exception.h"
+#include "parquet/schema.h"
+#include "parquet/types.h"
+#include "parquet/util/comparison.h"
+
+namespace parquet {
+
+std::shared_ptr<Comparator> Comparator::Make(const ColumnDescriptor* descr) {
+  if (SortOrder::SIGNED == descr->sort_order()) {
+    switch (descr->physical_type()) {
+      case Type::BOOLEAN:
+        return std::make_shared<CompareDefaultBoolean>();
+      case Type::INT32:
+        return std::make_shared<CompareDefaultInt32>();
+      case Type::INT64:
+        return std::make_shared<CompareDefaultInt64>();
+      case Type::INT96:
+        return std::make_shared<CompareDefaultInt96>();
+      case Type::FLOAT:
+        return std::make_shared<CompareDefaultFloat>();
+      case Type::DOUBLE:
+        return std::make_shared<CompareDefaultDouble>();
+      case Type::BYTE_ARRAY:
+        return std::make_shared<CompareDefaultByteArray>();
+      case Type::FIXED_LEN_BYTE_ARRAY:
+        return std::make_shared<CompareDefaultFLBA>(descr->type_length());
+      default:
+        ParquetException::NYI("Signed Compare not implemented");
+    }
+  } else if (SortOrder::UNSIGNED == descr->sort_order()) {
+    switch (descr->physical_type()) {
+      case Type::INT32:
+        return std::make_shared<CompareUnsignedInt32>();
+      case Type::INT64:
+        return std::make_shared<CompareUnsignedInt64>();
+      case Type::INT96:
+        return std::make_shared<CompareUnsignedInt96>();
+      case Type::BYTE_ARRAY:
+        return std::make_shared<CompareUnsignedByteArray>();
+      case Type::FIXED_LEN_BYTE_ARRAY:
+        return std::make_shared<CompareUnsignedFLBA>(descr->type_length());
+      default:
+        ParquetException::NYI("Unsigned Compare not implemented");
+    }
+  } else {
+    throw ParquetException("UNKNOWN Sort Order");
+  }
+  return nullptr;
+}
+
+template class PARQUET_TEMPLATE_EXPORT CompareDefault<BooleanType>;
+template class PARQUET_TEMPLATE_EXPORT CompareDefault<Int32Type>;
+template class PARQUET_TEMPLATE_EXPORT CompareDefault<Int64Type>;
+template class PARQUET_TEMPLATE_EXPORT CompareDefault<Int96Type>;
+template class PARQUET_TEMPLATE_EXPORT CompareDefault<FloatType>;
+template class PARQUET_TEMPLATE_EXPORT CompareDefault<DoubleType>;
+template class PARQUET_TEMPLATE_EXPORT CompareDefault<ByteArrayType>;
+template class PARQUET_TEMPLATE_EXPORT CompareDefault<FLBAType>;
+
+}  // namespace parquet
diff --git a/src/parquet/util/comparison.h b/src/parquet/util/comparison.h
index edd3df1..803f0da 100644
--- a/src/parquet/util/comparison.h
+++ b/src/parquet/util/comparison.h
@@ -20,40 +20,145 @@
 
 #include <algorithm>
 
+#include "parquet/exception.h"
 #include "parquet/schema.h"
 #include "parquet/types.h"
 
 namespace parquet {
 
-template <typename T>
-struct Compare {
-  explicit Compare(const ColumnDescriptor* descr) : type_length_(descr->type_length()) {}
+class PARQUET_EXPORT Comparator {
+ public:
+  virtual ~Comparator() {}
+  static std::shared_ptr<Comparator> Make(const ColumnDescriptor* descr);
+};
 
-  inline bool operator()(const T& a, const T& b) { return a < b; }
-
- private:
-  int32_t type_length_;
+// The default comparison is SIGNED
+template <typename DType>
+class PARQUET_EXPORT CompareDefault : public Comparator {
+ public:
+  typedef typename DType::c_type T;
+  CompareDefault() {}
+  virtual ~CompareDefault() {}
+  virtual bool operator()(const T& a, const T& b) { return a < b; }
 };
 
 template <>
-inline bool Compare<Int96>::operator()(const Int96& a, const Int96& b) {
-  return std::lexicographical_compare(a.value, a.value + 3, b.value, b.value + 3);
-}
+class PARQUET_EXPORT CompareDefault<Int96Type> : public Comparator {
+ public:
+  CompareDefault() {}
+  virtual ~CompareDefault() {}
+  virtual bool operator()(const Int96& a, const Int96& b) {
+    const int32_t* aptr = reinterpret_cast<const int32_t*>(&a.value[0]);
+    const int32_t* bptr = reinterpret_cast<const int32_t*>(&b.value[0]);
+    return std::lexicographical_compare(aptr, aptr + 3, bptr, bptr + 3);
+  }
+};
 
 template <>
-inline bool Compare<ByteArray>::operator()(const ByteArray& a, const ByteArray& b) {
-  auto aptr = reinterpret_cast<const int8_t*>(a.ptr);
-  auto bptr = reinterpret_cast<const int8_t*>(b.ptr);
-  return std::lexicographical_compare(aptr, aptr + a.len, bptr, bptr + b.len);
-}
+class PARQUET_EXPORT CompareDefault<ByteArrayType> : public Comparator {
+ public:
+  CompareDefault() {}
+  virtual ~CompareDefault() {}
+  virtual bool operator()(const ByteArray& a, const ByteArray& b) {
+    const int8_t* aptr = reinterpret_cast<const int8_t*>(a.ptr);
+    const int8_t* bptr = reinterpret_cast<const int8_t*>(b.ptr);
+    return std::lexicographical_compare(aptr, aptr + a.len, bptr, bptr + b.len);
+  }
+};
 
 template <>
-inline bool Compare<FLBA>::operator()(const FLBA& a, const FLBA& b) {
-  auto aptr = reinterpret_cast<const int8_t*>(a.ptr);
-  auto bptr = reinterpret_cast<const int8_t*>(b.ptr);
-  return std::lexicographical_compare(aptr, aptr + type_length_, bptr,
-                                      bptr + type_length_);
-}
+class PARQUET_EXPORT CompareDefault<FLBAType> : public Comparator {
+ public:
+  explicit CompareDefault(int length) : type_length_(length) {}
+  virtual ~CompareDefault() {}
+  virtual bool operator()(const FLBA& a, const FLBA& b) {
+    const int8_t* aptr = reinterpret_cast<const int8_t*>(a.ptr);
+    const int8_t* bptr = reinterpret_cast<const int8_t*>(b.ptr);
+    return std::lexicographical_compare(aptr, aptr + type_length_, bptr,
+                                        bptr + type_length_);
+  }
+  int32_t type_length_;
+};
+
+typedef CompareDefault<BooleanType> CompareDefaultBoolean;
+typedef CompareDefault<Int32Type> CompareDefaultInt32;
+typedef CompareDefault<Int64Type> CompareDefaultInt64;
+typedef CompareDefault<Int96Type> CompareDefaultInt96;
+typedef CompareDefault<FloatType> CompareDefaultFloat;
+typedef CompareDefault<DoubleType> CompareDefaultDouble;
+typedef CompareDefault<ByteArrayType> CompareDefaultByteArray;
+typedef CompareDefault<FLBAType> CompareDefaultFLBA;
+
+#if defined(__GNUC__) && !defined(__clang__)
+#pragma GCC diagnostic push
+#pragma GCC diagnostic ignored "-Wattributes"
+#endif
+
+PARQUET_EXTERN_TEMPLATE CompareDefault<BooleanType>;
+PARQUET_EXTERN_TEMPLATE CompareDefault<Int32Type>;
+PARQUET_EXTERN_TEMPLATE CompareDefault<Int64Type>;
+PARQUET_EXTERN_TEMPLATE CompareDefault<Int96Type>;
+PARQUET_EXTERN_TEMPLATE CompareDefault<FloatType>;
+PARQUET_EXTERN_TEMPLATE CompareDefault<DoubleType>;
+PARQUET_EXTERN_TEMPLATE CompareDefault<ByteArrayType>;
+PARQUET_EXTERN_TEMPLATE CompareDefault<FLBAType>;
+
+#if defined(__GNUC__) && !defined(__clang__)
+#pragma GCC diagnostic pop
+#endif
+
+// Define Unsigned Comparators
+class PARQUET_EXPORT CompareUnsignedInt32 : public CompareDefaultInt32 {
+ public:
+  virtual ~CompareUnsignedInt32() {}
+  bool operator()(const int32_t& a, const int32_t& b) override {
+    const uint32_t ua = a;
+    const uint32_t ub = b;
+    return (ua < ub);
+  }
+};
+
+class PARQUET_EXPORT CompareUnsignedInt64 : public CompareDefaultInt64 {
+ public:
+  virtual ~CompareUnsignedInt64() {}
+  bool operator()(const int64_t& a, const int64_t& b) override {
+    const uint64_t ua = a;
+    const uint64_t ub = b;
+    return (ua < ub);
+  }
+};
+
+class PARQUET_EXPORT CompareUnsignedInt96 : public CompareDefaultInt96 {
+ public:
+  virtual ~CompareUnsignedInt96() {}
+  bool operator()(const Int96& a, const Int96& b) override {
+    const uint32_t* aptr = reinterpret_cast<const uint32_t*>(&a.value[0]);
+    const uint32_t* bptr = reinterpret_cast<const uint32_t*>(&b.value[0]);
+    return std::lexicographical_compare(aptr, aptr + 3, bptr, bptr + 3);
+  }
+};
+
+class PARQUET_EXPORT CompareUnsignedByteArray : public CompareDefaultByteArray {
+ public:
+  virtual ~CompareUnsignedByteArray() {}
+  bool operator()(const ByteArray& a, const ByteArray& b) override {
+    const uint8_t* aptr = reinterpret_cast<const uint8_t*>(a.ptr);
+    const uint8_t* bptr = reinterpret_cast<const uint8_t*>(b.ptr);
+    return std::lexicographical_compare(aptr, aptr + a.len, bptr, bptr + b.len);
+  }
+};
+
+class PARQUET_EXPORT CompareUnsignedFLBA : public CompareDefaultFLBA {
+ public:
+  explicit CompareUnsignedFLBA(int length) : CompareDefaultFLBA(length) {}
+  virtual ~CompareUnsignedFLBA() {}
+  bool operator()(const FLBA& a, const FLBA& b) override {
+    const uint8_t* aptr = reinterpret_cast<const uint8_t*>(a.ptr);
+    const uint8_t* bptr = reinterpret_cast<const uint8_t*>(b.ptr);
+    return std::lexicographical_compare(aptr, aptr + type_length_, bptr,
+                                        bptr + type_length_);
+  }
+};
 
 }  // namespace parquet