blob: 82368e98082408fd540d442566f404df68e8f887 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef IMPALA_EXEC_PARQUET_COLUMN_STATS_INLINE_H
#define IMPALA_EXEC_PARQUET_COLUMN_STATS_INLINE_H
#include "exec/parquet/parquet-common.h"
#include "gen-cpp/parquet_types.h"
#include "parquet-column-stats.h"
#include "runtime/string-value.inline.h"
namespace impala {
inline void ColumnStatsBase::Reset() {
has_min_max_values_ = false;
null_count_ = 0;
ascending_boundary_order_ = true;
descending_boundary_order_ = true;
}
template <typename T>
inline void ColumnStats<T>::Update(const T& min_value, const T& max_value) {
if (!has_min_max_values_) {
has_min_max_values_ = true;
min_value_ = min_value;
max_value_ = max_value;
} else {
min_value_ = MinMaxTrait<T>::MinValue(min_value_, min_value);
max_value_ = MinMaxTrait<T>::MaxValue(max_value_, max_value);
}
}
template <typename T>
inline void ColumnStats<T>::Merge(const ColumnStatsBase& other) {
DCHECK(dynamic_cast<const ColumnStats<T>*>(&other));
const ColumnStats<T>* cs = static_cast<const ColumnStats<T>*>(&other);
if (cs->has_min_max_values_) {
if (has_min_max_values_) {
if (ascending_boundary_order_) {
if (MinMaxTrait<T>::Compare(prev_page_max_value_, cs->max_value_) > 0 ||
MinMaxTrait<T>::Compare(prev_page_min_value_, cs->min_value_) > 0) {
ascending_boundary_order_ = false;
}
}
if (descending_boundary_order_) {
if (MinMaxTrait<T>::Compare(prev_page_max_value_, cs->max_value_) < 0 ||
MinMaxTrait<T>::Compare(prev_page_min_value_, cs->min_value_) < 0) {
descending_boundary_order_ = false;
}
}
}
Update(cs->min_value_, cs->max_value_);
prev_page_min_value_ = cs->min_value_;
prev_page_max_value_ = cs->max_value_;
}
IncrementNullCount(cs->null_count_);
}
template <typename T>
inline int64_t ColumnStats<T>::BytesNeeded() const {
return BytesNeeded(min_value_) + BytesNeeded(max_value_)
+ ParquetPlainEncoder::ByteSize(null_count_);
}
template <typename T>
inline void ColumnStats<T>::EncodeToThrift(parquet::Statistics* out) const {
if (has_min_max_values_) {
std::string min_str;
EncodePlainValue(min_value_, BytesNeeded(min_value_), &min_str);
out->__set_min_value(move(min_str));
std::string max_str;
EncodePlainValue(max_value_, BytesNeeded(max_value_), &max_str);
out->__set_max_value(move(max_str));
}
out->__set_null_count(null_count_);
}
template <typename T>
inline void ColumnStats<T>::EncodePlainValue(
const T& v, int64_t bytes_needed, std::string* out) {
DCHECK_GT(bytes_needed, 0);
out->resize(bytes_needed);
const int64_t bytes_written = ParquetPlainEncoder::Encode(
v, bytes_needed, reinterpret_cast<uint8_t*>(&(*out)[0]));
DCHECK_EQ(bytes_needed, bytes_written);
}
template <typename T>
inline bool ColumnStats<T>::DecodePlainValue(const std::string& buffer, void* slot,
parquet::Type::type parquet_type) {
T* result = reinterpret_cast<T*>(slot);
int size = buffer.size();
const uint8_t* data = reinterpret_cast<const uint8_t*>(buffer.data());
if (ParquetPlainEncoder::DecodeByParquetType<T>(data, data + size, size, result,
parquet_type) == -1) {
return false;
}
return true;
}
template <typename T>
inline int64_t ColumnStats<T>::BytesNeeded(const T& v) const {
return plain_encoded_value_size_ < 0 ? ParquetPlainEncoder::ByteSize<T>(v) :
plain_encoded_value_size_;
}
/// Plain encoding for Boolean values is not handled by the ParquetPlainEncoder and thus
/// needs special handling here.
template <>
inline void ColumnStats<bool>::EncodePlainValue(
const bool& v, int64_t bytes_needed, std::string* out) {
char c = v;
out->assign(1, c);
}
template <>
inline bool ColumnStats<bool>::DecodePlainValue(const std::string& buffer, void* slot,
parquet::Type::type parquet_type) {
bool* result = reinterpret_cast<bool*>(slot);
DCHECK(buffer.size() == 1);
*result = (buffer[0] != 0);
return true;
}
template <>
inline int64_t ColumnStats<bool>::BytesNeeded(const bool& v) const {
return 1;
}
namespace {
/// Decodes the plain encoded stats value from 'buffer' and writes the result into
/// 'result'. Returns true if decoding was successful, false otherwise. Doesn't perform
/// any validation.
/// Used as a helper function for decoding values that need additional custom validation.
template <typename T, parquet::Type::type ParquetType>
inline bool DecodePlainValueNoValidation(const string& buffer, T* result) {
int size = buffer.size();
const uint8_t* data = reinterpret_cast<const uint8_t*>(buffer.data());
return ParquetPlainEncoder::Decode<T, ParquetType>(
data, data + size, size, result) != -1;
}
}
/// Timestamp values need validation.
template <>
inline bool ColumnStats<TimestampValue>::DecodePlainValue(
const std::string& buffer, void* slot, parquet::Type::type parquet_type) {
if (UNLIKELY(parquet_type != parquet::Type::INT96)) {
DCHECK(false);
return false;
}
TimestampValue* result = reinterpret_cast<TimestampValue*>(slot);
if (!DecodePlainValueNoValidation<TimestampValue, parquet::Type::INT96>(buffer,
result)) {
return false;
}
// We don't need to convert the value here, because it is done by the caller.
// If this function were not static, then it would be possible to store the information
// needed for timezone conversion in the object and do the conversion here.
return TimestampValue::IsValidDate(result->date());
}
/// Date values need validation.
template <>
inline bool ColumnStats<DateValue>::DecodePlainValue(
const std::string& buffer, void* slot, parquet::Type::type parquet_type) {
DateValue* result = reinterpret_cast<DateValue*>(slot);
if (!DecodePlainValueNoValidation<DateValue, parquet::Type::INT32>(buffer, result)) {
return false;
}
return result->IsValid();
}
/// parquet::Statistics stores string values directly and does not use plain encoding.
template <>
inline void ColumnStats<StringValue>::EncodePlainValue(
const StringValue& v, int64_t bytes_needed, string* out) {
out->assign(v.ptr, v.len);
}
template <>
inline bool ColumnStats<StringValue>::DecodePlainValue(
const std::string& buffer, void* slot, parquet::Type::type parquet_type) {
StringValue* result = reinterpret_cast<StringValue*>(slot);
result->ptr = const_cast<char*>(buffer.data());
result->len = buffer.size();
return true;
}
template <>
inline void ColumnStats<StringValue>::Update(
const StringValue& min_value, const StringValue& max_value) {
if (!has_min_max_values_) {
has_min_max_values_ = true;
min_value_ = min_value;
min_buffer_.Clear();
max_value_ = max_value;
max_buffer_.Clear();
} else {
if (min_value < min_value_) {
min_value_ = min_value;
min_buffer_.Clear();
}
if (max_value > max_value_) {
max_value_ = max_value;
max_buffer_.Clear();
}
}
}
template <>
inline void ColumnStats<StringValue>::Merge(const ColumnStatsBase& other) {
DCHECK(dynamic_cast<const ColumnStats<StringValue>*>(&other));
const ColumnStats<StringValue>* cs = static_cast<
const ColumnStats<StringValue>*>(&other);
if (cs->has_min_max_values_) {
if (has_min_max_values_) {
// Make sure that we copied the previous page's min/max values
// to their own buffer.
if (prev_page_min_value_.ptr != nullptr) {
DCHECK_NE(static_cast<void*>(prev_page_min_value_.ptr),
static_cast<void*>(cs->min_value_.ptr));
}
if (prev_page_max_value_.ptr != nullptr) {
DCHECK_NE(static_cast<void*>(prev_page_max_value_.ptr),
static_cast<void*>(cs->max_value_.ptr));
}
if (ascending_boundary_order_) {
if (prev_page_max_value_ > cs->max_value_ ||
prev_page_min_value_ > cs->min_value_) {
ascending_boundary_order_ = false;
}
}
if (descending_boundary_order_) {
if (prev_page_max_value_ < cs->max_value_ ||
prev_page_min_value_ < cs->min_value_) {
descending_boundary_order_ = false;
}
}
}
Update(cs->min_value_, cs->max_value_);
prev_page_min_value_ = cs->min_value_;
prev_page_max_value_ = cs->max_value_;
prev_page_min_buffer_.Clear();
prev_page_max_buffer_.Clear();
}
IncrementNullCount(cs->null_count_);
}
// StringValues need to be copied at the end of processing a row batch, since the batch
// memory will be released.
template <>
inline Status ColumnStats<StringValue>::MaterializeStringValuesToInternalBuffers() {
if (min_buffer_.IsEmpty()) RETURN_IF_ERROR(CopyToBuffer(&min_buffer_, &min_value_));
if (max_buffer_.IsEmpty()) RETURN_IF_ERROR(CopyToBuffer(&max_buffer_, &max_value_));
if (prev_page_min_buffer_.IsEmpty()) {
RETURN_IF_ERROR(CopyToBuffer(&prev_page_min_buffer_, &prev_page_min_value_));
}
if (prev_page_max_buffer_.IsEmpty()) {
RETURN_IF_ERROR(CopyToBuffer(&prev_page_max_buffer_, &prev_page_max_value_));
}
return Status::OK();
}
} // end ns impala
#endif