PARQUET-979: Limit size of min, max or disable stats for long binary types
Author: Deepak Majeti <deepak.majeti@hpe.com>
Closes #465 from majetideepak/PARQUET-979 and squashes the following commits:
3b18173 [Deepak Majeti] improve naming and ColumnProperties class
a888aa4 [Deepak Majeti] Add an option to specify max stats size
c103c4f [Deepak Majeti] make format
cf0260c [Deepak Majeti] PARQUET-979: [C++] Limit size of min, max or disable stats for long binary types
diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc
index 47226a3..6d7e1eb 100644
--- a/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -1445,8 +1445,8 @@
using ::arrow::Schema;
using ::arrow::Table;
using ::arrow::TimeUnit;
- using ::arrow::TimestampType;
using ::arrow::TimestampBuilder;
+ using ::arrow::TimestampType;
using ::arrow::default_memory_pool;
auto timestamp_type = std::make_shared<TimestampType>(TimeUnit::NANO);
diff --git a/src/parquet/column_writer-test.cc b/src/parquet/column_writer-test.cc
index 7a5f379..aac582a 100644
--- a/src/parquet/column_writer-test.cc
+++ b/src/parquet/column_writer-test.cc
@@ -71,19 +71,21 @@
int64_t output_size = SMALL_SIZE,
const ColumnProperties& column_properties = ColumnProperties()) {
sink_.reset(new InMemoryOutputStream());
- metadata_ = ColumnChunkMetaDataBuilder::Make(
- writer_properties_, this->descr_, reinterpret_cast<uint8_t*>(&thrift_metadata_));
- std::unique_ptr<PageWriter> pager =
- PageWriter::Open(sink_.get(), column_properties.codec, metadata_.get());
WriterProperties::Builder wp_builder;
- if (column_properties.encoding == Encoding::PLAIN_DICTIONARY ||
- column_properties.encoding == Encoding::RLE_DICTIONARY) {
+ if (column_properties.encoding() == Encoding::PLAIN_DICTIONARY ||
+ column_properties.encoding() == Encoding::RLE_DICTIONARY) {
wp_builder.enable_dictionary();
} else {
wp_builder.disable_dictionary();
- wp_builder.encoding(column_properties.encoding);
+ wp_builder.encoding(column_properties.encoding());
}
+ wp_builder.max_statistics_size(column_properties.max_statistics_size());
writer_properties_ = wp_builder.build();
+
+ metadata_ = ColumnChunkMetaDataBuilder::Make(
+ writer_properties_, this->descr_, reinterpret_cast<uint8_t*>(&thrift_metadata_));
+ std::unique_ptr<PageWriter> pager =
+ PageWriter::Open(sink_.get(), column_properties.compression(), metadata_.get());
std::shared_ptr<ColumnWriter> writer =
ColumnWriter::Make(metadata_.get(), std::move(pager), writer_properties_.get());
return std::static_pointer_cast<TypedColumnWriter<TestType>>(writer);
@@ -173,6 +175,16 @@
return metadata_accessor->num_values();
}
+ bool metadata_is_stats_set() {
+ // Metadata accessor must be created lazily.
+ // This is because the ColumnChunkMetaData semantics dictate the metadata object is
+ // complete (no changes to the metadata buffer can be made after instantiation)
+ ApplicationVersion app_version(this->writer_properties_->created_by());
+ auto metadata_accessor = ColumnChunkMetaData::Make(
+ reinterpret_cast<const uint8_t*>(&thrift_metadata_), this->descr_, &app_version);
+ return metadata_accessor->is_stats_set();
+ }
+
std::vector<Encoding::type> metadata_encodings() {
// Metadata accessor must be created lazily.
// This is because the ColumnChunkMetaData semantics dictate the metadata object is
@@ -520,6 +532,50 @@
}
}
+// PARQUET-979
+// Prevent writing large stats
+using TestByteArrayValuesWriter = TestPrimitiveWriter<ByteArrayType>;
+TEST_F(TestByteArrayValuesWriter, OmitStats) {
+ int min_len = 1024 * 4;
+ int max_len = 1024 * 8;
+ this->SetUpSchema(Repetition::REQUIRED);
+ auto writer = this->BuildWriter();
+
+ values_.resize(SMALL_SIZE);
+ InitWideByteArrayValues(SMALL_SIZE, this->values_, this->buffer_, min_len, max_len);
+ writer->WriteBatch(SMALL_SIZE, nullptr, nullptr, this->values_.data());
+ writer->Close();
+
+ ASSERT_FALSE(this->metadata_is_stats_set());
+}
+
+TEST_F(TestByteArrayValuesWriter, LimitStats) {
+ int min_len = 1024 * 4;
+ int max_len = 1024 * 8;
+ this->SetUpSchema(Repetition::REQUIRED);
+ ColumnProperties column_properties;
+ column_properties.set_max_statistics_size(static_cast<size_t>(max_len));
+ auto writer = this->BuildWriter(SMALL_SIZE, column_properties);
+
+ values_.resize(SMALL_SIZE);
+ InitWideByteArrayValues(SMALL_SIZE, this->values_, this->buffer_, min_len, max_len);
+ writer->WriteBatch(SMALL_SIZE, nullptr, nullptr, this->values_.data());
+ writer->Close();
+
+ ASSERT_TRUE(this->metadata_is_stats_set());
+}
+
+TEST_F(TestByteArrayValuesWriter, CheckDefaultStats) {
+ this->SetUpSchema(Repetition::REQUIRED);
+ auto writer = this->BuildWriter();
+ this->GenerateData(SMALL_SIZE);
+
+ writer->WriteBatch(SMALL_SIZE, nullptr, nullptr, this->values_ptr_);
+ writer->Close();
+
+ ASSERT_TRUE(this->metadata_is_stats_set());
+}
+
void GenerateLevels(int min_repeat_factor, int max_repeat_factor, int max_level,
std::vector<int16_t>& input_levels) {
// for each repetition count upto max_repeat_factor
diff --git a/src/parquet/column_writer.cc b/src/parquet/column_writer.cc
index 4f2ef6c..8a1b56c 100644
--- a/src/parquet/column_writer.cc
+++ b/src/parquet/column_writer.cc
@@ -431,7 +431,14 @@
FlushBufferedDataPages();
EncodedStatistics chunk_statistics = GetChunkStatistics();
- if (chunk_statistics.is_set()) {
+ // From parquet-mr
+ // Don't write stats larger than the max size rather than truncating. The
+ // rationale is that some engines may use the minimum value in the page as
+ // the true minimum for aggregations and there is no way to mark that a
+ // value has been truncated and is a lower bound and not in the page.
+ if (chunk_statistics.is_set() &&
+ chunk_statistics.max_stat_length() <=
+ properties_->max_statistics_size(descr_->path())) {
metadata_->SetStatistics(SortOrder::SIGNED == descr_->sort_order(),
chunk_statistics);
}
diff --git a/src/parquet/properties.h b/src/parquet/properties.h
index a331aae..83dc205 100644
--- a/src/parquet/properties.h
+++ b/src/parquet/properties.h
@@ -84,6 +84,7 @@
static constexpr int64_t DEFAULT_WRITE_BATCH_SIZE = 1024;
static constexpr int64_t DEFAULT_MAX_ROW_GROUP_LENGTH = 64 * 1024 * 1024;
static constexpr bool DEFAULT_ARE_STATISTICS_ENABLED = true;
+static constexpr int64_t DEFAULT_MAX_STATISTICS_SIZE = 4096;
static constexpr Encoding::type DEFAULT_ENCODING = Encoding::PLAIN;
static constexpr ParquetVersion::type DEFAULT_WRITER_VERSION =
ParquetVersion::PARQUET_1_0;
@@ -95,16 +96,46 @@
ColumnProperties(Encoding::type encoding = DEFAULT_ENCODING,
Compression::type codec = DEFAULT_COMPRESSION_TYPE,
bool dictionary_enabled = DEFAULT_IS_DICTIONARY_ENABLED,
- bool statistics_enabled = DEFAULT_ARE_STATISTICS_ENABLED)
- : encoding(encoding),
- codec(codec),
- dictionary_enabled(dictionary_enabled),
- statistics_enabled(statistics_enabled) {}
+ bool statistics_enabled = DEFAULT_ARE_STATISTICS_ENABLED,
+ size_t max_stats_size = DEFAULT_MAX_STATISTICS_SIZE)
+ : encoding_(encoding),
+ codec_(codec),
+ dictionary_enabled_(dictionary_enabled),
+ statistics_enabled_(statistics_enabled),
+ max_stats_size_(max_stats_size) {}
- Encoding::type encoding;
- Compression::type codec;
- bool dictionary_enabled;
- bool statistics_enabled;
+ void set_encoding(Encoding::type encoding) { encoding_ = encoding; }
+
+ void set_compression(Compression::type codec) { codec_ = codec; }
+
+ void set_dictionary_enabled(bool dictionary_enabled) {
+ dictionary_enabled_ = dictionary_enabled;
+ }
+
+ void set_statistics_enabled(bool statistics_enabled) {
+ statistics_enabled_ = statistics_enabled;
+ }
+
+ void set_max_statistics_size(size_t max_stats_size) {
+ max_stats_size_ = max_stats_size;
+ }
+
+ Encoding::type encoding() const { return encoding_; }
+
+ Compression::type compression() const { return codec_; }
+
+ bool dictionary_enabled() const { return dictionary_enabled_; }
+
+ bool statistics_enabled() const { return statistics_enabled_; }
+
+ size_t max_statistics_size() const { return max_stats_size_; }
+
+ private:
+ Encoding::type encoding_;
+ Compression::type codec_;
+ bool dictionary_enabled_;
+ bool statistics_enabled_;
+ size_t max_stats_size_;
};
class PARQUET_EXPORT WriterProperties {
@@ -127,12 +158,12 @@
}
Builder* enable_dictionary() {
- default_column_properties_.dictionary_enabled = true;
+ default_column_properties_.set_dictionary_enabled(true);
return this;
}
Builder* disable_dictionary() {
- default_column_properties_.dictionary_enabled = false;
+ default_column_properties_.set_dictionary_enabled(false);
return this;
}
@@ -196,7 +227,7 @@
throw ParquetException("Can't use dictionary encoding as fallback encoding");
}
- default_column_properties_.encoding = encoding_type;
+ default_column_properties_.set_encoding(encoding_type);
return this;
}
@@ -228,7 +259,12 @@
}
Builder* compression(Compression::type codec) {
- default_column_properties_.codec = codec;
+ default_column_properties_.set_compression(codec);
+ return this;
+ }
+
+ Builder* max_statistics_size(size_t max_stats_sz) {
+ default_column_properties_.set_max_statistics_size(max_stats_sz);
return this;
}
@@ -243,12 +279,12 @@
}
Builder* enable_statistics() {
- default_column_properties_.statistics_enabled = true;
+ default_column_properties_.set_statistics_enabled(true);
return this;
}
Builder* disable_statistics() {
- default_column_properties_.statistics_enabled = false;
+ default_column_properties_.set_statistics_enabled(false);
return this;
}
@@ -280,12 +316,12 @@
return it->second;
};
- for (const auto& item : encodings_) get(item.first).encoding = item.second;
- for (const auto& item : codecs_) get(item.first).codec = item.second;
+ for (const auto& item : encodings_) get(item.first).set_encoding(item.second);
+ for (const auto& item : codecs_) get(item.first).set_compression(item.second);
for (const auto& item : dictionary_enabled_)
- get(item.first).dictionary_enabled = item.second;
+ get(item.first).set_dictionary_enabled(item.second);
for (const auto& item : statistics_enabled_)
- get(item.first).statistics_enabled = item.second;
+ get(item.first).set_statistics_enabled(item.second);
return std::shared_ptr<WriterProperties>(
new WriterProperties(pool_, dictionary_pagesize_limit_, write_batch_size_,
@@ -348,19 +384,23 @@
}
Encoding::type encoding(const std::shared_ptr<schema::ColumnPath>& path) const {
- return column_properties(path).encoding;
+ return column_properties(path).encoding();
}
Compression::type compression(const std::shared_ptr<schema::ColumnPath>& path) const {
- return column_properties(path).codec;
+ return column_properties(path).compression();
}
bool dictionary_enabled(const std::shared_ptr<schema::ColumnPath>& path) const {
- return column_properties(path).dictionary_enabled;
+ return column_properties(path).dictionary_enabled();
}
bool statistics_enabled(const std::shared_ptr<schema::ColumnPath>& path) const {
- return column_properties(path).statistics_enabled;
+ return column_properties(path).statistics_enabled();
+ }
+
+ size_t max_statistics_size(const std::shared_ptr<schema::ColumnPath>& path) const {
+ return column_properties(path).max_statistics_size();
}
private:
diff --git a/src/parquet/statistics.h b/src/parquet/statistics.h
index 36f9e44..4f9df72 100644
--- a/src/parquet/statistics.h
+++ b/src/parquet/statistics.h
@@ -18,6 +18,7 @@
#ifndef PARQUET_COLUMN_STATISTICS_H
#define PARQUET_COLUMN_STATISTICS_H
+#include <algorithm>
#include <cstdint>
#include <memory>
#include <string>
@@ -52,6 +53,9 @@
return has_min || has_max || has_null_count || has_distinct_count;
}
+ // larger of the max_ and min_ stat values
+ inline size_t max_stat_length() { return std::max(max_->length(), min_->length()); }
+
inline EncodedStatistics& set_max(const std::string& value) {
*max_ = value;
has_max = true;
diff --git a/src/parquet/test-specialization.h b/src/parquet/test-specialization.h
index a6112a2..3d88cfc 100644
--- a/src/parquet/test-specialization.h
+++ b/src/parquet/test-specialization.h
@@ -50,6 +50,14 @@
random_byte_array(num_values, 0, buffer.data(), values.data(), max_byte_array_len);
}
+void inline InitWideByteArrayValues(int num_values, vector<ByteArray>& values,
+ vector<uint8_t>& buffer, int min_len, int max_len) {
+ int num_bytes = static_cast<int>(max_len + sizeof(uint32_t));
+ size_t nbytes = num_values * num_bytes;
+ buffer.resize(nbytes);
+ random_byte_array(num_values, 0, buffer.data(), values.data(), min_len, max_len);
+}
+
template <>
void inline InitValues<FLBA>(int num_values, vector<FLBA>& values,
vector<uint8_t>& buffer) {