PARQUET-1193: [CPP] Implement ColumnOrder to support min_value and max_value

Changes:

1. Update parquet.thrift format
2. Add ColumnOrder Implementation
3. Make Int96 sort order UNKNOWN

Author: Deepak Majeti <deepak.majeti@hpe.com>

Closes #430 from majetideepak/PARQUET-1193 and squashes the following commits:

d31df36 [Deepak Majeti] Fix unused function Warning
4ed405f [Deepak Majeti] Add comments
dec58ca [Deepak Majeti] clang-format
2cd9f11 [Deepak Majeti] Make Int96 sort order UNKNOWN
ff41b3c [Deepak Majeti] Add ColumnOrder Implementation
6221cba [Deepak Majeti] Pull updated parquet.thrift format
diff --git a/src/parquet/column_writer-test.cc b/src/parquet/column_writer-test.cc
index 7e5dc82..224a23d 100644
--- a/src/parquet/column_writer-test.cc
+++ b/src/parquet/column_writer-test.cc
@@ -219,6 +219,24 @@
 }
 
 template <>
+void TestPrimitiveWriter<Int96Type>::ReadAndCompare(Compression::type compression,
+                                                    int64_t num_rows) {
+  this->SetupValuesOut(num_rows);
+  this->ReadColumnFully(compression);
+  std::shared_ptr<CompareDefault<Int96Type>> compare;
+  compare = std::make_shared<CompareDefaultInt96>();
+  for (size_t i = 0; i < this->values_.size(); i++) {
+    if ((*compare)(this->values_[i], this->values_out_[i]) ||
+        (*compare)(this->values_out_[i], this->values_[i])) {
+      std::cout << "Failed at " << i << std::endl;
+    }
+    ASSERT_FALSE((*compare)(this->values_[i], this->values_out_[i]));
+    ASSERT_FALSE((*compare)(this->values_out_[i], this->values_[i]));
+  }
+  ASSERT_EQ(this->values_, this->values_out_);
+}
+
+template <>
 void TestPrimitiveWriter<FLBAType>::ReadColumnFully(Compression::type compression) {
   int64_t total_values = static_cast<int64_t>(this->values_out_.size());
   BuildReader(total_values, compression);
diff --git a/src/parquet/metadata-test.cc b/src/parquet/metadata-test.cc
index b20293b..53653bd 100644
--- a/src/parquet/metadata-test.cc
+++ b/src/parquet/metadata-test.cc
@@ -219,7 +219,7 @@
 
   ASSERT_EQ(true, version.VersionLt(version1));
 
-  ASSERT_FALSE(version1.HasCorrectStatistics(Type::INT96, SortOrder::SIGNED));
+  ASSERT_FALSE(version1.HasCorrectStatistics(Type::INT96, SortOrder::UNKNOWN));
   ASSERT_TRUE(version.HasCorrectStatistics(Type::INT32, SortOrder::SIGNED));
   ASSERT_FALSE(version.HasCorrectStatistics(Type::BYTE_ARRAY, SortOrder::SIGNED));
   ASSERT_TRUE(version1.HasCorrectStatistics(Type::BYTE_ARRAY, SortOrder::SIGNED));
diff --git a/src/parquet/metadata.cc b/src/parquet/metadata.cc
index 1c7db86..91304cf 100644
--- a/src/parquet/metadata.cc
+++ b/src/parquet/metadata.cc
@@ -41,8 +41,8 @@
 template <typename DType>
 static std::shared_ptr<RowGroupStatistics> MakeTypedColumnStats(
     const format::ColumnMetaData& metadata, const ColumnDescriptor* descr) {
-  // If new fields max_value/min_value are set, then return them.
-  if (metadata.statistics.__isset.max_value || metadata.statistics.__isset.min_value) {
+  // If ColumnOrder is defined, return max_value and min_value
+  if (descr->column_order().get_order() == ColumnOrder::TYPE_DEFINED_ORDER) {
     return std::make_shared<TypedRowGroupStatistics<DType>>(
         descr, metadata.statistics.min_value, metadata.statistics.max_value,
         metadata.num_values - metadata.statistics.null_count,
@@ -310,6 +310,7 @@
     }
 
     InitSchema();
+    InitColumnOrders();
     InitKeyValueMetadata();
   }
   ~FileMetaDataImpl() {}
@@ -357,6 +358,23 @@
                                           static_cast<int>(metadata_->schema.size()));
     schema_.Init(converter.Convert());
   }
+  void InitColumnOrders() {
+    // update ColumnOrder
+    std::vector<parquet::ColumnOrder> column_orders;
+    if (metadata_->__isset.column_orders) {
+      for (auto column_order : metadata_->column_orders) {
+        if (column_order.__isset.TYPE_ORDER) {
+          column_orders.push_back(ColumnOrder::type_defined_);
+        } else {
+          column_orders.push_back(ColumnOrder::undefined_);
+        }
+      }
+    } else {
+      column_orders.resize(schema_.num_columns(), ColumnOrder::undefined_);
+    }
+
+    schema_.updateColumnOrders(column_orders);
+  }
   SchemaDescriptor schema_;
   ApplicationVersion writer_version_;
 
@@ -495,10 +513,9 @@
   // Parquet cpp version 1.3.0 onwards stats are computed correctly for all types
   if ((application_ != "parquet-cpp") || (VersionLt(PARQUET_CPP_FIXED_STATS_VERSION))) {
     // Only SIGNED are valid
-    if (SortOrder::SIGNED != sort_order) return false;
-
-    // None of the current tools write INT96 Statistics correctly
-    if (col_type == Type::INT96) return false;
+    if (SortOrder::SIGNED != sort_order) {
+      return false;
+    }
 
     // Statistics of other types are OK
     if (col_type != Type::FIXED_LEN_BYTE_ARRAY && col_type != Type::BYTE_ARRAY) {
@@ -511,6 +528,11 @@
     return true;
   }
 
+  // Unknown sort order has incorrect stats
+  if (SortOrder::UNKNOWN == sort_order) {
+    return false;
+  }
+
   // PARQUET-251
   if (VersionLt(PARQUET_251_FIXED_VERSION)) {
     return false;
@@ -808,6 +830,19 @@
     }
     metadata_->__set_version(file_version);
     metadata_->__set_created_by(properties_->created_by());
+
+    // Users cannot set the `ColumnOrder` since we donot not have user defined sort order
+    // in the spec yet.
+    // We always default to `TYPE_DEFINED_ORDER`. We can expose it in
+    // the API once we have user defined sort orders in the Parquet format.
+    // TypeDefinedOrder implies choose SortOrder based on LogicalType/PhysicalType
+    format::TypeDefinedOrder type_defined_order;
+    format::ColumnOrder column_order;
+    column_order.__set_TYPE_ORDER(type_defined_order);
+    column_order.__isset.TYPE_ORDER = true;
+    metadata_->column_orders.resize(schema_->num_columns(), column_order);
+    metadata_->__isset.column_orders = true;
+
     parquet::schema::SchemaFlattener flattener(
         static_cast<parquet::schema::GroupNode*>(schema_->schema_root().get()),
         &metadata_->schema);
diff --git a/src/parquet/parquet.thrift b/src/parquet/parquet.thrift
index a72ef2c..cfcc1fe 100644
--- a/src/parquet/parquet.thrift
+++ b/src/parquet/parquet.thrift
@@ -30,17 +30,6 @@
  * with the encodings to control the on disk storage format.
  * For example INT16 is not included as a type since a good encoding of INT32
  * would handle this.
- *
- * When a logical type is not present, the type-defined sort order of these
- * physical types are:
- * * BOOLEAN - false, true
- * * INT32 - signed comparison
- * * INT64 - signed comparison
- * * INT96 - signed comparison
- * * FLOAT - signed comparison
- * * DOUBLE - signed comparison
- * * BYTE_ARRAY - unsigned byte-wise comparison
- * * FIXED_LEN_BYTE_ARRAY - unsigned byte-wise comparison
  */
 enum Type {
   BOOLEAN = 0;
@@ -187,14 +176,6 @@
    * particular timezone or date.
    */
   INTERVAL = 21;
-
-  /**
-   * Annotates a column that is always null
-   * Sometimes when discovering the schema of existing data
-   * values are always null
-   * This is NULL in parquet-format
-   */
-  NA = 25;
 }
 
 /**
@@ -222,12 +203,12 @@
     * Values are encoded using PLAIN encoding, except that variable-length byte
     * arrays do not include a length prefix.
     *
-    * These fields encode min and max values determined by SIGNED comparison
+    * These fields encode min and max values determined by signed comparison
     * only. New files should use the correct order for a column's logical type
     * and store the values in the min_value and max_value fields.
     *
     * To support older readers, these may be set when the column order is
-    * SIGNED.
+    * signed.
     */
    1: optional binary max;
    2: optional binary min;
@@ -245,6 +226,115 @@
    6: optional binary min_value;
 }
 
+/** Empty structs to use as logical type annotations */
+struct StringType {}  // allowed for BINARY, must be encoded with UTF-8
+struct UUIDType {}    // allowed for FIXED[16], must encoded raw UUID bytes
+struct MapType {}     // see LogicalTypes.md
+struct ListType {}    // see LogicalTypes.md
+struct EnumType {}    // allowed for BINARY, must be encoded with UTF-8
+struct DateType {}    // allowed for INT32
+
+/**
+ * Logical type to annotate a column that is always null.
+ *
+ * Sometimes when discovering the schema of existing data, values are always
+ * null and the physical type can't be determined. This annotation signals
+ * the case where the physical type was guessed from all null values.
+ */
+struct NullType {}    // allowed for any physical type, only null values stored
+
+/**
+ * Decimal logical type annotation
+ *
+ * To maintain forward-compatibility in v1, implementations using this logical
+ * type must also set scale and precision on the annotated SchemaElement.
+ *
+ * Allowed for physical types: INT32, INT64, FIXED, and BINARY
+ */
+struct DecimalType {
+  1: required i32 scale
+  2: required i32 precision
+}
+
+/** Time units for logical types */
+struct MilliSeconds {}
+struct MicroSeconds {}
+union TimeUnit {
+  1: MilliSeconds MILLIS
+  2: MicroSeconds MICROS
+}
+
+/**
+ * Timestamp logical type annotation
+ *
+ * Allowed for physical types: INT64
+ */
+struct TimestampType {
+  1: required bool isAdjustedToUTC
+  2: required TimeUnit unit
+}
+
+/**
+ * Time logical type annotation
+ *
+ * Allowed for physical types: INT32 (millis), INT64 (micros)
+ */
+struct TimeType {
+  1: required bool isAdjustedToUTC
+  2: required TimeUnit unit
+}
+
+/**
+ * Integer logical type annotation
+ *
+ * bitWidth must be 8, 16, 32, or 64.
+ *
+ * Allowed for physical types: INT32, INT64
+ */
+struct IntType {
+  1: required byte bitWidth
+  2: required bool isSigned
+}
+
+/**
+ * Embedded JSON logical type annotation
+ *
+ * Allowed for physical types: BINARY
+ */
+struct JsonType {
+}
+
+/**
+ * Embedded BSON logical type annotation
+ *
+ * Allowed for physical types: BINARY
+ */
+struct BsonType {
+}
+
+/**
+ * LogicalType annotations to replace ConvertedType.
+ *
+ * To maintain compatibility, implementations using LogicalType for a
+ * SchemaElement must also set the corresponding ConvertedType from the
+ * following table.
+ */
+union LogicalType {
+  1:  StringType STRING       // use ConvertedType UTF8 if encoding is UTF-8
+  2:  MapType MAP             // use ConvertedType MAP
+  3:  ListType LIST           // use ConvertedType LIST
+  4:  EnumType ENUM           // use ConvertedType ENUM
+  5:  DecimalType DECIMAL     // use ConvertedType DECIMAL
+  6:  DateType DATE           // use ConvertedType DATE
+  7:  TimeType TIME           // use ConvertedType TIME_MICROS or TIME_MILLIS
+  8:  TimestampType TIMESTAMP // use ConvertedType TIMESTAMP_MICROS or TIMESTAMP_MILLIS
+  // 9: reserved for INTERVAL
+  10: IntType INTEGER         // use ConvertedType INT_* or UINT_*
+  11: NullType UNKNOWN        // no compatible ConvertedType
+  12: JsonType JSON           // use ConvertedType JSON
+  13: BsonType BSON           // use ConvertedType BSON
+}
+
 /**
  * Represents a element inside a schema definition.
  *  - if it is a group (inner node) then type is undefined and num_children is defined
@@ -292,6 +382,13 @@
    */
   9: optional i32 field_id;
 
+  /**
+   * The logical type of this SchemaElement; only valid for primitives.
+   *
+   * LogicalType replaces ConvertedType, but ConvertedType is still required
+   * for some logical types to ensure forward-compatibility in format v1.
+   */
+  10: optional LogicalType logicalType
 }
 
 /**
@@ -324,7 +421,7 @@
    */
   PLAIN_DICTIONARY = 2;
 
