// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <gtest/gtest.h>
#include <algorithm>
#include <array>
#include <cstdint>
#include <cstring>
#include <memory>
#include <vector>
#include "parquet/column_reader.h"
#include "parquet/column_writer.h"
#include "parquet/file/reader.h"
#include "parquet/file/writer.h"
#include "parquet/schema.h"
#include "parquet/statistics.h"
#include "parquet/test-specialization.h"
#include "parquet/test-util.h"
#include "parquet/thrift.h"
#include "parquet/types.h"
#include "parquet/util/memory.h"

using arrow::default_memory_pool;
using arrow::MemoryPool;

namespace parquet {

using schema::NodePtr;
using schema::PrimitiveNode;
using schema::GroupNode;

namespace test {
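
// Typed test fixture for TypedRowGroupStatistics. FLBA and ByteArray values
// only reference externally owned memory, so the deep-copy/free helpers below
// let the tests feed the writer buffers whose lifetime they fully control.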
template <typename TestType>
class TestRowGroupStatistics : public PrimitiveTypedTest<TestType> {
public:
using T = typename TestType::c_type;
using TypedStats = TypedRowGroupStatistics<TestType>;
// Value-ownership helpers: the generic versions are pass-throughs, while the
// FLBA/ByteArray specializations deep-copy the referenced bytes into newly
// allocated memory that DeepFree later releases.
std::vector<T> GetDeepCopy(const std::vector<T>&);
T* GetValuesPointer(std::vector<T>&);
void DeepFree(std::vector<T>&);
void TestMinMaxEncode() {
this->GenerateData(1000);
TypedStats statistics1(this->schema_.Column(0));
statistics1.Update(this->values_ptr_, this->values_.size(), 0);
std::string encoded_min = statistics1.EncodeMin();
std::string encoded_max = statistics1.EncodeMax();
TypedStats statistics2(this->schema_.Column(0), encoded_min, encoded_max,
this->values_.size(), 0, 0, true);
TypedStats statistics3(this->schema_.Column(0));
std::vector<uint8_t> valid_bits(
BitUtil::RoundUpNumBytes(static_cast<uint32_t>(this->values_.size())) + 1, 255);
statistics3.UpdateSpaced(
this->values_ptr_, valid_bits.data(), 0, this->values_.size(), 0);
std::string encoded_min_spaced = statistics3.EncodeMin();
std::string encoded_max_spaced = statistics3.EncodeMax();
ASSERT_EQ(encoded_min, statistics2.EncodeMin());
ASSERT_EQ(encoded_max, statistics2.EncodeMax());
ASSERT_EQ(statistics1.min(), statistics2.min());
ASSERT_EQ(statistics1.max(), statistics2.max());
ASSERT_EQ(encoded_min_spaced, statistics2.EncodeMin());
ASSERT_EQ(encoded_max_spaced, statistics2.EncodeMax());
ASSERT_EQ(statistics3.min(), statistics2.min());
ASSERT_EQ(statistics3.max(), statistics2.max());
}
void TestReset() {
this->GenerateData(1000);
TypedStats statistics(this->schema_.Column(0));
statistics.Update(this->values_ptr_, this->values_.size(), 0);
ASSERT_EQ(this->values_.size(), statistics.num_values());
statistics.Reset();
ASSERT_EQ(0, statistics.null_count());
ASSERT_EQ(0, statistics.num_values());
ASSERT_EQ("", statistics.EncodeMin());
ASSERT_EQ("", statistics.EncodeMax());
}
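
// Statistics merged from two independently updated instances must accumulate
// the null and value counts and combine the min/max values.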
void TestMerge() {
int num_null[2];
random_numbers(2, 42, 0, 100, num_null);
TypedStats statistics1(this->schema_.Column(0));
this->GenerateData(1000);
statistics1.Update(
this->values_ptr_, this->values_.size() - num_null[0], num_null[0]);
TypedStats statistics2(this->schema_.Column(0));
this->GenerateData(1000);
statistics2.Update(
this->values_ptr_, this->values_.size() - num_null[1], num_null[1]);
TypedStats total(this->schema_.Column(0));
total.Merge(statistics1);
total.Merge(statistics2);
ASSERT_EQ(num_null[0] + num_null[1], total.null_count());
ASSERT_EQ(this->values_.size() * 2 - num_null[0] - num_null[1], total.num_values());
ASSERT_EQ(total.min(), std::min(statistics1.min(), statistics2.min()));
ASSERT_EQ(total.max(), std::max(statistics1.max(), statistics2.max()));
}
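
// Write one optional column with statistics enabled, splitting the data over
// two WriteBatch calls, then read the file back and check that the
// deserialized column-chunk statistics match the ones computed up front.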
void TestFullRoundtrip(int64_t num_values, int64_t null_count) {
this->GenerateData(num_values);
// compute statistics for the whole batch
TypedStats expected_stats(this->schema_.Column(0));
expected_stats.Update(this->values_ptr_, num_values - null_count, null_count);
auto sink = std::make_shared<InMemoryOutputStream>();
auto gnode = std::static_pointer_cast<GroupNode>(this->node_);
std::shared_ptr<WriterProperties> writer_properties =
WriterProperties::Builder().enable_statistics("column")->build();
auto file_writer = ParquetFileWriter::Open(sink, gnode, writer_properties);
auto row_group_writer = file_writer->AppendRowGroup(num_values);
auto column_writer =
static_cast<TypedColumnWriter<TestType>*>(row_group_writer->NextColumn());
// simulate the case when data comes from multiple buffers,
// in which case special care is necessary for FLBA/ByteArray types
for (int i = 0; i < 2; i++) {
int64_t batch_num_values = i ? num_values - num_values / 2 : num_values / 2;
int64_t batch_null_count = i ? null_count : 0;
DCHECK(null_count <= num_values); // the null count can never exceed the value count
std::vector<int16_t> definition_levels(batch_null_count, 0);
definition_levels.insert(
definition_levels.end(), batch_num_values - batch_null_count, 1);
auto beg = this->values_.begin() + i * num_values / 2;
auto end = beg + batch_num_values;
std::vector<T> batch = GetDeepCopy(std::vector<T>(beg, end));
T* batch_values_ptr = GetValuesPointer(batch);
column_writer->WriteBatch(
batch_num_values, definition_levels.data(), nullptr, batch_values_ptr);
DeepFree(batch);
}
column_writer->Close();
row_group_writer->Close();
file_writer->Close();
auto buffer = sink->GetBuffer();
auto source = std::make_shared<::arrow::io::BufferReader>(buffer);
auto file_reader = ParquetFileReader::Open(source);
auto rg_reader = file_reader->RowGroup(0);
auto column_chunk = rg_reader->metadata()->ColumnChunk(0);
if (!column_chunk->is_stats_set()) return;
std::shared_ptr<RowGroupStatistics> stats = column_chunk->statistics();
// check values after serialization + deserialization
ASSERT_EQ(null_count, stats->null_count());
ASSERT_EQ(num_values - null_count, stats->num_values());
ASSERT_EQ(expected_stats.EncodeMin(), stats->EncodeMin());
ASSERT_EQ(expected_stats.EncodeMax(), stats->EncodeMax());
}
};
template <typename TestType>
typename TestType::c_type* TestRowGroupStatistics<TestType>::GetValuesPointer(
std::vector<typename TestType::c_type>& values) {
return values.data();
}
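
// std::vector<bool> packs its elements into bits and has no data() pointer,
// so the boolean specialization copies the values into a byte buffer that
// outlives the call and returns a pointer to that buffer instead.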
template <>
bool* TestRowGroupStatistics<BooleanType>::GetValuesPointer(std::vector<bool>& values) {
static std::vector<uint8_t> bool_buffer;
bool_buffer.clear();
bool_buffer.resize(values.size());
std::copy(values.begin(), values.end(), bool_buffer.begin());
return reinterpret_cast<bool*>(bool_buffer.data());
}
template <typename TestType>
typename std::vector<typename TestType::c_type>
TestRowGroupStatistics<TestType>::GetDeepCopy(
const std::vector<typename TestType::c_type>& values) {
return values;
}
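
// FLBA and ByteArray values only hold pointers into memory owned by the test
// fixture, so a deep copy duplicates the referenced bytes (FLBA_LENGTH for
// FLBA, ba.len for ByteArray) into freshly allocated memory.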
template <>
std::vector<FLBA> TestRowGroupStatistics<FLBAType>::GetDeepCopy(
const std::vector<FLBA>& values) {
std::vector<FLBA> copy;
MemoryPool* pool = ::arrow::default_memory_pool();
for (const FLBA& flba : values) {
uint8_t* ptr;
PARQUET_THROW_NOT_OK(pool->Allocate(FLBA_LENGTH, &ptr));
memcpy(ptr, flba.ptr, FLBA_LENGTH);
copy.emplace_back(ptr);
}
return copy;
}
template <>
std::vector<ByteArray> TestRowGroupStatistics<ByteArrayType>::GetDeepCopy(
const std::vector<ByteArray>& values) {
std::vector<ByteArray> copy;
MemoryPool* pool = default_memory_pool();
for (const ByteArray& ba : values) {
uint8_t* ptr;
PARQUET_THROW_NOT_OK(pool->Allocate(ba.len, &ptr));
memcpy(ptr, ba.ptr, ba.len);
copy.emplace_back(ba.len, ptr);
}
return copy;
}
template <typename TestType>
void TestRowGroupStatistics<TestType>::DeepFree(
std::vector<typename TestType::c_type>& values) {}
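
// Release the memory allocated by GetDeepCopy. The buffers are zeroed before
// being freed, presumably so that any dangling reference still held by the
// writer or its statistics would show up as a roundtrip failure.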
template <>
void TestRowGroupStatistics<FLBAType>::DeepFree(std::vector<FLBA>& values) {
MemoryPool* pool = default_memory_pool();
for (FLBA& flba : values) {
auto ptr = const_cast<uint8_t*>(flba.ptr);
memset(ptr, 0, FLBA_LENGTH);
pool->Free(ptr, FLBA_LENGTH);
}
}
template <>
void TestRowGroupStatistics<ByteArrayType>::DeepFree(std::vector<ByteArray>& values) {
MemoryPool* pool = default_memory_pool();
for (ByteArray& ba : values) {
auto ptr = const_cast<uint8_t*>(ba.ptr);
memset(ptr, 0, ba.len);
pool->Free(ptr, ba.len);
}
}
template <>
void TestRowGroupStatistics<ByteArrayType>::TestMinMaxEncode() {
this->GenerateData(1000);
// Test that we encode min max strings correctly
TypedRowGroupStatistics<ByteArrayType> statistics1(this->schema_.Column(0));
statistics1.Update(this->values_ptr_, this->values_.size(), 0);
std::string encoded_min = statistics1.EncodeMin();
std::string encoded_max = statistics1.EncodeMax();
// for ByteArray the encoded min/max are simply the raw value bytes
ASSERT_EQ(encoded_min,
std::string((const char*)statistics1.min().ptr, statistics1.min().len));
ASSERT_EQ(encoded_max,
std::string((const char*)statistics1.max().ptr, statistics1.max().len));
TypedRowGroupStatistics<ByteArrayType> statistics2(this->schema_.Column(0), encoded_min,
encoded_max, this->values_.size(), 0, 0, true);
ASSERT_EQ(encoded_min, statistics2.EncodeMin());
ASSERT_EQ(encoded_max, statistics2.EncodeMax());
ASSERT_EQ(statistics1.min(), statistics2.min());
ASSERT_EQ(statistics1.max(), statistics2.max());
}
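
// Run the typed statistics tests over every physical Parquet type.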
using TestTypes = ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType,
ByteArrayType, FLBAType, BooleanType>;
TYPED_TEST_CASE(TestRowGroupStatistics, TestTypes);
TYPED_TEST(TestRowGroupStatistics, MinMaxEncode) {
this->SetUpSchema(Repetition::REQUIRED);
this->TestMinMaxEncode();
}
TYPED_TEST(TestRowGroupStatistics, Reset) {
this->SetUpSchema(Repetition::OPTIONAL);
this->TestReset();
}
TYPED_TEST(TestRowGroupStatistics, FullRoundtrip) {
this->SetUpSchema(Repetition::OPTIONAL);
this->TestFullRoundtrip(100, 31);
this->TestFullRoundtrip(1000, 415);
this->TestFullRoundtrip(10000, 926);
}
template <typename TestType>
class TestNumericRowGroupStatistics : public TestRowGroupStatistics<TestType> {};
using NumericTypes = ::testing::Types<Int32Type, Int64Type, FloatType, DoubleType>;
TYPED_TEST_CASE(TestNumericRowGroupStatistics, NumericTypes);
TYPED_TEST(TestNumericRowGroupStatistics, Merge) {
this->SetUpSchema(Repetition::OPTIONAL);
this->TestMerge();
}
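
// parquet-mr 1.8.0 is one of the writer versions affected by the min/max
// sort-order bug (PARQUET-251), so statistics it wrote are only trustworthy
// for columns compared with a signed order. The metadata layer is therefore
// expected to keep statistics for the plain INT32 and DATE columns and to
// drop them for the BYTE_ARRAY, UINT_32, INTERVAL and UTF8 columns.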
TEST(CorruptStatistics, Basics) {
ApplicationVersion version("parquet-mr version 1.8.0");
SchemaDescriptor schema;
schema::NodePtr node;
std::vector<schema::NodePtr> fields;
// Test Physical Types
fields.push_back(schema::PrimitiveNode::Make(
"col1", Repetition::OPTIONAL, Type::INT32, LogicalType::NONE));
fields.push_back(schema::PrimitiveNode::Make(
"col2", Repetition::OPTIONAL, Type::BYTE_ARRAY, LogicalType::NONE));
// Test Logical Types
fields.push_back(schema::PrimitiveNode::Make(
"col3", Repetition::OPTIONAL, Type::INT32, LogicalType::DATE));
fields.push_back(schema::PrimitiveNode::Make(
"col4", Repetition::OPTIONAL, Type::INT32, LogicalType::UINT_32));
fields.push_back(schema::PrimitiveNode::Make("col5", Repetition::OPTIONAL,
Type::FIXED_LEN_BYTE_ARRAY, LogicalType::INTERVAL, 12));
fields.push_back(schema::PrimitiveNode::Make(
"col6", Repetition::OPTIONAL, Type::BYTE_ARRAY, LogicalType::UTF8));
node = schema::GroupNode::Make("schema", Repetition::REQUIRED, fields);
schema.Init(node);
format::ColumnChunk col_chunk;
col_chunk.meta_data.__isset.statistics = true;
auto column_chunk1 = ColumnChunkMetaData::Make(
reinterpret_cast<const uint8_t*>(&col_chunk), schema.Column(0), &version);
ASSERT_TRUE(column_chunk1->is_stats_set());
auto column_chunk2 = ColumnChunkMetaData::Make(
reinterpret_cast<const uint8_t*>(&col_chunk), schema.Column(1), &version);
ASSERT_FALSE(column_chunk2->is_stats_set());
auto column_chunk3 = ColumnChunkMetaData::Make(
reinterpret_cast<const uint8_t*>(&col_chunk), schema.Column(2), &version);
ASSERT_TRUE(column_chunk3->is_stats_set());
auto column_chunk4 = ColumnChunkMetaData::Make(
reinterpret_cast<const uint8_t*>(&col_chunk), schema.Column(3), &version);
ASSERT_FALSE(column_chunk4->is_stats_set());
auto column_chunk5 = ColumnChunkMetaData::Make(
reinterpret_cast<const uint8_t*>(&col_chunk), schema.Column(4), &version);
ASSERT_FALSE(column_chunk5->is_stats_set());
auto column_chunk6 = ColumnChunkMetaData::Make(
reinterpret_cast<const uint8_t*>(&col_chunk), schema.Column(5), &version);
ASSERT_FALSE(column_chunk6->is_stats_set());
}
} // namespace test
} // namespace parquet