PARQUET-979: Limit size of min, max or disable stats for long binary types

Author: Deepak Majeti <deepak.majeti@hpe.com>

Closes #465 from majetideepak/PARQUET-979 and squashes the following commits:

3b18173 [Deepak Majeti] improve naming and ColumnProperties class
a888aa4 [Deepak Majeti] Add an option to specify max stats size
c103c4f [Deepak Majeti] make format
cf0260c [Deepak Majeti] PARQUET-979: [C++] Limit size of min, max or disable stats for long binary types
diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc
index 47226a3..6d7e1eb 100644
--- a/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -1445,8 +1445,8 @@
   using ::arrow::Schema;
   using ::arrow::Table;
   using ::arrow::TimeUnit;
-  using ::arrow::TimestampType;
   using ::arrow::TimestampBuilder;
+  using ::arrow::TimestampType;
   using ::arrow::default_memory_pool;
 
   auto timestamp_type = std::make_shared<TimestampType>(TimeUnit::NANO);
diff --git a/src/parquet/column_writer-test.cc b/src/parquet/column_writer-test.cc
index 7a5f379..aac582a 100644
--- a/src/parquet/column_writer-test.cc
+++ b/src/parquet/column_writer-test.cc
@@ -71,19 +71,21 @@
       int64_t output_size = SMALL_SIZE,
       const ColumnProperties& column_properties = ColumnProperties()) {
     sink_.reset(new InMemoryOutputStream());
-    metadata_ = ColumnChunkMetaDataBuilder::Make(
-        writer_properties_, this->descr_, reinterpret_cast<uint8_t*>(&thrift_metadata_));
-    std::unique_ptr<PageWriter> pager =
-        PageWriter::Open(sink_.get(), column_properties.codec, metadata_.get());
     WriterProperties::Builder wp_builder;
-    if (column_properties.encoding == Encoding::PLAIN_DICTIONARY ||
-        column_properties.encoding == Encoding::RLE_DICTIONARY) {
+    if (column_properties.encoding() == Encoding::PLAIN_DICTIONARY ||
+        column_properties.encoding() == Encoding::RLE_DICTIONARY) {
       wp_builder.enable_dictionary();
     } else {
       wp_builder.disable_dictionary();
-      wp_builder.encoding(column_properties.encoding);
+      wp_builder.encoding(column_properties.encoding());
     }
+    wp_builder.max_statistics_size(column_properties.max_statistics_size());
     writer_properties_ = wp_builder.build();
+
+    metadata_ = ColumnChunkMetaDataBuilder::Make(
+        writer_properties_, this->descr_, reinterpret_cast<uint8_t*>(&thrift_metadata_));
+    std::unique_ptr<PageWriter> pager =
+        PageWriter::Open(sink_.get(), column_properties.compression(), metadata_.get());
     std::shared_ptr<ColumnWriter> writer =
         ColumnWriter::Make(metadata_.get(), std::move(pager), writer_properties_.get());
     return std::static_pointer_cast<TypedColumnWriter<TestType>>(writer);
@@ -173,6 +175,16 @@
     return metadata_accessor->num_values();
   }
 
+  bool metadata_is_stats_set() {
+    // Metadata accessor must be created lazily.
+    // This is because the ColumnChunkMetaData semantics dictate the metadata object is
+    // complete (no changes to the metadata buffer can be made after instantiation)
+    ApplicationVersion app_version(this->writer_properties_->created_by());
+    auto metadata_accessor = ColumnChunkMetaData::Make(
+        reinterpret_cast<const uint8_t*>(&thrift_metadata_), this->descr_, &app_version);
+    return metadata_accessor->is_stats_set();
+  }
+
   std::vector<Encoding::type> metadata_encodings() {
     // Metadata accessor must be created lazily.
     // This is because the ColumnChunkMetaData semantics dictate the metadata object is
@@ -520,6 +532,50 @@
   }
 }
 