-  /** Group packed run length encoding. Usable for definition/reptition levels
+  /** Group packed run length encoding. Usable for definition/repetition levels
    * encoding and Booleans (on one bit: 0 is false; 1 is true.)
    */
   RLE = 3;
@@ -356,15 +453,20 @@
 
 /**
  * Supported compression algorithms.
+ *
+ * Codecs added in 2.3.2 can be read by readers based on 2.3.2 and later.
+ * Codec support may vary between readers based on the format version and
+ * libraries available at runtime. Gzip, Snappy, and LZ4 codecs are
+ * widely available, while Zstd and Brotli require additional libraries.
  */
 enum CompressionCodec {
   UNCOMPRESSED = 0;
   SNAPPY = 1;
   GZIP = 2;
   LZO = 3;
-  BROTLI = 4;
-  LZ4 = 5;
-  ZSTD = 6;
+  BROTLI = 4; // Added in 2.3.2
+  LZ4 = 5;    // Added in 2.3.2
+  ZSTD = 6;   // Added in 2.3.2
 }
 
 enum PageType {
@@ -374,6 +476,16 @@
   DATA_PAGE_V2 = 3;
 }
 
+/**
+ * Enum to annotate whether lists of min/max elements inside ColumnIndex
+ * are ordered and if so, in which direction.
+ */
+enum BoundaryOrder {
+  UNORDERED = 0;
+  ASCENDING = 1;
+  DESCENDING = 2;
+}
+
 /** Data page header */
 struct DataPageHeader {
   /** Number of values, including NULLs, in this data page. **/
@@ -408,7 +520,7 @@
 }
 
 /**
- * New page format alowing reading levels without decompressing the data
+ * New page format allowing reading levels without decompressing the data
  * Repetition and definition levels are uncompressed
  * The remaining section containing the data is compressed if is_compressed is true
  **/
