blob: 9eb08ff103faf2940953393a0a517c628ffe2fa3 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef IMPALA_EXEC_PARQUET_COLUMN_STATS_H
#define IMPALA_EXEC_PARQUET_COLUMN_STATS_H
#include <algorithm>
#include <cmath>
#include <cstdint>
#include <string>
#include <type_traits>
#include "exec/parquet/parquet-common.h"
#include "runtime/date-value.h"
#include "runtime/decimal-value.h"
#include "runtime/string-buffer.h"
#include "runtime/timestamp-value.h"
#include "runtime/types.h"
#include "gen-cpp/parquet_types.h"
namespace impala {
/// This class, together with its derivatives, is used to update column statistics when
/// writing parquet files. It provides an interface to populate a parquet::Statistics
/// object and attach it to an object supplied by the caller. It can also be used to
/// decode parquet::Statistics into slots.
///
/// We currently support writing the 'min_value' and 'max_value' fields in
/// parquet::Statistics. Null values are counted via IncrementNullCount(); the
/// 'distinct_count' value is neither tracked nor populated. We do not populate the
/// deprecated 'min' and 'max' fields.
///
/// Regarding the ordering of values, we follow the parquet-format specification for
/// logical types (LogicalTypes.md in parquet-format):
///
/// - Numeric values (BOOLEAN, INT, FLOAT, DOUBLE, DECIMAL) are ordered by their numeric
/// value (as opposed to their binary representation).
///
/// - Strings are ordered using bytewise, unsigned comparison.
///
/// - Timestamps are compared by numerically comparing the points in time they represent.
///
/// - Dates are compared by numerically comparing the days since epoch values.
///
/// NULL values are not considered for min/max statistics, and if a column consists only
/// of NULL values, then no min/max statistics are written.
///
/// Updating the statistics is handled in derived classes to alleviate the need for
/// virtual function calls.
///
/// TODO: Populate null_count and distinct_count.
class ColumnStatsBase {
 public:
  /// min/max/compare helpers for types that are not floating point numbers.
  /// MinValue() and MaxValue() forward to std::min()/std::max(); because of
  /// decltype(auto) they return a const reference to one of their arguments, so the
  /// result must be copied before the arguments go out of scope.
  template <typename T, typename Enable = void>
  struct MinMaxTrait {
    static decltype(auto) MinValue(const T& a, const T& b) { return std::min(a, b); }
    static decltype(auto) MaxValue(const T& a, const T& b) { return std::max(a, b); }
    /// Three-way comparison built on operator< and operator>: returns -1 if a < b,
    /// 1 if a > b and 0 otherwise.
    static int Compare(const T& a, const T& b) {
      if (a < b) return -1;
      if (a > b) return 1;
      return 0;
    }
  };

  /// min/max/compare helpers for floating point types. std::fmin()/std::fmax() return
  /// the other operand when exactly one operand is NaN, so a NaN is only returned when
  /// both operands are NaN.
  template <typename T>
  struct MinMaxTrait<T, std::enable_if_t<std::is_floating_point<T>::value>> {
    static decltype(auto) MinValue(const T& a, const T& b) { return std::fmin(a, b); }
    static decltype(auto) MaxValue(const T& a, const T& b) { return std::fmax(a, b); }
    /// Three-way comparison that treats two NaNs as equal and orders a NaN below every
    /// non-NaN value.
    static int Compare(const T& a, const T& b) {
      // TODO: Should be aligned with PARQUET-1222, once resolved
      if (a == b) return 0;
      if (std::isnan(a) && std::isnan(b)) return 0;
      // fmax() ignores a single NaN operand, so a NaN 'a' never compares equal to the
      // maximum here and falls through to -1.
      if (MaxValue(a, b) == a) return 1;
      return -1;
    }
  };

  ColumnStatsBase() = default;
  virtual ~ColumnStatsBase() = default;

  /// Merges this statistics object with values from 'other'. If 'other' has not been
  /// initialized, then this object will not be changed. It maintains internal state that
  /// tracks whether the min/max values are ordered.
  virtual void Merge(const ColumnStatsBase& other) = 0;

  /// Copies the contents of this object's statistics values to internal buffers. Some
  /// data types (e.g. StringValue) need to be copied at the end of processing a row
  /// batch, since the batch memory will be released. Override this method in derived
  /// classes to provide the functionality; the default implementation does nothing.
  virtual Status MaterializeStringValuesToInternalBuffers() WARN_UNUSED_RESULT {
    return Status::OK();
  }

  /// Returns the number of bytes needed to encode the current statistics into a
  /// parquet::Statistics object.
  virtual int64_t BytesNeeded() const = 0;

  /// Encodes the current values into a Statistics thrift message.
  virtual void EncodeToThrift(parquet::Statistics* out) const = 0;