+// PARQUET-979
+// Prevent writing large stats
+using TestByteArrayValuesWriter = TestPrimitiveWriter<ByteArrayType>;
+TEST_F(TestByteArrayValuesWriter, OmitStats) {
+  int min_len = 1024 * 4;
+  int max_len = 1024 * 8;
+  this->SetUpSchema(Repetition::REQUIRED);
+  auto writer = this->BuildWriter();
+
+  values_.resize(SMALL_SIZE);
+  InitWideByteArrayValues(SMALL_SIZE, this->values_, this->buffer_, min_len, max_len);
+  writer->WriteBatch(SMALL_SIZE, nullptr, nullptr, this->values_.data());
+  writer->Close();
+
+  ASSERT_FALSE(this->metadata_is_stats_set());
+}
+
+TEST_F(TestByteArrayValuesWriter, LimitStats) {
+  int min_len = 1024 * 4;
+  int max_len = 1024 * 8;
+  this->SetUpSchema(Repetition::REQUIRED);
+  ColumnProperties column_properties;
+  column_properties.set_max_statistics_size(static_cast<size_t>(max_len));
+  auto writer = this->BuildWriter(SMALL_SIZE, column_properties);
+
+  values_.resize(SMALL_SIZE);
+  InitWideByteArrayValues(SMALL_SIZE, this->values_, this->buffer_, min_len, max_len);
+  writer->WriteBatch(SMALL_SIZE, nullptr, nullptr, this->values_.data());
+  writer->Close();
+
+  ASSERT_TRUE(this->metadata_is_stats_set());
+}
+
+TEST_F(TestByteArrayValuesWriter, CheckDefaultStats) {
+  this->SetUpSchema(Repetition::REQUIRED);
+  auto writer = this->BuildWriter();
+  this->GenerateData(SMALL_SIZE);
+
+  writer->WriteBatch(SMALL_SIZE, nullptr, nullptr, this->values_ptr_);
+  writer->Close();
+
+  ASSERT_TRUE(this->metadata_is_stats_set());
+}
+
 void GenerateLevels(int min_repeat_factor, int max_repeat_factor, int max_level,
                     std::vector<int16_t>& input_levels) {
   // for each repetition count upto max_repeat_factor
diff --git a/src/parquet/column_writer.cc b/src/parquet/column_writer.cc
index 4f2ef6c..8a1b56c 100644
--- a/src/parquet/column_writer.cc
+++ b/src/parquet/column_writer.cc
@@ -431,7 +431,14 @@
     FlushBufferedDataPages();
 
     EncodedStatistics chunk_statistics = GetChunkStatistics();
-    if (chunk_statistics.is_set()) {
+    // From parquet-mr
+    // Don't write stats larger than the max size rather than truncating. The
+    // rationale is that some engines may use the minimum value in the page as
+    // the true minimum for aggregations and there is no way to mark that a
+    // value has been truncated and is a lower bound and not in the page.
+    if (chunk_statistics.is_set() &&
+        chunk_statistics.max_stat_length() <=
+            properties_->max_statistics_size(descr_->path())) {
       metadata_->SetStatistics(SortOrder::SIGNED == descr_->sort_order(),
                                chunk_statistics);
     }
diff --git a/src/parquet/properties.h b/src/parquet/properties.h
index a331aae..83dc205 100644
--- a/src/parquet/properties.h
+++ b/src/parquet/properties.h
@@ -84,6 +84,7 @@
 static constexpr int64_t DEFAULT_WRITE_BATCH_SIZE = 1024;
 static constexpr int64_t DEFAULT_MAX_ROW_GROUP_LENGTH = 64 * 1024 * 1024;
 static constexpr bool DEFAULT_ARE_STATISTICS_ENABLED = true;
+static constexpr int64_t DEFAULT_MAX_STATISTICS_SIZE = 4096;
 static constexpr Encoding::type DEFAULT_ENCODING = Encoding::PLAIN;
 static constexpr ParquetVersion::type DEFAULT_WRITER_VERSION =
     ParquetVersion::PARQUET_1_0;
@@ -95,16 +96,46 @@
   ColumnProperties(Encoding::type encoding = DEFAULT_ENCODING,
                    Compression::type codec = DEFAULT_COMPRESSION_TYPE,
                    bool dictionary_enabled = DEFAULT_IS_DICTIONARY_ENABLED,
-                   bool statistics_enabled = DEFAULT_ARE_STATISTICS_ENABLED)
-      : encoding(encoding),
-        codec(codec),
-        dictionary_enabled(dictionary_enabled),
-        statistics_enabled(statistics_enabled) {}
+                   bool statistics_enabled = DEFAULT_ARE_STATISTICS_ENABLED,
+                   size_t max_stats_size = DEFAULT_MAX_STATISTICS_SIZE)
+      : encoding_(encoding),
+        codec_(codec),
+        dictionary_enabled_(dictionary_enabled),
+        statistics_enabled_(statistics_enabled),
+        max_stats_size_(max_stats_size) {}
 
-  Encoding::type encoding;
-  Compression::type codec;
-  bool dictionary_enabled;
-  bool statistics_enabled;
+  void set_encoding(Encoding::type encoding) { encoding_ = encoding; }
+
+  void set_compression(Compression::type codec) { codec_ = codec; }
+
+  void set_dictionary_enabled(bool dictionary_enabled) {
+    dictionary_enabled_ = dictionary_enabled;
+  }
+
+  void set_statistics_enabled(bool statistics_enabled) {
+    statistics_enabled_ = statistics_enabled;
+  }
+
+  void set_max_statistics_size(size_t max_stats_size) {
+    max_stats_size_ = max_stats_size;
+  }
+
+  Encoding::type encoding() const { return encoding_; }
+
+  Compression::type compression() const { return codec_; }
+
+  bool dictionary_enabled() const { return dictionary_enabled_; }
+
+  bool statistics_enabled() const { return statistics_enabled_; }
+
+  size_t max_statistics_size() const { return max_stats_size_; }
+
+ private:
+  Encoding::type encoding_;
+  Compression::type codec_;
+  bool dictionary_enabled_;
+  bool statistics_enabled_;
+  size_t max_stats_size_;
 };
 
 class PARQUET_EXPORT WriterProperties {
@@ -127,12 +158,12 @@
     }
 
     Builder* enable_dictionary() {
-      default_column_properties_.dictionary_enabled = true;
+      default_column_properties_.set_dictionary_enabled(true);
       return this;
     }
 
     Builder* disable_dictionary() {
-      default_column_properties_.dictionary_enabled = false;
+      default_column_properties_.set_dictionary_enabled(false);
       return this;
     }
 