@@ -425,9 +537,9 @@
 
   // repetition levels and definition levels are always using RLE (without size in it)
 
-  /** length of the repetition levels */
-  5: required i32 definition_levels_byte_length;
   /** length of the definition levels */
+  5: required i32 definition_levels_byte_length;
+  /** length of the repetition levels */
   6: required i32 repetition_levels_byte_length;
 
   /**  whether the values are compressed.
@@ -563,6 +675,18 @@
    * metadata.
    **/
   3: optional ColumnMetaData meta_data
+
+  /** File offset of ColumnChunk's OffsetIndex **/
+  4: optional i64 offset_index_offset
+
+  /** Size of ColumnChunk's OffsetIndex, in bytes **/
+  5: optional i32 offset_index_length
+
+  /** File offset of ColumnChunk's ColumnIndex **/
+  6: optional i64 column_index_offset
+
+  /** Size of ColumnChunk's ColumnIndex, in bytes **/
+  7: optional i32 column_index_length
 }
 
 struct RowGroup {
@@ -587,7 +711,9 @@
 struct TypeDefinedOrder {}
 
 /**
- * Union to specify the order used for min, max, and sorting values in a column.
+ * Union to specify the order used for the min_value and max_value fields for a
+ * column. This union takes the role of an enhanced enum that allows rich
+ * elements (which will be needed for a collation-based ordering in the future).
  *
  * Possible values are:
  * * TypeDefinedOrder - the column uses the order defined by its logical or
@@ -597,9 +723,107 @@
  * for this column should be ignored.
  */
 union ColumnOrder {
+
+  /**
+   * The sort orders for logical types are:
+   *   UTF8 - unsigned byte-wise comparison
+   *   INT8 - signed comparison
+   *   INT16 - signed comparison
+   *   INT32 - signed comparison
+   *   INT64 - signed comparison
+   *   UINT8 - unsigned comparison
+   *   UINT16 - unsigned comparison
+   *   UINT32 - unsigned comparison
+   *   UINT64 - unsigned comparison
+   *   DECIMAL - signed comparison of the represented value
+   *   DATE - signed comparison
+   *   TIME_MILLIS - signed comparison
+   *   TIME_MICROS - signed comparison
+   *   TIMESTAMP_MILLIS - signed comparison
+   *   TIMESTAMP_MICROS - signed comparison
+   *   INTERVAL - unsigned comparison
+   *   JSON - unsigned byte-wise comparison
+   *   BSON - unsigned byte-wise comparison
+   *   ENUM - unsigned byte-wise comparison
+   *   LIST - undefined
+   *   MAP - undefined
+   *
+   * In the absence of logical types, the sort order is determined by the physical type:
+   *   BOOLEAN - false, true
+   *   INT32 - signed comparison
+   *   INT64 - signed comparison
+   *   INT96 (only used for legacy timestamps) - unsigned comparison
+   *   FLOAT - signed comparison of the represented value
+   *   DOUBLE - signed comparison of the represented value
+   *   BYTE_ARRAY - unsigned byte-wise comparison
+   *   FIXED_LEN_BYTE_ARRAY - unsigned byte-wise comparison
+   */
   1: TypeDefinedOrder TYPE_ORDER;
 }
 
+struct PageLocation {
+  /** Offset of the page in the file **/
+  1: required i64 offset
+
+  /**
+   * Size of the page, including header. Sum of compressed_page_size and header
+   * length
+   */
+  2: required i32 compressed_page_size
+
+  /**
+   * Index within the RowGroup of the first row of the page; this means pages
+   * change on record boundaries (r = 0).
+   */
+  3: required i64 first_row_index
+}
+
+struct OffsetIndex {
+  /**
+   * PageLocations, ordered by increasing PageLocation.offset. It is required
+   * that page_locations[i].first_row_index < page_locations[i+1].first_row_index.
+   */
+  1: required list<PageLocation> page_locations
+}
+
+/**
+ * Description for ColumnIndex.
+ * Each <array-field>[i] refers to the page at OffsetIndex.page_locations[i]
+ */
+struct ColumnIndex {
+  /**
+   * A list of Boolean values to determine the validity of the corresponding
+   * min and max values. If true, a page contains only null values, and writers
+   * have to set the corresponding entries in min_values and max_values to
+   * byte[0], so that all lists have the same length. If false, the
+   * corresponding entries in min_values and max_values must be valid.
+   */
+  1: required list<bool> null_pages
+
+  /**
+   * Two lists containing lower and upper bounds for the values of each page.
+   * These may be the actual minimum and maximum values found on a page, but
+   * can also be (more compact) values that do not exist on a page. For
+   * example, instead of storing ""Blart Versenwald III", a writer may set
+   * min_values[i]="B", max_values[i]="C". Such more compact values must still
+   * be valid values within the column's logical type. Readers must make sure
+   * that list entries are populated before using them by inspecting null_pages.
+   */
+  2: required list<binary> min_values
+  3: required list<binary> max_values
+
+  /**
+   * Stores whether both min_values and max_values are orderd and if so, in
+   * which direction. This allows readers to perform binary searches in both
+   * lists. Readers cannot assume that max_values[i] <= min_values[i+1], even
+   * if the lists are ordered.
+   */
+  4: required BoundaryOrder boundary_order
+
+  /** A list containing the number of null values for each page **/
+  5: optional list<i64> null_counts
+}
+
 /**
  * Description for file metadata
  */
@@ -631,11 +855,16 @@
   6: optional string created_by
 
   /**
-   * Sort order used for each column in this file.
+   * Sort order used for the min_value and max_value fields of each column in
+   * this file. Each sort order corresponds to one column, determined by its
+   * position in the list, matching the position of the column in the schema.
    *
-   * If this list is not present, then the order for each column is assumed to
-   * be Signed. In addition, min and max values for INTERVAL or DECIMAL stored
-   * as fixed or bytes should be ignored.
+   * Without column_orders, the meaning of the min_value and max_value fields is
+   * undefined. To ensure well-defined behaviour, if min_value and max_value are
+   * written to a Parquet file, column_orders must be written as well.
+   *
+   * The obsolete min and max fields are always sorted by signed comparison
+   * regardless of column_orders.
    */
   7: optional list<ColumnOrder> column_orders;
 }