  /// Resets the state of this object.
  void Reset();

  /// Updates the statistics by incrementing the null count by 'count'. It is called each
  /// time a null value is appended to the column or the statistics are merged.
  void IncrementNullCount(int64_t count) { null_count_ += count; }

  /// Returns the boundary order of the pages, i.e. whether the lists of min/max elements
  /// inside the ColumnIndex are ordered and, if so, in which direction.
  /// If both 'ascending_boundary_order_' and 'descending_boundary_order_' are true, all
  /// elements are equal and ascending order is reported. If exactly one flag is true,
  /// that ordering is returned; if both are false, the pages are unordered.
  parquet::BoundaryOrder::type GetBoundaryOrder() const {
    if (ascending_boundary_order_) return parquet::BoundaryOrder::ASCENDING;
    if (descending_boundary_order_) return parquet::BoundaryOrder::DESCENDING;
    return parquet::BoundaryOrder::UNORDERED;
  }

 protected:
  /// Copies the memory of 'value' into 'buffer' and makes 'value' point to 'buffer'.
  /// 'buffer' is reset before making the copy.
  static Status CopyToBuffer(StringBuffer* buffer, StringValue* value) WARN_UNUSED_RESULT;

  /// Stores whether the min and max values of the current object have been initialized.
  bool has_min_max_values_ = false;

  /// Number of null values since the last call to Reset().
  int64_t null_count_ = 0;

  /// If true, min/max values are ascending.
  /// We assume the values are ascending, so start with true and only make it false when
  /// we find a descending value. If not all values are equal, then at least one of
  /// 'ascending_boundary_order_' and 'descending_boundary_order_' will be false.
  bool ascending_boundary_order_ = true;

  /// If true, min/max values are descending.
  /// See description of 'ascending_boundary_order_'.
  bool descending_boundary_order_ = true;
};
/// This class contains behavior specific to our in-memory formats for different types.
template <typename T>
class ColumnStats : public ColumnStatsBase {
  friend class ColumnStatsBase;

  // We explicitly require types to be listed here in order to support column statistics.
  // When adding a type here, users of this class need to ensure that the statistics
  // follow the ordering semantics of parquet's min/max statistics for the new type.
  // Details on how the values should be ordered can be found in the 'parquet-format'
  // project in 'parquet.thrift' and 'LogicalTypes.md'.
  // A static_assert (instead of an ill-formed enable_if alias) makes an unsupported 'T'
  // fail with a readable diagnostic.
  static_assert(std::is_arithmetic<T>::value
          || std::is_same<bool, T>::value
          || std::is_same<StringValue, T>::value
          || std::is_same<TimestampValue, T>::value
          || std::is_same<Decimal4Value, T>::value
          || std::is_same<Decimal8Value, T>::value
          || std::is_same<Decimal16Value, T>::value
          || std::is_same<DateValue, T>::value,
      "ColumnStats<T>: unsupported value type, see the list of supported types.");
  using value_type = T;

 public:
  /// 'mem_pool' is used to materialize string values so that the user of this class can
  /// free the memory of the original values.
  /// 'plain_encoded_value_size' specifies the size of each encoded value in plain
  /// encoding, -1 if the type is variable-length.
  ColumnStats(MemPool* mem_pool, int plain_encoded_value_size)
    : ColumnStatsBase(),
      plain_encoded_value_size_(plain_encoded_value_size),
      mem_pool_(mem_pool),
      min_buffer_(mem_pool),
      max_buffer_(mem_pool),
      prev_page_min_buffer_(mem_pool),
      prev_page_max_buffer_(mem_pool) {}

  /// Updates the statistics based on the values 'min_value' and 'max_value'. If
  /// necessary, initializes the statistics. It may keep a reference to either value
  /// until MaterializeStringValuesToInternalBuffers() gets called.
  void Update(const T& min_value, const T& max_value);

  /// Updates the statistics with the single value 'v', used as both minimum and maximum.
  void Update(const T& v) { Update(v, v); }

  /// NOTE(review): presumably expects 'other' to be a ColumnStats<T> of the same 'T' —
  /// confirm in the implementation.
  virtual void Merge(const ColumnStatsBase& other) override;

  /// The generic implementation has nothing to copy and returns OK. Types whose values
  /// may point into row batch memory (e.g. StringValue) need a specialization that
  /// copies them into the internal buffers.
  virtual Status MaterializeStringValuesToInternalBuffers() override {
    return Status::OK();
  }

  virtual int64_t BytesNeeded() const override;
  virtual void EncodeToThrift(parquet::Statistics* out) const override;

