PARQUET-1002: Compute statistics based on Sort Order
@lomereiter, You might also want to take a look since you previously implemented the Statistics API.
Author: Deepak Majeti <deepak.majeti@hpe.com>
Closes #383 from majetideepak/PARQUET-1002 and squashes the following commits:
5a93fe3 [Deepak Majeti] fix error
48dd22d [Deepak Majeti] change fix version from 1.2.1 to 1.3.0
712ea90 [Deepak Majeti] Move test to statistics-test
75ea475 [Deepak Majeti] Fix formatting
c5d9610 [Deepak Majeti] Rename compare to comparator
ba18ae6 [Deepak Majeti] Review comments
f87bda6 [Deepak Majeti] Avoid UTF-8 file encoding mismatch between windows and linux
3cb8a3e [Deepak Majeti] Review comments
17a79f3 [Deepak Majeti] fix failures
85a9052 [Deepak Majeti] fix Clang failure and improve test Fix Warnings on Windows
bdd8d37 [Deepak Majeti] Add another test
6a985de [Deepak Majeti] Comments
808e764 [Deepak Majeti] fix failure
55960fe [Deepak Majeti] format
b4fba8b [Deepak Majeti] Add test for Unknown sort order
d58658d [Deepak Majeti] make format
c1abb69 [Deepak Majeti] rename to Make
3df2686 [Deepak Majeti] Add Reader Writer Statistics Test
c4b1827 [Deepak Majeti] extend testing fix tests
4287565 [Deepak Majeti] Fix reader read new max and min values
046f8d3 [Deepak Majeti] Move SortOrder to types.h add int32 comparison INT96 fix statistics in metadata Use templates
diff --git a/CMakeLists.txt b/CMakeLists.txt
index b7a41d8..1ecf877 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -646,6 +646,7 @@
src/parquet/parquet_constants.cpp
src/parquet/parquet_types.cpp
+ src/parquet/util/comparison.cc
src/parquet/util/memory.cc
)
diff --git a/src/parquet/column_writer-test.cc b/src/parquet/column_writer-test.cc
index 3ec3663..48f243e 100644
--- a/src/parquet/column_writer-test.cc
+++ b/src/parquet/column_writer-test.cc
@@ -151,14 +151,16 @@
void ReadAndCompare(Compression::type compression, int64_t num_rows) {
this->SetupValuesOut(num_rows);
this->ReadColumnFully(compression);
- Compare<T> compare(this->descr_);
+ std::shared_ptr<CompareDefault<TestType>> compare;
+ compare = std::static_pointer_cast<CompareDefault<TestType>>(
+ Comparator::Make(this->descr_));
for (size_t i = 0; i < this->values_.size(); i++) {
- if (compare(this->values_[i], this->values_out_[i]) ||
- compare(this->values_out_[i], this->values_[i])) {
+ if ((*compare)(this->values_[i], this->values_out_[i]) ||
+ (*compare)(this->values_out_[i], this->values_[i])) {
std::cout << "Failed at " << i << std::endl;
}
- ASSERT_FALSE(compare(this->values_[i], this->values_out_[i]));
- ASSERT_FALSE(compare(this->values_out_[i], this->values_[i]));
+ ASSERT_FALSE((*compare)(this->values_[i], this->values_out_[i]));
+ ASSERT_FALSE((*compare)(this->values_out_[i], this->values_[i]));
}
ASSERT_EQ(this->values_, this->values_out_);
}
diff --git a/src/parquet/column_writer.cc b/src/parquet/column_writer.cc
index b36f395..ac7e2ba 100644
--- a/src/parquet/column_writer.cc
+++ b/src/parquet/column_writer.cc
@@ -267,7 +267,10 @@
FlushBufferedDataPages();
EncodedStatistics chunk_statistics = GetChunkStatistics();
- if (chunk_statistics.is_set()) metadata_->SetStatistics(chunk_statistics);
+ if (chunk_statistics.is_set()) {
+ metadata_->SetStatistics(SortOrder::SIGNED == descr_->sort_order(),
+ chunk_statistics);
+ }
pager_->Close(has_dictionary_, fallback_);
}
@@ -317,7 +320,8 @@
ParquetException::NYI("Selected encoding is not supported");
}
- if (properties->statistics_enabled(descr_->path())) {
+ if (properties->statistics_enabled(descr_->path()) &&
+ (SortOrder::UNKNOWN != descr_->sort_order())) {
page_statistics_ = std::unique_ptr<TypedStats>(new TypedStats(descr_, allocator_));
chunk_statistics_ = std::unique_ptr<TypedStats>(new TypedStats(descr_, allocator_));
}
diff --git a/src/parquet/file/file-metadata-test.cc b/src/parquet/file/file-metadata-test.cc
index a7c438c..5bd8419 100644
--- a/src/parquet/file/file-metadata-test.cc
+++ b/src/parquet/file/file-metadata-test.cc
@@ -63,8 +63,8 @@
auto col1_builder = rg1_builder->NextColumnChunk();
auto col2_builder = rg1_builder->NextColumnChunk();
// column metadata
- col1_builder->SetStatistics(stats_int);
- col2_builder->SetStatistics(stats_float);
+ col1_builder->SetStatistics(true, stats_int);
+ col2_builder->SetStatistics(true, stats_float);
col1_builder->Finish(nrows / 2, 4, 0, 10, 512, 600, true, false);
col2_builder->Finish(nrows / 2, 24, 0, 30, 512, 600, true, false);
rg1_builder->Finish(1024);
@@ -73,8 +73,8 @@
col1_builder = rg2_builder->NextColumnChunk();
col2_builder = rg2_builder->NextColumnChunk();
// column metadata
- col1_builder->SetStatistics(stats_int);
- col2_builder->SetStatistics(stats_float);
+ col1_builder->SetStatistics(true, stats_int);
+ col2_builder->SetStatistics(true, stats_float);
col1_builder->Finish(nrows / 2, 6, 0, 10, 512, 600, true, false);
col2_builder->Finish(nrows / 2, 16, 0, 26, 512, 600, true, false);
rg2_builder->Finish(1024);
@@ -215,11 +215,12 @@
ASSERT_EQ(true, version.VersionLt(version1));
- ASSERT_FALSE(version1.HasCorrectStatistics(Type::INT96));
- ASSERT_TRUE(version.HasCorrectStatistics(Type::INT32));
- ASSERT_FALSE(version.HasCorrectStatistics(Type::BYTE_ARRAY));
- ASSERT_TRUE(version1.HasCorrectStatistics(Type::BYTE_ARRAY));
- ASSERT_TRUE(version3.HasCorrectStatistics(Type::FIXED_LEN_BYTE_ARRAY));
+ ASSERT_FALSE(version1.HasCorrectStatistics(Type::INT96, SortOrder::SIGNED));
+ ASSERT_TRUE(version.HasCorrectStatistics(Type::INT32, SortOrder::SIGNED));
+ ASSERT_FALSE(version.HasCorrectStatistics(Type::BYTE_ARRAY, SortOrder::SIGNED));
+ ASSERT_TRUE(version1.HasCorrectStatistics(Type::BYTE_ARRAY, SortOrder::SIGNED));
+ ASSERT_TRUE(
+ version3.HasCorrectStatistics(Type::FIXED_LEN_BYTE_ARRAY, SortOrder::SIGNED));
}
} // namespace metadata
diff --git a/src/parquet/file/metadata.cc b/src/parquet/file/metadata.cc
index d5a96f3..6e7fc3b 100644
--- a/src/parquet/file/metadata.cc
+++ b/src/parquet/file/metadata.cc
@@ -35,62 +35,20 @@
ApplicationVersion("parquet-mr version 1.8.0");
const ApplicationVersion ApplicationVersion::PARQUET_816_FIXED_VERSION =
ApplicationVersion("parquet-mr version 1.2.9");
-
-// Return the Sort Order of the Parquet Physical Types
-SortOrder default_sort_order(Type::type primitive) {
- switch (primitive) {
- case Type::BOOLEAN:
- case Type::INT32:
- case Type::INT64:
- case Type::FLOAT:
- case Type::DOUBLE:
- return SortOrder::SIGNED;
- case Type::BYTE_ARRAY:
- case Type::FIXED_LEN_BYTE_ARRAY:
- case Type::INT96: // only used for timestamp, which uses unsigned values
- return SortOrder::UNSIGNED;
- }
- return SortOrder::UNKNOWN;
-}
-
-// Return the SortOrder of the Parquet Types using Logical or Physical Types
-SortOrder get_sort_order(LogicalType::type converted, Type::type primitive) {
- if (converted == LogicalType::NONE) return default_sort_order(primitive);
- switch (converted) {
- case LogicalType::INT_8:
- case LogicalType::INT_16:
- case LogicalType::INT_32:
- case LogicalType::INT_64:
- case LogicalType::DATE:
- case LogicalType::TIME_MICROS:
- case LogicalType::TIME_MILLIS:
- case LogicalType::TIMESTAMP_MICROS:
- case LogicalType::TIMESTAMP_MILLIS:
- return SortOrder::SIGNED;
- case LogicalType::UINT_8:
- case LogicalType::UINT_16:
- case LogicalType::UINT_32:
- case LogicalType::UINT_64:
- case LogicalType::ENUM:
- case LogicalType::UTF8:
- case LogicalType::BSON:
- case LogicalType::JSON:
- return SortOrder::UNSIGNED;
- case LogicalType::NA:
- case LogicalType::DECIMAL:
- case LogicalType::LIST:
- case LogicalType::MAP:
- case LogicalType::MAP_KEY_VALUE:
- case LogicalType::INTERVAL:
- case LogicalType::NONE: // required instead of default
- return SortOrder::UNKNOWN;
- }
- return SortOrder::UNKNOWN;
-}
+const ApplicationVersion ApplicationVersion::PARQUET_CPP_FIXED_STATS_VERSION =
+ ApplicationVersion("parquet-cpp version 1.3.0");
template <typename DType>
static std::shared_ptr<RowGroupStatistics> MakeTypedColumnStats(
const format::ColumnMetaData& metadata, const ColumnDescriptor* descr) {
+ // If new fields max_value/min_value are set, then return them.
+ if (metadata.statistics.__isset.max_value || metadata.statistics.__isset.min_value) {
+ return std::make_shared<TypedRowGroupStatistics<DType>>(
+ descr, metadata.statistics.min_value, metadata.statistics.max_value,
+ metadata.num_values - metadata.statistics.null_count,
+ metadata.statistics.null_count, metadata.statistics.distinct_count, true);
+ }
+ // Default behavior
return std::make_shared<TypedRowGroupStatistics<DType>>(
descr, metadata.statistics.min, metadata.statistics.max,
metadata.num_values - metadata.statistics.null_count,
@@ -159,9 +117,7 @@
inline bool is_stats_set() const {
DCHECK(writer_version_ != nullptr);
return column_->meta_data.__isset.statistics &&
- writer_version_->HasCorrectStatistics(type()) &&
- SortOrder::SIGNED ==
- get_sort_order(descr_->logical_type(), descr_->physical_type());
+ writer_version_->HasCorrectStatistics(type(), descr_->sort_order());
}
inline std::shared_ptr<RowGroupStatistics> statistics() const {
@@ -534,15 +490,21 @@
// Reference:
// parquet-mr/parquet-column/src/main/java/org/apache/parquet/CorruptStatistics.java
// PARQUET-686 has more disussion on statistics
-bool ApplicationVersion::HasCorrectStatistics(Type::type col_type) const {
- // None of the current tools write INT96 Statistics correctly
- if (col_type == Type::INT96) return false;
+bool ApplicationVersion::HasCorrectStatistics(Type::type col_type,
+ SortOrder::type sort_order) const {
+ // Parquet cpp version 1.3.0 onwards stats are computed correctly for all types
+ if ((application_ != "parquet-cpp") || (VersionLt(PARQUET_CPP_FIXED_STATS_VERSION))) {
+ // Only SIGNED are valid
+ if (SortOrder::SIGNED != sort_order) return false;
- // Statistics of other types are OK
- if (col_type != Type::FIXED_LEN_BYTE_ARRAY && col_type != Type::BYTE_ARRAY) {
- return true;
+ // None of the current tools write INT96 Statistics correctly
+ if (col_type == Type::INT96) return false;
+
+ // Statistics of other types are OK
+ if (col_type != Type::FIXED_LEN_BYTE_ARRAY && col_type != Type::BYTE_ARRAY) {
+ return true;
+ }
}
-
// created_by is not populated, which could have been caused by
// parquet-mr during the same time as PARQUET-251, see PARQUET-297
if (application_ == "unknown") {
@@ -577,16 +539,24 @@
void set_file_path(const std::string& val) { column_chunk_->__set_file_path(val); }
// column metadata
- void SetStatistics(const EncodedStatistics& val) {
+ void SetStatistics(bool is_signed, const EncodedStatistics& val) {
format::Statistics stats;
stats.null_count = val.null_count;
stats.distinct_count = val.distinct_count;
- stats.max = val.max();
- stats.min = val.min();
- stats.__isset.min = val.has_min;
- stats.__isset.max = val.has_max;
+ stats.max_value = val.max();
+ stats.min_value = val.min();
+ stats.__isset.min_value = val.has_min;
+ stats.__isset.max_value = val.has_max;
stats.__isset.null_count = val.has_null_count;
stats.__isset.distinct_count = val.has_distinct_count;
+ // If the order is SIGNED, then the old min/max values must be set too.
+ // This for backward compatibility
+ if (is_signed) {
+ stats.max = val.max();
+ stats.min = val.min();
+ stats.__isset.min = val.has_min;
+ stats.__isset.max = val.has_max;
+ }
column_chunk_->meta_data.__set_statistics(stats);
}
@@ -674,8 +644,9 @@
return impl_->descr();
}
-void ColumnChunkMetaDataBuilder::SetStatistics(const EncodedStatistics& result) {
- impl_->SetStatistics(result);
+void ColumnChunkMetaDataBuilder::SetStatistics(bool is_signed,
+ const EncodedStatistics& result) {
+ impl_->SetStatistics(is_signed, result);
}
class RowGroupMetaDataBuilder::RowGroupMetaDataBuilderImpl {
diff --git a/src/parquet/file/metadata.h b/src/parquet/file/metadata.h
index 4250f6b..0d8e10e 100644
--- a/src/parquet/file/metadata.h
+++ b/src/parquet/file/metadata.h
@@ -35,22 +35,12 @@
using KeyValueMetadata = ::arrow::KeyValueMetadata;
-// Reference:
-// parquet-mr/parquet-hadoop/src/main/java/org/apache/parquet/
-// format/converter/ParquetMetadataConverter.java
-// Sort order for page and column statistics. Types are associated with sort
-// orders (e.g., UTF8 columns should use UNSIGNED) and column stats are
-// aggregated using a sort order. As of parquet-format version 2.3.1, the
-// order used to aggregate stats is always SIGNED and is not stored in the
-// Parquet file. These stats are discarded for types that need unsigned.
-// See PARQUET-686.
-enum SortOrder { SIGNED, UNSIGNED, UNKNOWN };
-
class ApplicationVersion {
public:
// Known Versions with Issues
static const ApplicationVersion PARQUET_251_FIXED_VERSION;
static const ApplicationVersion PARQUET_816_FIXED_VERSION;
+ static const ApplicationVersion PARQUET_CPP_FIXED_STATS_VERSION;
// Regular expression for the version format
// major . minor . patch unknown - prerelease.x + build info
// Eg: 1.5.0ab-cdh5.5.0+cd
@@ -92,7 +82,8 @@
bool VersionEq(const ApplicationVersion& other_version) const;
// Checks if the Version has the correct statistics for a given column
- bool HasCorrectStatistics(Type::type primitive) const;
+ bool HasCorrectStatistics(Type::type primitive,
+ SortOrder::type sort_order = SortOrder::SIGNED) const;
};
class PARQUET_EXPORT ColumnChunkMetaData {
@@ -209,7 +200,7 @@
// Used when a dataset is spread across multiple files
void set_file_path(const std::string& path);
// column metadata
- void SetStatistics(const EncodedStatistics& stats);
+ void SetStatistics(bool is_signed, const EncodedStatistics& stats);
// get the column descriptor
const ColumnDescriptor* descr() const;
// commit the metadata
diff --git a/src/parquet/parquet_version.h.in b/src/parquet/parquet_version.h.in
index 7036d2f..db8f396 100644
--- a/src/parquet/parquet_version.h.in
+++ b/src/parquet/parquet_version.h.in
@@ -21,4 +21,4 @@
// define the parquet created by version
#define CREATED_BY_VERSION "parquet-cpp version @PARQUET_VERSION@"
-#endif // PARQUET_VERSION_H
+#endif // PARQUET_VERSION_H
diff --git a/src/parquet/schema.h b/src/parquet/schema.h
index e240b82..c6b7fbe 100644
--- a/src/parquet/schema.h
+++ b/src/parquet/schema.h
@@ -332,6 +332,10 @@
LogicalType::type logical_type() const { return primitive_node_->logical_type(); }
+ SortOrder::type sort_order() const {
+ return GetSortOrder(logical_type(), physical_type());
+ }
+
const std::string& name() const { return primitive_node_->name(); }
const std::shared_ptr<schema::ColumnPath> path() const;
diff --git a/src/parquet/statistics-test.cc b/src/parquet/statistics-test.cc
index 26352c1..d6c5205 100644
--- a/src/parquet/statistics-test.cc
+++ b/src/parquet/statistics-test.cc
@@ -194,9 +194,8 @@
}
template <typename TestType>
-typename std::vector<typename TestType::c_type>
-TestRowGroupStatistics<TestType>::GetDeepCopy(
- const std::vector<typename TestType::c_type>& values) {
+typename std::vector<typename TestType::c_type> TestRowGroupStatistics<
+ TestType>::GetDeepCopy(const std::vector<typename TestType::c_type>& values) {
return values;
}
@@ -311,6 +310,7 @@
this->TestMerge();
}
+// Statistics are restricted for few types in older parquet version
TEST(CorruptStatistics, Basics) {
ApplicationVersion version("parquet-mr version 1.8.0");
SchemaDescriptor schema;
@@ -356,5 +356,332 @@
ASSERT_FALSE(column_chunk6->is_stats_set());
}
+// Statistics for all types have no restrictions in newer parquet version
+TEST(CorrectStatistics, Basics) {
+ ApplicationVersion version("parquet-cpp version 1.3.0");
+ SchemaDescriptor schema;
+ schema::NodePtr node;
+ std::vector<schema::NodePtr> fields;
+ // Test Physical Types
+ fields.push_back(schema::PrimitiveNode::Make("col1", Repetition::OPTIONAL, Type::INT32,
+ LogicalType::NONE));
+ fields.push_back(schema::PrimitiveNode::Make("col2", Repetition::OPTIONAL,
+ Type::BYTE_ARRAY, LogicalType::NONE));
+ // Test Logical Types
+ fields.push_back(schema::PrimitiveNode::Make("col3", Repetition::OPTIONAL, Type::INT32,
+ LogicalType::DATE));
+ fields.push_back(schema::PrimitiveNode::Make("col4", Repetition::OPTIONAL, Type::INT32,
+ LogicalType::UINT_32));
+ fields.push_back(schema::PrimitiveNode::Make("col5", Repetition::OPTIONAL,
+ Type::FIXED_LEN_BYTE_ARRAY,
+ LogicalType::INTERVAL, 12));
+ fields.push_back(schema::PrimitiveNode::Make("col6", Repetition::OPTIONAL,
+ Type::BYTE_ARRAY, LogicalType::UTF8));
+ node = schema::GroupNode::Make("schema", Repetition::REQUIRED, fields);
+ schema.Init(node);
+
+ format::ColumnChunk col_chunk;
+ col_chunk.meta_data.__isset.statistics = true;
+ auto column_chunk1 = ColumnChunkMetaData::Make(
+ reinterpret_cast<const uint8_t*>(&col_chunk), schema.Column(0), &version);
+ ASSERT_TRUE(column_chunk1->is_stats_set());
+ auto column_chunk2 = ColumnChunkMetaData::Make(
+ reinterpret_cast<const uint8_t*>(&col_chunk), schema.Column(1), &version);
+ ASSERT_TRUE(column_chunk2->is_stats_set());
+ auto column_chunk3 = ColumnChunkMetaData::Make(
+ reinterpret_cast<const uint8_t*>(&col_chunk), schema.Column(2), &version);
+ ASSERT_TRUE(column_chunk3->is_stats_set());
+ auto column_chunk4 = ColumnChunkMetaData::Make(
+ reinterpret_cast<const uint8_t*>(&col_chunk), schema.Column(3), &version);
+ ASSERT_TRUE(column_chunk4->is_stats_set());
+ auto column_chunk5 = ColumnChunkMetaData::Make(
+ reinterpret_cast<const uint8_t*>(&col_chunk), schema.Column(4), &version);
+ ASSERT_TRUE(column_chunk5->is_stats_set());
+ auto column_chunk6 = ColumnChunkMetaData::Make(
+ reinterpret_cast<const uint8_t*>(&col_chunk), schema.Column(5), &version);
+ ASSERT_TRUE(column_chunk6->is_stats_set());
+}
+
+// Test SortOrder class
+static const int NUM_VALUES = 10;
+
+template <typename TestType>
+class TestStatistics : public ::testing::Test {
+ public:
+ typedef typename TestType::c_type T;
+
+ void AddNodes(std::string name) {
+ fields_.push_back(schema::PrimitiveNode::Make(name, Repetition::REQUIRED,
+ TestType::type_num, LogicalType::NONE));
+ }
+
+ void SetUpSchema() {
+ stats_.resize(fields_.size());
+ values_.resize(NUM_VALUES);
+ schema_ = std::static_pointer_cast<GroupNode>(
+ GroupNode::Make("Schema", Repetition::REQUIRED, fields_));
+
+ parquet_sink_ = std::make_shared<InMemoryOutputStream>();
+ }
+
+ void SetValues();
+
+ void WriteParquet() {
+ // Add writer properties
+ parquet::WriterProperties::Builder builder;
+ builder.compression(parquet::Compression::SNAPPY);
+ builder.created_by("parquet-cpp version 1.3.0");
+ std::shared_ptr<parquet::WriterProperties> props = builder.build();
+
+ // Create a ParquetFileWriter instance
+ auto file_writer = parquet::ParquetFileWriter::Open(parquet_sink_, schema_, props);
+
+ // Append a RowGroup with a specific number of rows.
+ auto rg_writer = file_writer->AppendRowGroup(NUM_VALUES);
+
+ this->SetValues();
+
+ // Insert Values
+ for (int i = 0; i < static_cast<int>(fields_.size()); i++) {
+ auto column_writer =
+ static_cast<parquet::TypedColumnWriter<TestType>*>(rg_writer->NextColumn());
+ column_writer->WriteBatch(NUM_VALUES, nullptr, nullptr, values_.data());
+ }
+ }
+
+ void VerifyParquetStats() {
+ auto pbuffer = parquet_sink_->GetBuffer();
+
+ // Create a ParquetReader instance
+ std::unique_ptr<parquet::ParquetFileReader> parquet_reader =
+ parquet::ParquetFileReader::Open(
+ std::make_shared<arrow::io::BufferReader>(pbuffer));
+
+ // Get the File MetaData
+ std::shared_ptr<parquet::FileMetaData> file_metadata = parquet_reader->metadata();
+ std::shared_ptr<parquet::RowGroupMetaData> rg_metadata = file_metadata->RowGroup(0);
+ for (int i = 0; i < static_cast<int>(fields_.size()); i++) {
+ std::shared_ptr<parquet::ColumnChunkMetaData> cc_metadata =
+ rg_metadata->ColumnChunk(i);
+ ASSERT_EQ(stats_[i].min(), cc_metadata->statistics()->EncodeMin());
+ ASSERT_EQ(stats_[i].max(), cc_metadata->statistics()->EncodeMax());
+ }
+ }
+
+ protected:
+ std::vector<T> values_;
+ std::vector<uint8_t> values_buf_;
+ std::vector<schema::NodePtr> fields_;
+ std::shared_ptr<schema::GroupNode> schema_;
+ std::shared_ptr<InMemoryOutputStream> parquet_sink_;
+ std::vector<EncodedStatistics> stats_;
+};
+
+using CompareTestTypes = ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType,
+ DoubleType, ByteArrayType, FLBAType>;
+
+// TYPE::INT32
+template <>
+void TestStatistics<Int32Type>::AddNodes(std::string name) {
+ // UINT_32 logical type to set Unsigned Statistics
+ fields_.push_back(schema::PrimitiveNode::Make(name, Repetition::REQUIRED, Type::INT32,
+ LogicalType::UINT_32));
+ // INT_32 logical type to set Signed Statistics
+ fields_.push_back(schema::PrimitiveNode::Make(name, Repetition::REQUIRED, Type::INT32,
+ LogicalType::INT_32));
+}
+
+template <>
+void TestStatistics<Int32Type>::SetValues() {
+ for (int i = 0; i < NUM_VALUES; i++) {
+ values_[i] = i - 5; // {-5, -4, -3, -2, -1, 0, 1, 2, 3, 4};
+ }
+
+ // Write UINT32 min/max values
+ stats_[0]
+ .set_min(std::string(reinterpret_cast<const char*>(&values_[5]), sizeof(T)))
+ .set_max(std::string(reinterpret_cast<const char*>(&values_[4]), sizeof(T)));
+
+ // Write INT32 min/max values
+ stats_[1]
+ .set_min(std::string(reinterpret_cast<const char*>(&values_[0]), sizeof(T)))
+ .set_max(std::string(reinterpret_cast<const char*>(&values_[9]), sizeof(T)));
+}
+
+// TYPE::INT64
+template <>
+void TestStatistics<Int64Type>::AddNodes(std::string name) {
+ // UINT_64 logical type to set Unsigned Statistics
+ fields_.push_back(schema::PrimitiveNode::Make(name, Repetition::REQUIRED, Type::INT64,
+ LogicalType::UINT_64));
+ // INT_64 logical type to set Signed Statistics
+ fields_.push_back(schema::PrimitiveNode::Make(name, Repetition::REQUIRED, Type::INT64,
+ LogicalType::INT_64));
+}
+
+template <>
+void TestStatistics<Int64Type>::SetValues() {
+ for (int i = 0; i < NUM_VALUES; i++) {
+ values_[i] = i - 5; // {-5, -4, -3, -2, -1, 0, 1, 2, 3, 4};
+ }
+
+ // Write UINT64 min/max values
+ stats_[0]
+ .set_min(std::string(reinterpret_cast<const char*>(&values_[5]), sizeof(T)))
+ .set_max(std::string(reinterpret_cast<const char*>(&values_[4]), sizeof(T)));
+
+ // Write INT64 min/max values
+ stats_[1]
+ .set_min(std::string(reinterpret_cast<const char*>(&values_[0]), sizeof(T)))
+ .set_max(std::string(reinterpret_cast<const char*>(&values_[9]), sizeof(T)));
+}
+
+// TYPE::INT96
+template <>
+void TestStatistics<Int96Type>::AddNodes(std::string name) {
+ // INT96 physical type has only Unsigned Statistics
+ fields_.push_back(schema::PrimitiveNode::Make(name, Repetition::REQUIRED, Type::INT96,
+ LogicalType::NONE));
+}
+
+template <>
+void TestStatistics<Int96Type>::SetValues() {
+ for (int i = 0; i < NUM_VALUES; i++) {
+ values_[i].value[0] = i - 5; // {-5, -4, -3, -2, -1, 0, 1, 2, 3, 4};
+ values_[i].value[1] = i - 5; // {-5, -4, -3, -2, -1, 0, 1, 2, 3, 4};
+ values_[i].value[2] = i - 5; // {-5, -4, -3, -2, -1, 0, 1, 2, 3, 4};
+ }
+
+ // Write Int96 min/max values
+ stats_[0]
+ .set_min(std::string(reinterpret_cast<const char*>(&values_[5]), sizeof(T)))
+ .set_max(std::string(reinterpret_cast<const char*>(&values_[4]), sizeof(T)));
+}
+
+// TYPE::FLOAT
+template <>
+void TestStatistics<FloatType>::SetValues() {
+ for (int i = 0; i < NUM_VALUES; i++) {
+ values_[i] =
+ (i * 1.0f) - 5; // {-5.0, -4.0, -3.0, -2.0, -1.0, 0.0, 1.0, 2.0, 3.0, 4.0};
+ }
+
+ // Write Float min/max values
+ stats_[0]
+ .set_min(std::string(reinterpret_cast<const char*>(&values_[0]), sizeof(T)))
+ .set_max(std::string(reinterpret_cast<const char*>(&values_[9]), sizeof(T)));
+}
+
+// TYPE::DOUBLE
+template <>
+void TestStatistics<DoubleType>::SetValues() {
+ for (int i = 0; i < NUM_VALUES; i++) {
+ values_[i] =
+ (i * 1.0f) - 5; // {-5.0, -4.0, -3.0, -2.0, -1.0, 0.0, 1.0, 2.0, 3.0, 4.0};
+ }
+
+ // Write Double min/max values
+ stats_[0]
+ .set_min(std::string(reinterpret_cast<const char*>(&values_[0]), sizeof(T)))
+ .set_max(std::string(reinterpret_cast<const char*>(&values_[9]), sizeof(T)));
+}
+
+// TYPE::ByteArray
+template <>
+void TestStatistics<ByteArrayType>::AddNodes(std::string name) {
+ // UTF8 logical type to set Unsigned Statistics
+ fields_.push_back(schema::PrimitiveNode::Make(name, Repetition::REQUIRED,
+ Type::BYTE_ARRAY, LogicalType::UTF8));
+}
+
+template <>
+void TestStatistics<ByteArrayType>::SetValues() {
+ int max_byte_array_len = 10;
+ size_t nbytes = NUM_VALUES * max_byte_array_len;
+ values_buf_.resize(nbytes);
+ std::vector<std::string> vals = {u8"c123", u8"b123", u8"a123", u8"d123", u8"e123",
+ u8"f123", u8"g123", u8"h123", u8"i123", u8"ü123"};
+
+ uint8_t* base = &values_buf_.data()[0];
+ for (int i = 0; i < NUM_VALUES; i++) {
+ memcpy(base, vals[i].c_str(), vals[i].length());
+ values_[i].ptr = base;
+ values_[i].len = static_cast<uint32_t>(vals[i].length());
+ base += vals[i].length();
+ }
+
+ // Write String min/max values
+ stats_[0]
+ .set_min(
+ std::string(reinterpret_cast<const char*>(vals[2].c_str()), vals[2].length()))
+ .set_max(
+ std::string(reinterpret_cast<const char*>(vals[9].c_str()), vals[9].length()));
+}
+
+// TYPE::FLBAArray
+template <>
+void TestStatistics<FLBAType>::AddNodes(std::string name) {
+ // FLBA has only Unsigned Statistics
+ fields_.push_back(schema::PrimitiveNode::Make(name, Repetition::REQUIRED,
+ Type::FIXED_LEN_BYTE_ARRAY,
+ LogicalType::NONE, FLBA_LENGTH));
+}
+
+template <>
+void TestStatistics<FLBAType>::SetValues() {
+ size_t nbytes = NUM_VALUES * FLBA_LENGTH;
+ values_buf_.resize(nbytes);
+ char vals[NUM_VALUES][FLBA_LENGTH] = {"b12345", "a12345", "c12345", "d12345", "e12345",
+ "f12345", "g12345", "h12345", "z12345", "a12345"};
+
+ uint8_t* base = &values_buf_.data()[0];
+ for (int i = 0; i < NUM_VALUES; i++) {
+ memcpy(base, &vals[i][0], FLBA_LENGTH);
+ values_[i].ptr = base;
+ base += FLBA_LENGTH;
+ }
+
+ // Write FLBA min,max values
+ stats_[0]
+ .set_min(std::string(reinterpret_cast<const char*>(&vals[1][0]), FLBA_LENGTH))
+ .set_max(std::string(reinterpret_cast<const char*>(&vals[8][0]), FLBA_LENGTH));
+}
+
+TYPED_TEST_CASE(TestStatistics, CompareTestTypes);
+
+TYPED_TEST(TestStatistics, MinMax) {
+ this->AddNodes("Column ");
+ this->SetUpSchema();
+ this->WriteParquet();
+ this->VerifyParquetStats();
+}
+
+// Ensure UNKNOWN sort order is handled properly
+using TestStatisticsFLBA = TestStatistics<FLBAType>;
+
+TEST_F(TestStatisticsFLBA, UnknownSortOrder) {
+ this->fields_.push_back(schema::PrimitiveNode::Make(
+ "Column 0", Repetition::REQUIRED, Type::FIXED_LEN_BYTE_ARRAY, LogicalType::INTERVAL,
+ FLBA_LENGTH));
+ this->SetUpSchema();
+ this->WriteParquet();
+
+ auto pbuffer = parquet_sink_->GetBuffer();
+ // Create a ParquetReader instance
+ std::unique_ptr<parquet::ParquetFileReader> parquet_reader =
+ parquet::ParquetFileReader::Open(
+ std::make_shared<arrow::io::BufferReader>(pbuffer));
+ // Get the File MetaData
+ std::shared_ptr<parquet::FileMetaData> file_metadata = parquet_reader->metadata();
+ std::shared_ptr<parquet::RowGroupMetaData> rg_metadata = file_metadata->RowGroup(0);
+ std::shared_ptr<parquet::ColumnChunkMetaData> cc_metadata = rg_metadata->ColumnChunk(0);
+
+ // stats should not be set for UNKNOWN sort order
+ ASSERT_FALSE(cc_metadata->is_stats_set());
+}
+
+
+
+
} // namespace test
} // namespace parquet
diff --git a/src/parquet/statistics.cc b/src/parquet/statistics.cc
index 12d1f5b..dad1a9b 100644
--- a/src/parquet/statistics.cc
+++ b/src/parquet/statistics.cc
@@ -21,7 +21,6 @@
#include "parquet/encoding-internal.h"
#include "parquet/exception.h"
#include "parquet/statistics.h"
-#include "parquet/util/comparison.h"
#include "parquet/util/memory.h"
using arrow::default_memory_pool;
@@ -86,6 +85,12 @@
}
template <typename DType>
+void TypedRowGroupStatistics<DType>::SetComparator() {
+ comparator_ =
+ std::static_pointer_cast<CompareDefault<DType> >(Comparator::Make(descr_));
+}
+
+template <typename DType>
void TypedRowGroupStatistics<DType>::Reset() {
ResetCounts();
has_min_max_ = false;
@@ -102,15 +107,17 @@
// TODO: support distinct count?
if (num_not_null == 0) return;
- Compare<T> compare(descr_);
- auto batch_minmax = std::minmax_element(values, values + num_not_null, compare);
+ auto batch_minmax =
+ std::minmax_element(values, values + num_not_null, std::ref(*(this->comparator_)));
if (!has_min_max_) {
has_min_max_ = true;
Copy(*batch_minmax.first, &min_, min_buffer_.get());
Copy(*batch_minmax.second, &max_, max_buffer_.get());
} else {
- Copy(std::min(min_, *batch_minmax.first, compare), &min_, min_buffer_.get());
- Copy(std::max(max_, *batch_minmax.second, compare), &max_, max_buffer_.get());
+ Copy(std::min(min_, *batch_minmax.first, std::ref(*(this->comparator_))), &min_,
+ min_buffer_.get());
+ Copy(std::max(max_, *batch_minmax.second, std::ref(*(this->comparator_))), &max_,
+ max_buffer_.get());
}
}
@@ -128,7 +135,6 @@
// TODO: support distinct count?
if (num_not_null == 0) return;
- Compare<T> compare(descr_);
INIT_BITSET(valid_bits, static_cast<int>(valid_bits_offset));
// Find first valid entry and use that for min/max
// As (num_not_null != 0) there must be one
@@ -144,9 +150,9 @@
T max = values[i];
for (; i < length; i++) {
if (bitset_valid_bits & (1 << bit_offset_valid_bits)) {
- if (compare(values[i], min)) {
+ if ((std::ref(*(this->comparator_)))(values[i], min)) {
min = values[i];
- } else if (compare(max, values[i])) {
+ } else if ((std::ref(*(this->comparator_)))(max, values[i])) {
max = values[i];
}
}
@@ -157,8 +163,8 @@
Copy(min, &min_, min_buffer_.get());
Copy(max, &max_, max_buffer_.get());
} else {
- Copy(std::min(min_, min, compare), &min_, min_buffer_.get());
- Copy(std::max(max_, max, compare), &max_, max_buffer_.get());
+ Copy(std::min(min_, min, std::ref(*(this->comparator_))), &min_, min_buffer_.get());
+ Copy(std::max(max_, max, std::ref(*(this->comparator_))), &max_, max_buffer_.get());
}
}
@@ -185,9 +191,10 @@
return;
}
- Compare<T> compare(descr_);
- Copy(std::min(this->min_, other.min_, compare), &this->min_, min_buffer_.get());
- Copy(std::max(this->max_, other.max_, compare), &this->max_, max_buffer_.get());
+ Copy(std::min(this->min_, other.min_, std::ref(*(this->comparator_))), &this->min_,
+ min_buffer_.get());
+ Copy(std::max(this->max_, other.max_, std::ref(*(this->comparator_))), &this->max_,
+ max_buffer_.get());
}
template <typename DType>
diff --git a/src/parquet/statistics.h b/src/parquet/statistics.h
index 12d0555..b5466c0 100644
--- a/src/parquet/statistics.h
+++ b/src/parquet/statistics.h
@@ -24,6 +24,7 @@
#include "parquet/schema.h"
#include "parquet/types.h"
+#include "parquet/util/comparison.h"
#include "parquet/util/memory.h"
#include "parquet/util/visibility.h"
@@ -97,13 +98,19 @@
virtual EncodedStatistics Encode() = 0;
+ // Set the Corresponding Comparator
+ virtual void SetComparator() = 0;
+
virtual ~RowGroupStatistics() {}
Type::type physical_type() const { return descr_->physical_type(); }
protected:
const ColumnDescriptor* descr() const { return descr_; }
- void SetDescr(const ColumnDescriptor* schema) { descr_ = schema; }
+ void SetDescr(const ColumnDescriptor* schema) {
+ descr_ = schema;
+ SetComparator();
+ }
void IncrementNullCount(int64_t n) { statistics_.null_count += n; }
@@ -146,6 +153,7 @@
bool HasMinMax() const override;
void Reset() override;
+ void SetComparator() override;
void Merge(const TypedRowGroupStatistics<DType>& other);
void Update(const T* values, int64_t num_not_null, int64_t num_null);
@@ -164,6 +172,7 @@
T min_;
T max_;
::arrow::MemoryPool* pool_;
+ std::shared_ptr<CompareDefault<DType> > comparator_;
void PlainEncode(const T& src, std::string* dst);
void PlainDecode(const std::string& src, T* dst);
diff --git a/src/parquet/test-util.h b/src/parquet/test-util.h
index 356486b..3fd72f2 100644
--- a/src/parquet/test-util.h
+++ b/src/parquet/test-util.h
@@ -42,7 +42,7 @@
namespace parquet {
-static int FLBA_LENGTH = 12;
+static constexpr int FLBA_LENGTH = 12;
bool operator==(const FixedLenByteArray& a, const FixedLenByteArray& b) {
return 0 == memcmp(a.ptr, b.ptr, FLBA_LENGTH);
diff --git a/src/parquet/types.cc b/src/parquet/types.cc
index 97c769b..0652c6a 100644
--- a/src/parquet/types.cc
+++ b/src/parquet/types.cc
@@ -221,4 +221,56 @@
return 0;
}
+// Return the Sort Order of the Parquet Physical Types
+SortOrder::type DefaultSortOrder(Type::type primitive) {
+ switch (primitive) {
+ case Type::BOOLEAN:
+ case Type::INT32:
+ case Type::INT64:
+ case Type::FLOAT:
+ case Type::DOUBLE:
+ return SortOrder::SIGNED;
+ case Type::BYTE_ARRAY:
+ case Type::FIXED_LEN_BYTE_ARRAY:
+ case Type::INT96: // only used for timestamp, which uses unsigned values
+ return SortOrder::UNSIGNED;
+ }
+ return SortOrder::UNKNOWN;
+}
+
+// Return the SortOrder of the Parquet Types using Logical or Physical Types
+SortOrder::type GetSortOrder(LogicalType::type converted, Type::type primitive) {
+ if (converted == LogicalType::NONE) return DefaultSortOrder(primitive);
+ switch (converted) {
+ case LogicalType::INT_8:
+ case LogicalType::INT_16:
+ case LogicalType::INT_32:
+ case LogicalType::INT_64:
+ case LogicalType::DATE:
+ case LogicalType::TIME_MICROS:
+ case LogicalType::TIME_MILLIS:
+ case LogicalType::TIMESTAMP_MICROS:
+ case LogicalType::TIMESTAMP_MILLIS:
+ return SortOrder::SIGNED;
+ case LogicalType::UINT_8:
+ case LogicalType::UINT_16:
+ case LogicalType::UINT_32:
+ case LogicalType::UINT_64:
+ case LogicalType::ENUM:
+ case LogicalType::UTF8:
+ case LogicalType::BSON:
+ case LogicalType::JSON:
+ return SortOrder::UNSIGNED;
+ case LogicalType::DECIMAL:
+ case LogicalType::LIST:
+ case LogicalType::MAP:
+ case LogicalType::MAP_KEY_VALUE:
+ case LogicalType::INTERVAL:
+ case LogicalType::NONE: // required instead of default
+ case LogicalType::NA: // required instead of default
+ return SortOrder::UNKNOWN;
+ }
+ return SortOrder::UNKNOWN;
+}
+
} // namespace parquet
diff --git a/src/parquet/types.h b/src/parquet/types.h
index 38015c4..af3a58f 100644
--- a/src/parquet/types.h
+++ b/src/parquet/types.h
@@ -116,6 +116,19 @@
enum type { DATA_PAGE, INDEX_PAGE, DICTIONARY_PAGE, DATA_PAGE_V2 };
};
+// Reference:
+// parquet-mr/parquet-hadoop/src/main/java/org/apache/parquet/
+// format/converter/ParquetMetadataConverter.java
+// Sort order for page and column statistics. Types are associated with sort
+// orders (e.g., UTF8 columns should use UNSIGNED) and column stats are
+// aggregated using a sort order. As of parquet-format version 2.3.1, the
+// order used to aggregate stats is always SIGNED and is not stored in the
+// Parquet file. These stats are discarded for types that need unsigned.
+// See PARQUET-686.
+struct SortOrder {
+ enum type { SIGNED, UNSIGNED, UNKNOWN };
+};
+
// ----------------------------------------------------------------------
struct ByteArray {
@@ -283,6 +296,11 @@
PARQUET_EXPORT int GetTypeByteSize(Type::type t);
+PARQUET_EXPORT SortOrder::type DefaultSortOrder(Type::type primitive);
+
+PARQUET_EXPORT SortOrder::type GetSortOrder(LogicalType::type converted,
+ Type::type primitive);
+
} // namespace parquet
#endif // PARQUET_TYPES_H
diff --git a/src/parquet/util/comparison-test.cc b/src/parquet/util/comparison-test.cc
index 8401983..dc915bf 100644
--- a/src/parquet/util/comparison-test.cc
+++ b/src/parquet/util/comparison-test.cc
@@ -42,48 +42,178 @@
return FLBA(ptr);
}
-TEST(Comparison, ByteArray) {
- NodePtr node = PrimitiveNode::Make("bytearray", Repetition::REQUIRED, Type::BYTE_ARRAY);
- ColumnDescriptor descr(node, 0, 0);
- Compare<parquet::ByteArray> less(&descr);
-
- std::string a = "arrange";
- std::string b = "arrangement";
- auto arr1 = ByteArrayFromString(a);
- auto arr2 = ByteArrayFromString(b);
- ASSERT_TRUE(less(arr1, arr2));
-
- a = u8"braten";
- b = u8"bügeln";
- auto arr3 = ByteArrayFromString(a);
- auto arr4 = ByteArrayFromString(b);
- // see PARQUET-686 discussion about binary comparison
- ASSERT_TRUE(!less(arr3, arr4));
-}
-
-TEST(Comparison, FLBA) {
- std::string a = "Antidisestablishmentarianism";
- std::string b = "Bundesgesundheitsministerium";
- auto arr1 = FLBAFromString(a);
- auto arr2 = FLBAFromString(b);
-
+TEST(Comparison, signedByteArray) {
NodePtr node =
- PrimitiveNode::Make("FLBA", Repetition::REQUIRED, Type::FIXED_LEN_BYTE_ARRAY,
- LogicalType::NONE, static_cast<int>(a.size()));
+ PrimitiveNode::Make("SignedByteArray", Repetition::REQUIRED, Type::BYTE_ARRAY);
ColumnDescriptor descr(node, 0, 0);
- Compare<parquet::FixedLenByteArray> less(&descr);
- ASSERT_TRUE(less(arr1, arr2));
+
+ CompareDefaultByteArray less;
+
+ std::string s1 = "12345";
+ std::string s2 = "12345678";
+ ByteArray s1ba = ByteArrayFromString(s1);
+ ByteArray s2ba = ByteArrayFromString(s2);
+ ASSERT_TRUE(less(s1ba, s2ba));
+
+ // This is case where signed comparision UTF-8 (PARQUET-686) is incorrect
+ // This example is to only check signed comparison and not UTF-8.
+ s1 = u8"bügeln";
+ s2 = u8"braten";
+ s1ba = ByteArrayFromString(s1);
+ s2ba = ByteArrayFromString(s2);
+ ASSERT_TRUE(less(s1ba, s2ba));
}
-TEST(Comparison, Int96) {
- parquet::Int96 a{{1, 41, 14}}, b{{1, 41, 42}};
-
- NodePtr node = PrimitiveNode::Make("int96", Repetition::REQUIRED, Type::INT96);
+TEST(Comparison, UnsignedByteArray) {
+ NodePtr node = PrimitiveNode::Make("UnsignedByteArray", Repetition::REQUIRED,
+ Type::BYTE_ARRAY, LogicalType::UTF8);
ColumnDescriptor descr(node, 0, 0);
- Compare<parquet::Int96> less(&descr);
+
+ // Check if UTF-8 is compared using unsigned correctly
+ CompareUnsignedByteArray uless;
+
+ std::string s1 = "arrange";
+ std::string s2 = "arrangement";
+ ByteArray s1ba = ByteArrayFromString(s1);
+ ByteArray s2ba = ByteArrayFromString(s2);
+ ASSERT_TRUE(uless(s1ba, s2ba));
+
+ // Multi-byte UTF-8 characters
+ s1 = u8"braten";
+ s2 = u8"bügeln";
+ s1ba = ByteArrayFromString(s1);
+ s2ba = ByteArrayFromString(s2);
+ ASSERT_TRUE(uless(s1ba, s2ba));
+
+ s1 = u8"ünk123456"; // ü = 252
+ s2 = u8"ănk123456"; // ă = 259
+ s1ba = ByteArrayFromString(s1);
+ s2ba = ByteArrayFromString(s2);
+ ASSERT_TRUE(uless(s1ba, s2ba));
+}
+
+TEST(Comparison, SignedFLBA) {
+ int size = 10;
+ NodePtr node = PrimitiveNode::Make("SignedFLBA", Repetition::REQUIRED,
+ Type::FIXED_LEN_BYTE_ARRAY, LogicalType::NONE, size);
+ ColumnDescriptor descr(node, 0, 0);
+
+ CompareDefaultFLBA less(descr.type_length());
+
+ std::string s1 = "Anti123456";
+ std::string s2 = "Bunkd123456";
+ FLBA s1flba = FLBAFromString(s1);
+ FLBA s2flba = FLBAFromString(s2);
+ ASSERT_TRUE(less(s1flba, s2flba));
+
+ s1 = "Bünk123456";
+ s2 = "Bunk123456";
+ s1flba = FLBAFromString(s1);
+ s2flba = FLBAFromString(s2);
+ ASSERT_TRUE(less(s1flba, s2flba));
+}
+
+TEST(Comparison, UnsignedFLBA) {
+ int size = 10;
+ NodePtr node = PrimitiveNode::Make("UnsignedFLBA", Repetition::REQUIRED,
+ Type::FIXED_LEN_BYTE_ARRAY, LogicalType::NONE, size);
+ ColumnDescriptor descr(node, 0, 0);
+
+ CompareUnsignedFLBA uless(descr.type_length());
+
+ std::string s1 = "Anti123456";
+ std::string s2 = "Bunkd123456";
+ FLBA s1flba = FLBAFromString(s1);
+ FLBA s2flba = FLBAFromString(s2);
+ ASSERT_TRUE(uless(s1flba, s2flba));
+
+ s1 = "Bunk123456";
+ s2 = "Bünk123456";
+ s1flba = FLBAFromString(s1);
+ s2flba = FLBAFromString(s2);
+ ASSERT_TRUE(uless(s1flba, s2flba));
+}
+
+TEST(Comparison, SignedInt96) {
+ parquet::Int96 a{{1, 41, 14}}, b{{1, 41, 42}};
+ parquet::Int96 aa{{1, 41, 14}}, bb{{1, 41, 14}};
+ parquet::Int96 aaa{{static_cast<uint32_t>(-1), 41, 14}}, bbb{{1, 41, 42}};
+
+ NodePtr node = PrimitiveNode::Make("SignedInt96", Repetition::REQUIRED, Type::INT96);
+ ColumnDescriptor descr(node, 0, 0);
+
+ CompareDefaultInt96 less;
+
ASSERT_TRUE(less(a, b));
- b.value[2] = 14;
- ASSERT_TRUE(!less(a, b) && !less(b, a));
+ ASSERT_TRUE(!less(aa, bb) && !less(bb, aa));
+ ASSERT_TRUE(less(aaa, bbb));
+}
+
+TEST(Comparison, UnsignedInt96) {
+ parquet::Int96 a{{1, 41, 14}}, b{{1, static_cast<uint32_t>(-41), 42}};
+ parquet::Int96 aa{{1, 41, 14}}, bb{{static_cast<uint32_t>(-1), 41, 14}};
+
+ NodePtr node = PrimitiveNode::Make("UnsignedInt96", Repetition::REQUIRED, Type::INT96);
+ ColumnDescriptor descr(node, 0, 0);
+
+ CompareUnsignedInt96 uless;
+
+ ASSERT_TRUE(uless(a, b));
+ ASSERT_TRUE(uless(aa, bb));
+}
+
+TEST(Comparison, SignedInt64) {
+ int64_t a = 1, b = 4;
+ int64_t aa = 1, bb = 1;
+ int64_t aaa = -1, bbb = 1;
+
+ NodePtr node = PrimitiveNode::Make("SignedInt64", Repetition::REQUIRED, Type::INT64);
+ ColumnDescriptor descr(node, 0, 0);
+
+ CompareDefaultInt64 less;
+
+ ASSERT_TRUE(less(a, b));
+ ASSERT_TRUE(!less(aa, bb) && !less(bb, aa));
+ ASSERT_TRUE(less(aaa, bbb));
+}
+
+TEST(Comparison, UnsignedInt64) {
+ uint64_t a = 1, b = 4;
+ uint64_t aa = 1, bb = 1;
+ uint64_t aaa = 1, bbb = -1;
+
+ NodePtr node = PrimitiveNode::Make("UnsignedInt64", Repetition::REQUIRED, Type::INT64);
+ ColumnDescriptor descr(node, 0, 0);
+
+ CompareUnsignedInt64 less;
+
+ ASSERT_TRUE(less(a, b));
+ ASSERT_TRUE(!less(aa, bb) && !less(bb, aa));
+ ASSERT_TRUE(less(aaa, bbb));
+}
+
+TEST(Comparison, UnsignedInt32) {
+ uint32_t a = 1, b = 4;
+ uint32_t aa = 1, bb = 1;
+ uint32_t aaa = 1, bbb = -1;
+
+ NodePtr node = PrimitiveNode::Make("UnsignedInt32", Repetition::REQUIRED, Type::INT32);
+ ColumnDescriptor descr(node, 0, 0);
+
+ CompareUnsignedInt32 less;
+
+ ASSERT_TRUE(less(a, b));
+ ASSERT_TRUE(!less(aa, bb) && !less(bb, aa));
+ ASSERT_TRUE(less(aaa, bbb));
+}
+
+TEST(Comparison, UnknownSortOrder) {
+ NodePtr node =
+ PrimitiveNode::Make("Unknown", Repetition::REQUIRED, Type::FIXED_LEN_BYTE_ARRAY,
+ LogicalType::INTERVAL, 12);
+ ColumnDescriptor descr(node, 0, 0);
+
+ ASSERT_THROW(Comparator::Make(&descr), ParquetException);
}
} // namespace test
diff --git a/src/parquet/util/comparison.cc b/src/parquet/util/comparison.cc
new file mode 100644
index 0000000..1d7bb9d
--- /dev/null
+++ b/src/parquet/util/comparison.cc
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <algorithm>
+
+#include "parquet/exception.h"
+#include "parquet/schema.h"
+#include "parquet/types.h"
+#include "parquet/util/comparison.h"
+
+namespace parquet {
+
+std::shared_ptr<Comparator> Comparator::Make(const ColumnDescriptor* descr) {
+ if (SortOrder::SIGNED == descr->sort_order()) {
+ switch (descr->physical_type()) {
+ case Type::BOOLEAN:
+ return std::make_shared<CompareDefaultBoolean>();
+ case Type::INT32:
+ return std::make_shared<CompareDefaultInt32>();
+ case Type::INT64:
+ return std::make_shared<CompareDefaultInt64>();
+ case Type::INT96:
+ return std::make_shared<CompareDefaultInt96>();
+ case Type::FLOAT:
+ return std::make_shared<CompareDefaultFloat>();
+ case Type::DOUBLE:
+ return std::make_shared<CompareDefaultDouble>();
+ case Type::BYTE_ARRAY:
+ return std::make_shared<CompareDefaultByteArray>();
+ case Type::FIXED_LEN_BYTE_ARRAY:
+ return std::make_shared<CompareDefaultFLBA>(descr->type_length());
+ default:
+ ParquetException::NYI("Signed Compare not implemented");
+ }
+ } else if (SortOrder::UNSIGNED == descr->sort_order()) {
+ switch (descr->physical_type()) {
+ case Type::INT32:
+ return std::make_shared<CompareUnsignedInt32>();
+ case Type::INT64:
+ return std::make_shared<CompareUnsignedInt64>();
+ case Type::INT96:
+ return std::make_shared<CompareUnsignedInt96>();
+ case Type::BYTE_ARRAY:
+ return std::make_shared<CompareUnsignedByteArray>();
+ case Type::FIXED_LEN_BYTE_ARRAY:
+ return std::make_shared<CompareUnsignedFLBA>(descr->type_length());
+ default:
+ ParquetException::NYI("Unsigned Compare not implemented");
+ }
+ } else {
+ throw ParquetException("UNKNOWN Sort Order");
+ }
+ return nullptr;
+}
+
+template class PARQUET_TEMPLATE_EXPORT CompareDefault<BooleanType>;
+template class PARQUET_TEMPLATE_EXPORT CompareDefault<Int32Type>;
+template class PARQUET_TEMPLATE_EXPORT CompareDefault<Int64Type>;
+template class PARQUET_TEMPLATE_EXPORT CompareDefault<Int96Type>;
+template class PARQUET_TEMPLATE_EXPORT CompareDefault<FloatType>;
+template class PARQUET_TEMPLATE_EXPORT CompareDefault<DoubleType>;
+template class PARQUET_TEMPLATE_EXPORT CompareDefault<ByteArrayType>;
+template class PARQUET_TEMPLATE_EXPORT CompareDefault<FLBAType>;
+
+} // namespace parquet
diff --git a/src/parquet/util/comparison.h b/src/parquet/util/comparison.h
index edd3df1..803f0da 100644
--- a/src/parquet/util/comparison.h
+++ b/src/parquet/util/comparison.h
@@ -20,40 +20,145 @@
#include <algorithm>
+#include "parquet/exception.h"
#include "parquet/schema.h"
#include "parquet/types.h"
namespace parquet {
-template <typename T>
-struct Compare {
- explicit Compare(const ColumnDescriptor* descr) : type_length_(descr->type_length()) {}
+class PARQUET_EXPORT Comparator {
+ public:
+ virtual ~Comparator() {}
+ static std::shared_ptr<Comparator> Make(const ColumnDescriptor* descr);
+};
- inline bool operator()(const T& a, const T& b) { return a < b; }
-
- private:
- int32_t type_length_;
+// The default comparison is SIGNED
+template <typename DType>
+class PARQUET_EXPORT CompareDefault : public Comparator {
+ public:
+ typedef typename DType::c_type T;
+ CompareDefault() {}
+ virtual ~CompareDefault() {}
+ virtual bool operator()(const T& a, const T& b) { return a < b; }
};
template <>
-inline bool Compare<Int96>::operator()(const Int96& a, const Int96& b) {
- return std::lexicographical_compare(a.value, a.value + 3, b.value, b.value + 3);
-}
+class PARQUET_EXPORT CompareDefault<Int96Type> : public Comparator {
+ public:
+ CompareDefault() {}
+ virtual ~CompareDefault() {}
+ virtual bool operator()(const Int96& a, const Int96& b) {
+ const int32_t* aptr = reinterpret_cast<const int32_t*>(&a.value[0]);
+ const int32_t* bptr = reinterpret_cast<const int32_t*>(&b.value[0]);
+ return std::lexicographical_compare(aptr, aptr + 3, bptr, bptr + 3);
+ }
+};
template <>
-inline bool Compare<ByteArray>::operator()(const ByteArray& a, const ByteArray& b) {
- auto aptr = reinterpret_cast<const int8_t*>(a.ptr);
- auto bptr = reinterpret_cast<const int8_t*>(b.ptr);
- return std::lexicographical_compare(aptr, aptr + a.len, bptr, bptr + b.len);
-}
+class PARQUET_EXPORT CompareDefault<ByteArrayType> : public Comparator {
+ public:
+ CompareDefault() {}
+ virtual ~CompareDefault() {}
+ virtual bool operator()(const ByteArray& a, const ByteArray& b) {
+ const int8_t* aptr = reinterpret_cast<const int8_t*>(a.ptr);
+ const int8_t* bptr = reinterpret_cast<const int8_t*>(b.ptr);
+ return std::lexicographical_compare(aptr, aptr + a.len, bptr, bptr + b.len);
+ }
+};
template <>
-inline bool Compare<FLBA>::operator()(const FLBA& a, const FLBA& b) {
- auto aptr = reinterpret_cast<const int8_t*>(a.ptr);
- auto bptr = reinterpret_cast<const int8_t*>(b.ptr);
- return std::lexicographical_compare(aptr, aptr + type_length_, bptr,
- bptr + type_length_);
-}
+class PARQUET_EXPORT CompareDefault<FLBAType> : public Comparator {
+ public:
+ explicit CompareDefault(int length) : type_length_(length) {}
+ virtual ~CompareDefault() {}
+ virtual bool operator()(const FLBA& a, const FLBA& b) {
+ const int8_t* aptr = reinterpret_cast<const int8_t*>(a.ptr);
+ const int8_t* bptr = reinterpret_cast<const int8_t*>(b.ptr);
+ return std::lexicographical_compare(aptr, aptr + type_length_, bptr,
+ bptr + type_length_);
+ }
+ int32_t type_length_;
+};
+
+typedef CompareDefault<BooleanType> CompareDefaultBoolean;
+typedef CompareDefault<Int32Type> CompareDefaultInt32;
+typedef CompareDefault<Int64Type> CompareDefaultInt64;
+typedef CompareDefault<Int96Type> CompareDefaultInt96;
+typedef CompareDefault<FloatType> CompareDefaultFloat;
+typedef CompareDefault<DoubleType> CompareDefaultDouble;
+typedef CompareDefault<ByteArrayType> CompareDefaultByteArray;
+typedef CompareDefault<FLBAType> CompareDefaultFLBA;
+
+#if defined(__GNUC__) && !defined(__clang__)
+#pragma GCC diagnostic push
+#pragma GCC diagnostic ignored "-Wattributes"
+#endif
+
+PARQUET_EXTERN_TEMPLATE CompareDefault<BooleanType>;
+PARQUET_EXTERN_TEMPLATE CompareDefault<Int32Type>;
+PARQUET_EXTERN_TEMPLATE CompareDefault<Int64Type>;
+PARQUET_EXTERN_TEMPLATE CompareDefault<Int96Type>;
+PARQUET_EXTERN_TEMPLATE CompareDefault<FloatType>;
+PARQUET_EXTERN_TEMPLATE CompareDefault<DoubleType>;
+PARQUET_EXTERN_TEMPLATE CompareDefault<ByteArrayType>;
+PARQUET_EXTERN_TEMPLATE CompareDefault<FLBAType>;
+
+#if defined(__GNUC__) && !defined(__clang__)
+#pragma GCC diagnostic pop
+#endif
+
+// Define Unsigned Comparators
+class PARQUET_EXPORT CompareUnsignedInt32 : public CompareDefaultInt32 {
+ public:
+ virtual ~CompareUnsignedInt32() {}
+ bool operator()(const int32_t& a, const int32_t& b) override {
+ const uint32_t ua = a;
+ const uint32_t ub = b;
+ return (ua < ub);
+ }
+};
+
+class PARQUET_EXPORT CompareUnsignedInt64 : public CompareDefaultInt64 {
+ public:
+ virtual ~CompareUnsignedInt64() {}
+ bool operator()(const int64_t& a, const int64_t& b) override {
+ const uint64_t ua = a;
+ const uint64_t ub = b;
+ return (ua < ub);
+ }
+};
+
+class PARQUET_EXPORT CompareUnsignedInt96 : public CompareDefaultInt96 {
+ public:
+ virtual ~CompareUnsignedInt96() {}
+ bool operator()(const Int96& a, const Int96& b) override {
+ const uint32_t* aptr = reinterpret_cast<const uint32_t*>(&a.value[0]);
+ const uint32_t* bptr = reinterpret_cast<const uint32_t*>(&b.value[0]);
+ return std::lexicographical_compare(aptr, aptr + 3, bptr, bptr + 3);
+ }
+};
+
+class PARQUET_EXPORT CompareUnsignedByteArray : public CompareDefaultByteArray {
+ public:
+ virtual ~CompareUnsignedByteArray() {}
+ bool operator()(const ByteArray& a, const ByteArray& b) override {
+ const uint8_t* aptr = reinterpret_cast<const uint8_t*>(a.ptr);
+ const uint8_t* bptr = reinterpret_cast<const uint8_t*>(b.ptr);
+ return std::lexicographical_compare(aptr, aptr + a.len, bptr, bptr + b.len);
+ }
+};
+
+class PARQUET_EXPORT CompareUnsignedFLBA : public CompareDefaultFLBA {
+ public:
+ explicit CompareUnsignedFLBA(int length) : CompareDefaultFLBA(length) {}
+ virtual ~CompareUnsignedFLBA() {}
+ bool operator()(const FLBA& a, const FLBA& b) override {
+ const uint8_t* aptr = reinterpret_cast<const uint8_t*>(a.ptr);
+ const uint8_t* bptr = reinterpret_cast<const uint8_t*>(b.ptr);
+ return std::lexicographical_compare(aptr, aptr + type_length_, bptr,
+ bptr + type_length_);
+ }
+};
} // namespace parquet