diff --git a/src/parquet/schema.cc b/src/parquet/schema.cc
index 6075ab6..826ef76 100644
--- a/src/parquet/schema.cc
+++ b/src/parquet/schema.cc
@@ -602,6 +602,37 @@
   Init(NodePtr(schema.release()));
 }
 
+class SchemaUpdater : public Node::Visitor {
+ public:
+  explicit SchemaUpdater(const std::vector<ColumnOrder>& column_orders)
+      : column_orders_(column_orders), leaf_count_(0) {}
+  virtual ~SchemaUpdater() {}
+
+  void Visit(Node* node) override {
+    if (node->is_group()) {
+      GroupNode* group_node = static_cast<GroupNode*>(node);
+      for (int i = 0; i < group_node->field_count(); ++i) {
+        group_node->field(i)->Visit(this);
+      }
+    } else {  // leaf node
+      PrimitiveNode* leaf_node = static_cast<PrimitiveNode*>(node);
+      leaf_node->SetColumnOrder(column_orders_[leaf_count_++]);
+    }
+  }
+
+ private:
+  const std::vector<ColumnOrder>& column_orders_;
+  int leaf_count_;
+};
+
+void SchemaDescriptor::updateColumnOrders(const std::vector<ColumnOrder>& column_orders) {
+  if (static_cast<int>(column_orders.size()) != num_columns()) {
+    throw ParquetException("Malformed schema: not enough ColumnOrder values");
+  }
+  SchemaUpdater visitor(column_orders);
+  const_cast<GroupNode*>(group_node_)->Visit(&visitor);
+}
+
 void SchemaDescriptor::Init(const NodePtr& schema) {
   schema_ = schema;
 
diff --git a/src/parquet/schema.h b/src/parquet/schema.h
index f93f0db..7b6793b 100644
--- a/src/parquet/schema.h
+++ b/src/parquet/schema.h
@@ -209,6 +209,10 @@
 
   Type::type physical_type() const { return physical_type_; }
 
+  ColumnOrder column_order() const { return column_order_; }
+
+  void SetColumnOrder(ColumnOrder column_order) { column_order_ = column_order; }
+
   int32_t type_length() const { return type_length_; }
 
   const DecimalMetadata& decimal_metadata() const { return decimal_metadata_; }
@@ -225,6 +229,7 @@
   Type::type physical_type_;
   int32_t type_length_;
   DecimalMetadata decimal_metadata_;
+  ColumnOrder column_order_;
 
   // For FIXED_LEN_BYTE_ARRAY
   void SetTypeLength(int32_t length) { type_length_ = length; }
@@ -335,6 +340,8 @@
 
   LogicalType::type logical_type() const { return primitive_node_->logical_type(); }
 
+  ColumnOrder column_order() const { return primitive_node_->column_order(); }
+
   SortOrder::type sort_order() const {
     return GetSortOrder(logical_type(), physical_type());
   }
@@ -407,10 +414,14 @@
 
   std::string ToString() const;
 
+  void updateColumnOrders(const std::vector<ColumnOrder>& column_orders);
+
  private:
   friend class ColumnDescriptor;
 
+  // Root Node
   schema::NodePtr schema_;
+  // Root Node
   const schema::GroupNode* group_node_;
 
   void BuildTree(const schema::NodePtr& node, int16_t max_def_level,
diff --git a/src/parquet/statistics-test.cc b/src/parquet/statistics-test.cc
index bc6eac2..ec8f90a 100644
--- a/src/parquet/statistics-test.cc
+++ b/src/parquet/statistics-test.cc
@@ -277,7 +277,7 @@
   ASSERT_EQ(statistics1.max(), statistics2.max());
 }
 
-using TestTypes = ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType,
+using TestTypes = ::testing::Types<Int32Type, Int64Type, FloatType, DoubleType,
                                    ByteArrayType, FLBAType, BooleanType>;
 
 TYPED_TEST_CASE(TestRowGroupStatistics, TestTypes);
@@ -397,7 +397,7 @@
   ASSERT_TRUE(column_chunk4->is_stats_set());
   auto column_chunk5 = ColumnChunkMetaData::Make(
       reinterpret_cast<const uint8_t*>(&col_chunk), schema.Column(4), &version);
-  ASSERT_TRUE(column_chunk5->is_stats_set());
+  ASSERT_FALSE(column_chunk5->is_stats_set());
   auto column_chunk6 = ColumnChunkMetaData::Make(
       reinterpret_cast<const uint8_t*>(&col_chunk), schema.Column(5), &version);
   ASSERT_TRUE(column_chunk6->is_stats_set());
@@ -478,8 +478,8 @@
   std::vector<EncodedStatistics> stats_;
 };
 