@@ -196,7 +227,7 @@
         throw ParquetException("Can't use dictionary encoding as fallback encoding");
       }
 
-      default_column_properties_.encoding = encoding_type;
+      default_column_properties_.set_encoding(encoding_type);
       return this;
     }
 
@@ -228,7 +259,12 @@
     }
 
     Builder* compression(Compression::type codec) {
-      default_column_properties_.codec = codec;
+      default_column_properties_.set_compression(codec);
+      return this;
+    }
+
+    Builder* max_statistics_size(size_t max_stats_sz) {
+      default_column_properties_.set_max_statistics_size(max_stats_sz);
       return this;
     }
 
@@ -243,12 +279,12 @@
     }
 
     Builder* enable_statistics() {
-      default_column_properties_.statistics_enabled = true;
+      default_column_properties_.set_statistics_enabled(true);
       return this;
     }
 
     Builder* disable_statistics() {
-      default_column_properties_.statistics_enabled = false;
+      default_column_properties_.set_statistics_enabled(false);
       return this;
     }
 
@@ -280,12 +316,12 @@
           return it->second;
       };
 
-      for (const auto& item : encodings_) get(item.first).encoding = item.second;
-      for (const auto& item : codecs_) get(item.first).codec = item.second;
+      for (const auto& item : encodings_) get(item.first).set_encoding(item.second);
+      for (const auto& item : codecs_) get(item.first).set_compression(item.second);
       for (const auto& item : dictionary_enabled_)
-        get(item.first).dictionary_enabled = item.second;
+        get(item.first).set_dictionary_enabled(item.second);
       for (const auto& item : statistics_enabled_)
-        get(item.first).statistics_enabled = item.second;
+        get(item.first).set_statistics_enabled(item.second);
 
       return std::shared_ptr<WriterProperties>(
           new WriterProperties(pool_, dictionary_pagesize_limit_, write_batch_size_,
@@ -348,19 +384,23 @@
   }
 
   Encoding::type encoding(const std::shared_ptr<schema::ColumnPath>& path) const {
-    return column_properties(path).encoding;
+    return column_properties(path).encoding();
   }
 
   Compression::type compression(const std::shared_ptr<schema::ColumnPath>& path) const {
-    return column_properties(path).codec;
+    return column_properties(path).compression();
   }
 
   bool dictionary_enabled(const std::shared_ptr<schema::ColumnPath>& path) const {
-    return column_properties(path).dictionary_enabled;
+    return column_properties(path).dictionary_enabled();
   }
 
   bool statistics_enabled(const std::shared_ptr<schema::ColumnPath>& path) const {
-    return column_properties(path).statistics_enabled;
+    return column_properties(path).statistics_enabled();
+  }
+
+  size_t max_statistics_size(const std::shared_ptr<schema::ColumnPath>& path) const {
+    return column_properties(path).max_statistics_size();
   }
 
  private:
diff --git a/src/parquet/statistics.h b/src/parquet/statistics.h
index 36f9e44..4f9df72 100644
--- a/src/parquet/statistics.h
+++ b/src/parquet/statistics.h
@@ -18,6 +18,7 @@
 #ifndef PARQUET_COLUMN_STATISTICS_H
 #define PARQUET_COLUMN_STATISTICS_H
 
+#include <algorithm>
 #include <cstdint>
 #include <memory>
 #include <string>
@@ -52,6 +53,9 @@
     return has_min || has_max || has_null_count || has_distinct_count;
   }
 
+  // larger of the max_ and min_ stat values
+  inline size_t max_stat_length() { return std::max(max_->length(), min_->length()); }
+
   inline EncodedStatistics& set_max(const std::string& value) {
     *max_ = value;
     has_max = true;
diff --git a/src/parquet/test-specialization.h b/src/parquet/test-specialization.h
index a6112a2..3d88cfc 100644
--- a/src/parquet/test-specialization.h
+++ b/src/parquet/test-specialization.h
@@ -50,6 +50,14 @@
   random_byte_array(num_values, 0, buffer.data(), values.data(), max_byte_array_len);
 }
 
+void inline InitWideByteArrayValues(int num_values, vector<ByteArray>& values,
+                                    vector<uint8_t>& buffer, int min_len, int max_len) {
+  int num_bytes = static_cast<int>(max_len + sizeof(uint32_t));
+  size_t nbytes = num_values * num_bytes;
+  buffer.resize(nbytes);
+  random_byte_array(num_values, 0, buffer.data(), values.data(), min_len, max_len);
+}
+
 template <>
 void inline InitValues<FLBA>(int num_values, vector<FLBA>& values,
                              vector<uint8_t>& buffer) {