  /// Decodes the plain encoded stats value from 'buffer' and writes the result into the
  /// buffer pointed to by 'slot'. Returns true if decoding was successful, false
  /// otherwise. For timestamps and dates an additional validation will be performed.
  static bool DecodePlainValue(const std::string& buffer, void* slot,
      parquet::Type::type parquet_type);

 protected:
  /// Encodes a single value using parquet's plain encoding and stores it into the binary
  /// string 'out'. String values are stored without additional encoding. 'bytes_needed'
  /// must be positive.
  static void EncodePlainValue(const T& v, int64_t bytes_needed, std::string* out);

  /// Returns the number of bytes needed to encode value 'v'.
  int64_t BytesNeeded(const T& v) const;

  /// Size of each encoded value in plain encoding, -1 if the type is variable-length.
  int plain_encoded_value_size_;

  /// Minimum value since the last call to Reset(). Only meaningful while
  /// 'has_min_max_values_' is true; value-initialized so it is never indeterminate.
  T min_value_{};

  /// Maximum value since the last call to Reset(). Same validity rule as 'min_value_'.
  T max_value_{};

  /// Minimum value of the previous page. Needed to calculate the boundary order.
  T prev_page_min_value_{};

  /// Maximum value of the previous page. Needed to calculate the boundary order.
  T prev_page_max_value_{};

  /// Memory pool to allocate from when making copies of the statistics data.
  MemPool* mem_pool_;

  /// Local buffers to copy statistics data into.
  StringBuffer min_buffer_;
  StringBuffer max_buffer_;
  StringBuffer prev_page_min_buffer_;
  StringBuffer prev_page_max_buffer_;
};
/// Class that handles the decoding of Parquet stats (min/max/null_count) for a given
/// column chunk.
class ColumnStatsReader {
public:
/// Enum to select whether to read minimum or maximum statistics. Values do not
/// correspond to fields in parquet::Statistics, but instead select between retrieving
/// the minimum or maximum value.
enum class StatsField { MIN, MAX };
ColumnStatsReader(const parquet::ColumnChunk& col_chunk, const ColumnType& col_type,
const parquet::ColumnOrder* col_order, const parquet::SchemaElement& element)
: col_chunk_(col_chunk),
col_type_(col_type),
col_order_(col_order),
element_(element) {}
/// Sets extra information that is only needed for decoding TIMESTAMP stats.
void SetTimestampDecoder(ParquetTimestampDecoder timestamp_decoder) {
timestamp_decoder_ = timestamp_decoder;
}
/// Decodes the parquet::Statistics from 'col_chunk_' and writes the value selected by
/// 'stats_field' into the buffer pointed to by 'slot', based on 'col_type_'. Returns
/// true if reading statistics for columns of type 'col_type_' is supported and decoding
/// was successful, false otherwise.
bool ReadFromThrift(StatsField stats_field, void* slot) const;
/// Read plain encoded value from a string 'encoded_value' into 'slot'.
bool ReadFromString(StatsField stats_field, const std::string& encoded_value,
void* slot) const;
// Gets the null_count statistics from the column chunk's metadata and returns
// it via an output parameter.
// Returns true if the null_count stats were read successfully, false otherwise.
bool ReadNullCountStat(int64_t* null_count) const;
/// Returns the required stats field for the given function. 'fn_name' can be 'le',
/// 'lt', 'ge', and 'gt' (i.e. binary operators <=, <, >=, >). If we want to check that
/// whether a column contains a value less than a constant, we need the minimum value of
/// the column to answer that question. And, to answer the opposite question we need the
/// maximum value. The required stats field (min/max) will be stored in 'stats_field'.
/// The function returns true on success, false otherwise.
static bool GetRequiredStatsField(const std::string& fn_name, StatsField* stats_field);
private:
/// Returns true if we support reading statistics stored in the fields 'min_value' and
/// 'max_value' in parquet::Statistics for the type 'col_type_' and the column order
/// 'col_order_'. Otherwise, returns false. If 'col_order_' is nullptr, only primitive
/// numeric types are supported.
bool CanUseStats() const;
/// Returns true if we consider statistics stored in the deprecated fields 'min' and
/// 'max' in parquet::Statistics to be correct for the type 'col_type_' and the column
/// order 'col_order_'. Otherwise, returns false.
bool CanUseDeprecatedStats() const;
/// Decodes 'stat_value' and does INT64->TimestampValue and timezone conversions if
/// necessary. Returns true if the decoding and conversions were successful.
bool DecodeTimestamp(const std::string& stat_value,
ColumnStatsReader::StatsField stats_field,
TimestampValue* slot) const;
const parquet::ColumnChunk& col_chunk_;
const ColumnType& col_type_;
const parquet::ColumnOrder* col_order_;
const parquet::SchemaElement& element_;
ParquetTimestampDecoder timestamp_decoder_;
};
} // end ns impala
#endif