-using CompareTestTypes = ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType,
-                                          DoubleType, ByteArrayType, FLBAType>;
+using CompareTestTypes = ::testing::Types<Int32Type, Int64Type, FloatType, DoubleType,
+                                          ByteArrayType, FLBAType>;
 
 // TYPE::INT32
 template <>
@@ -537,28 +537,6 @@
       .set_max(std::string(reinterpret_cast<const char*>(&values_[9]), sizeof(T)));
 }
 
-// TYPE::INT96
-template <>
-void TestStatistics<Int96Type>::AddNodes(std::string name) {
-  // INT96 physical type has only Unsigned Statistics
-  fields_.push_back(schema::PrimitiveNode::Make(name, Repetition::REQUIRED, Type::INT96,
-                                                LogicalType::NONE));
-}
-
-template <>
-void TestStatistics<Int96Type>::SetValues() {
-  for (int i = 0; i < NUM_VALUES; i++) {
-    values_[i].value[0] = i - 5;  // {-5, -4, -3, -2, -1, 0, 1, 2, 3, 4};
-    values_[i].value[1] = i - 5;  // {-5, -4, -3, -2, -1, 0, 1, 2, 3, 4};
-    values_[i].value[2] = i - 5;  // {-5, -4, -3, -2, -1, 0, 1, 2, 3, 4};
-  }
-
-  // Write Int96 min/max values
-  stats_[0]
-      .set_min(std::string(reinterpret_cast<const char*>(&values_[5]), sizeof(T)))
-      .set_max(std::string(reinterpret_cast<const char*>(&values_[4]), sizeof(T)));
-}
-
 // TYPE::FLOAT
 template <>
 void TestStatistics<FloatType>::SetValues() {
diff --git a/src/parquet/test-specialization.h b/src/parquet/test-specialization.h
index 08160a6..a6112a2 100644
--- a/src/parquet/test-specialization.h
+++ b/src/parquet/test-specialization.h
@@ -35,13 +35,14 @@
 namespace test {
 
 template <>
-void InitValues<bool>(int num_values, vector<bool>& values, vector<uint8_t>& buffer) {
+void inline InitValues<bool>(int num_values, vector<bool>& values,
+                             vector<uint8_t>& buffer) {
   values = flip_coins(num_values, 0);
 }
 
 template <>
-void InitValues<ByteArray>(int num_values, vector<ByteArray>& values,
-                           vector<uint8_t>& buffer) {
+void inline InitValues<ByteArray>(int num_values, vector<ByteArray>& values,
+                                  vector<uint8_t>& buffer) {
   int max_byte_array_len = 12;
   int num_bytes = static_cast<int>(max_byte_array_len + sizeof(uint32_t));
   size_t nbytes = num_values * num_bytes;
@@ -50,14 +51,16 @@
 }
 
 template <>
-void InitValues<FLBA>(int num_values, vector<FLBA>& values, vector<uint8_t>& buffer) {
+void inline InitValues<FLBA>(int num_values, vector<FLBA>& values,
+                             vector<uint8_t>& buffer) {
   size_t nbytes = num_values * FLBA_LENGTH;
   buffer.resize(nbytes);
   random_fixed_byte_array(num_values, 0, buffer.data(), FLBA_LENGTH, values.data());
 }
 
 template <>
-void InitValues<Int96>(int num_values, vector<Int96>& values, vector<uint8_t>& buffer) {
+void inline InitValues<Int96>(int num_values, vector<Int96>& values,
+                              vector<uint8_t>& buffer) {
   random_Int96_numbers(num_values, 0, std::numeric_limits<int32_t>::min(),
                        std::numeric_limits<int32_t>::max(), values.data());
 }
diff --git a/src/parquet/types.cc b/src/parquet/types.cc
index 4e6770f..a4929d5 100644
--- a/src/parquet/types.cc
+++ b/src/parquet/types.cc
@@ -213,8 +213,9 @@
       return SortOrder::SIGNED;
     case Type::BYTE_ARRAY:
     case Type::FIXED_LEN_BYTE_ARRAY:
-    case Type::INT96:  // only used for timestamp, which uses unsigned values
       return SortOrder::UNSIGNED;
+    case Type::INT96:
+      return SortOrder::UNKNOWN;
   }
   return SortOrder::UNKNOWN;
 }
@@ -254,4 +255,7 @@
   return SortOrder::UNKNOWN;
 }
 
+ColumnOrder ColumnOrder::undefined_ = ColumnOrder(ColumnOrder::UNDEFINED);
+ColumnOrder ColumnOrder::type_defined_ = ColumnOrder(ColumnOrder::TYPE_DEFINED_ORDER);
+
 }  // namespace parquet
diff --git a/src/parquet/types.h b/src/parquet/types.h
index c1e9598..2179d50 100644
--- a/src/parquet/types.h
+++ b/src/parquet/types.h
@@ -130,6 +130,21 @@
   enum type { SIGNED, UNSIGNED, UNKNOWN };
 };
 
+class ColumnOrder {
+ public:
+  enum type { UNDEFINED, TYPE_DEFINED_ORDER };
+  explicit ColumnOrder(ColumnOrder::type column_order) : column_order_(column_order) {}
+  // Default to Type Defined Order
+  ColumnOrder() : column_order_(type::TYPE_DEFINED_ORDER) {}
+  ColumnOrder::type get_order() { return column_order_; }
+
+  static ColumnOrder undefined_;
+  static ColumnOrder type_defined_;
+
+ private:
+  ColumnOrder::type column_order_;
+};
+
 // ----------------------------------------------------------------------
 
 struct ByteArray {
diff --git a/src/parquet/util/comparison.cc b/src/parquet/util/comparison.cc
index 1d7bb9d..a0768b3 100644
--- a/src/parquet/util/comparison.cc
+++ b/src/parquet/util/comparison.cc
@@ -33,8 +33,6 @@
         return std::make_shared<CompareDefaultInt32>();
       case Type::INT64:
         return std::make_shared<CompareDefaultInt64>();
-      case Type::INT96:
-        return std::make_shared<CompareDefaultInt96>();
       case Type::FLOAT:
         return std::make_shared<CompareDefaultFloat>();
       case Type::DOUBLE:
@@ -52,8 +50,6 @@
         return std::make_shared<CompareUnsignedInt32>();
       case Type::INT64:
         return std::make_shared<CompareUnsignedInt64>();
-      case Type::INT96:
-        return std::make_shared<CompareUnsignedInt96>();
       case Type::BYTE_ARRAY:
         return std::make_shared<CompareUnsignedByteArray>();
       case Type::FIXED_LEN_BYTE_ARRAY: