blob: 47b67cdb93421b51aaf1845f146b66b5a7327d5b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef STORAGE_SRC_STORAGE_FORMAT_ORC_ORC_PROTO_DEFINITION_H_
#define STORAGE_SRC_STORAGE_FORMAT_ORC_ORC_PROTO_DEFINITION_H_
#include <cassert>
#include <cstdint>
#include <iterator>
#include <list>
#include <memory>
#include <sstream>
#include <string>

#include "univplan/common/statistics.h"

#include "storage/format/orc/input-stream.h"
#include "storage/format/orc/type-impl.h"
#include "storage/format/orc/vector.h"
namespace orc {
// Compression block size constant: 256 * 1024 = 262144.
// NOTE(review): presumably expressed in bytes and used as the default block
// size for compressed streams - confirm against the users of this constant.
static const uint64_t ORC_COMPRESSION_BLOCK_SIZE = 256 * 1024; // 256K
// Numeric identifiers for ORC writer implementations.
// UNKNOWN_WRITER is INT32_MAX; presumably it is the catch-all for ids that
// are not listed here - confirm against the code that maps ids to this enum.
enum WriterId {
ORC_JAVA_WRITER = 0,
ORC_CPP_WRITER = 1,
PRESTO_WRITER = 2,
UNKNOWN_WRITER = INT32_MAX
};
// Compression codecs, identified by explicit numeric values.
// CompressionKind_MAX is a sentinel equal to INT64_MAX, not a codec; note that
// it requires the enum's underlying type to be able to hold a 64-bit value.
enum CompressionKind {
CompressionKind_NONE = 0,
CompressionKind_ZLIB = 1,
CompressionKind_SNAPPY = 2,
CompressionKind_LZO = 3,
CompressionKind_LZ4 = 4,
CompressionKind_ZSTD = 5,
CompressionKind_MAX = INT64_MAX
};
// Writer versions, identified by explicit numeric values. Apart from ORIGINAL
// the enumerators are named after issue ids (HIVE-xxxx / ORC-xxx).
// NOTE(review): presumably each value marks the writer fix level that
// introduced the named change - confirm against the ORC specification.
// WriterVersion_MAX is a sentinel equal to INT64_MAX, not a real version.
enum WriterVersion {
WriterVersion_ORIGINAL = 0,
WriterVersion_HIVE_8732 = 1,
WriterVersion_HIVE_4243 = 2,
WriterVersion_HIVE_12055 = 3,
WriterVersion_HIVE_13083 = 4,
WriterVersion_ORC_101 = 5,
WriterVersion_ORC_135 = 6,
WriterVersion_MAX = INT64_MAX
};
// Kinds of streams. StreamInformationImpl obtains its StreamKind by casting
// the numeric kind stored in a proto::Stream, so these values have to stay in
// step with the protobuf definition.
enum StreamKind {
StreamKind_PRESENT = 0,
StreamKind_DATA = 1,
StreamKind_LENGTH = 2,
StreamKind_DICTIONARY_DATA = 3,
StreamKind_DICTIONARY_COUNT = 4,
StreamKind_SECONDARY = 5,
StreamKind_ROW_INDEX = 6,
StreamKind_BLOOM_FILTER = 7
};
// Base class of the ORC column statistics. It carries the generic counters
// inherited from univplan::ColumnStatistics (valueCount, hasNullValue) and
// knows how to read them from / write them to a proto::ColumnStatistics.
class ColumnStatisticsImpl : public univplan::ColumnStatistics {
 public:
  // Creates statistics in the state produced by reset().
  ColumnStatisticsImpl() { reset(); }

  // Restores the generic counters from their protobuf form. A missing value
  // count is read as 0; a missing null flag is read as true.
  explicit ColumnStatisticsImpl(const proto::ColumnStatistics& stats) {
    valueCount = stats.has_numberofvalues() ? stats.numberofvalues() : 0;
    hasNullValue = stats.has_hasnull() ? stats.hasnull() : true;
  }

  virtual ~ColumnStatisticsImpl() {}

  // Writes the generic counters into pb, which must not be null.
  virtual void serialize(proto::ColumnStatistics* pb) {
    assert(pb != nullptr);
    pb->set_hasnull(hasNullValue);
    pb->set_numberofvalues(valueCount);
  }
};
// Statistics for integer columns. On top of the generic counters kept by
// ColumnStatisticsImpl it tracks the minimum, maximum and sum of the values.
// minimum / maximum / sum are only meaningful while _hasStats is true.
class IntegerColumnStatisticsImpl : public ColumnStatisticsImpl {
 private:
  bool _hasStats;
  int64_t minimum;
  int64_t maximum;
  int64_t sum;

 public:
  // Creates empty statistics.
  IntegerColumnStatisticsImpl() : ColumnStatisticsImpl() { resetInternal(); }

  // Restores statistics from their protobuf form.
  explicit IntegerColumnStatisticsImpl(const proto::ColumnStatistics& stats)
      : ColumnStatisticsImpl(stats) {
    // Put every field into a defined state first. serialize() always creates
    // the intstatistics sub-message, even when there is nothing to store in
    // it, so "intstatistics present but no minimum" is a normal input. Without
    // this reset, sum would stay uninitialized in that case and a later
    // updateInteger() would accumulate into an indeterminate value.
    resetInternal();
    if (stats.has_intstatistics()) {
      const proto::IntegerStatistics& s = stats.intstatistics();
      _hasStats = s.has_minimum();
      if (_hasStats) {
        minimum = s.minimum();
        maximum = s.maximum();
        sum = s.sum();
      }
    }
  }

  virtual ~IntegerColumnStatisticsImpl() {}

  // Minimum, maximum and sum are always available together.
  bool hasMinimum() const { return _hasStats; }
  bool hasMaximum() const { return _hasStats; }
  bool hasSum() const { return _hasStats; }

  // Returns the smallest value seen. Reports an internal error through
  // LOG_ERROR when no value has been recorded.
  int64_t getMinimum() const {
    if (!_hasStats) {
      LOG_ERROR(ERRCODE_INTERNAL_ERROR, "Minimum is not defined.");
    }
    // Every path returns a value, so the function is well defined even if
    // LOG_ERROR hands control back to the caller.
    return minimum;
  }

  // Returns the largest value seen. Reports an internal error through
  // LOG_ERROR when no value has been recorded.
  int64_t getMaximum() const {
    if (!_hasStats) {
      LOG_ERROR(ERRCODE_INTERNAL_ERROR, "Maximum is not defined.");
    }
    return maximum;
  }

  // Returns the sum of the values seen. Reports an internal error through
  // LOG_ERROR when no value has been recorded.
  int64_t getSum() const {
    if (!_hasStats) {
      LOG_ERROR(ERRCODE_INTERNAL_ERROR, "Sum is not defined.");
    }
    return sum;
  }

  // Folds one value into minimum, maximum and sum.
  void updateInteger(int64_t value) {
    if (!_hasStats) {
      _hasStats = true;
      minimum = value;
      maximum = value;
    } else if (value < minimum) {
      minimum = value;
    } else if (value > maximum) {
      maximum = value;
    }
    sum += value;
  }

  // Merges another IntegerColumnStatisticsImpl into this one. stats must be
  // of this dynamic type (checked with assert).
  void merge(const ColumnStatistics& stats) override {
    ColumnStatisticsImpl::merge(stats);
    const IntegerColumnStatisticsImpl* other =
        dynamic_cast<const IntegerColumnStatisticsImpl*>(&stats);
    assert(other != nullptr);
    if (!other->_hasStats) {
      return;  // nothing to take over
    }
    if (!_hasStats) {
      _hasStats = true;
      minimum = other->minimum;
      maximum = other->maximum;
      sum = other->sum;
      return;
    }
    if (other->minimum < minimum) {
      minimum = other->minimum;
    }
    if (other->maximum > maximum) {
      maximum = other->maximum;
    }
    sum += other->sum;
  }

  // Writes the generic counters and the integer statistics into pb. The
  // intstatistics sub-message is always created; its fields are only set when
  // values have been recorded.
  void serialize(proto::ColumnStatistics* pb) override {
    assert(pb != nullptr);
    ColumnStatisticsImpl::serialize(pb);
    proto::IntegerStatistics* stats = pb->mutable_intstatistics();
    if (_hasStats) {
      stats->set_minimum(minimum);
      stats->set_maximum(maximum);
      stats->set_sum(sum);
    }
  }

  // Returns the object to the empty state.
  void reset() override {
    ColumnStatisticsImpl::reset();
    resetInternal();
  }

 private:
  // Clears only the fields owned by this class.
  void resetInternal() {
    _hasStats = false;
    minimum = 0;
    maximum = 0;
    sum = 0;
  }
};
// Statistics for floating point columns. On top of the generic counters kept
// by ColumnStatisticsImpl it tracks the minimum, maximum and sum of the
// values. minimum / maximum / sum are only meaningful while _hasStats is true.
class DoubleColumnStatisticsImpl : public ColumnStatisticsImpl {
 private:
  bool _hasStats;
  double minimum;
  double maximum;
  double sum;

 public:
  // Creates empty statistics.
  DoubleColumnStatisticsImpl() : ColumnStatisticsImpl() { resetInternal(); }

  // Restores statistics from their protobuf form.
  explicit DoubleColumnStatisticsImpl(const proto::ColumnStatistics& stats)
      : ColumnStatisticsImpl(stats) {
    // Put every field into a defined state first. serialize() always creates
    // the doublestatistics sub-message, even when there is nothing to store
    // in it, so "doublestatistics present but no minimum" is a normal input.
    // Without this reset, sum would stay uninitialized in that case and a
    // later updateDouble() would accumulate into an indeterminate value.
    resetInternal();
    if (stats.has_doublestatistics()) {
      const proto::DoubleStatistics& s = stats.doublestatistics();
      _hasStats = s.has_minimum();
      if (_hasStats) {
        minimum = s.minimum();
        maximum = s.maximum();
        sum = s.sum();
      }
    }
  }

  virtual ~DoubleColumnStatisticsImpl() {}

  // Minimum, maximum and sum are always available together.
  bool hasMinimum() const { return _hasStats; }
  bool hasMaximum() const { return _hasStats; }
  bool hasSum() const { return _hasStats; }

  // Returns the smallest value seen. Reports an internal error through
  // LOG_ERROR when no value has been recorded.
  double getMinimum() const {
    if (!_hasStats) {
      LOG_ERROR(ERRCODE_INTERNAL_ERROR, "Minimum is not defined.");
    }
    // Every path returns a value, so the function is well defined even if
    // LOG_ERROR hands control back to the caller.
    return minimum;
  }

  // Returns the largest value seen. Reports an internal error through
  // LOG_ERROR when no value has been recorded.
  double getMaximum() const {
    if (!_hasStats) {
      LOG_ERROR(ERRCODE_INTERNAL_ERROR, "Maximum is not defined.");
    }
    return maximum;
  }

  // Returns the sum of the values seen. Reports an internal error through
  // LOG_ERROR when no value has been recorded.
  double getSum() const {
    if (!_hasStats) {
      LOG_ERROR(ERRCODE_INTERNAL_ERROR, "Sum is not defined.");
    }
    return sum;
  }

  // Folds one value into minimum, maximum and sum.
  void updateDouble(double value) {
    if (!_hasStats) {
      _hasStats = true;
      minimum = value;
      maximum = value;
    } else if (value < minimum) {
      minimum = value;
    } else if (value > maximum) {
      maximum = value;
    }
    sum += value;
  }

  // Merges another DoubleColumnStatisticsImpl into this one. stats must be of
  // this dynamic type (checked with assert).
  void merge(const ColumnStatistics& stats) override {
    ColumnStatisticsImpl::merge(stats);
    const DoubleColumnStatisticsImpl* other =
        dynamic_cast<const DoubleColumnStatisticsImpl*>(&stats);
    assert(other != nullptr);
    if (!other->_hasStats) {
      return;  // nothing to take over
    }
    if (!_hasStats) {
      _hasStats = true;
      minimum = other->minimum;
      maximum = other->maximum;
      sum = other->sum;
      return;
    }
    if (other->minimum < minimum) {
      minimum = other->minimum;
    }
    if (other->maximum > maximum) {
      maximum = other->maximum;
    }
    sum += other->sum;
  }

  // Writes the generic counters and the double statistics into pb. The
  // doublestatistics sub-message is always created; its fields are only set
  // when values have been recorded.
  void serialize(proto::ColumnStatistics* pb) override {
    assert(pb != nullptr);
    ColumnStatisticsImpl::serialize(pb);
    proto::DoubleStatistics* stats = pb->mutable_doublestatistics();
    if (_hasStats) {
      stats->set_minimum(minimum);
      stats->set_maximum(maximum);
      stats->set_sum(sum);
    }
  }

  // Returns the object to the empty state.
  void reset() override {
    ColumnStatisticsImpl::reset();
    resetInternal();
  }

 private:
  // Clears only the fields owned by this class.
  void resetInternal() {
    _hasStats = false;
    minimum = 0;
    maximum = 0;
    sum = 0;
  }
};
// Statistics for string columns. On top of the generic counters kept by
// ColumnStatisticsImpl it tracks the smallest and largest value (compared
// with std::string ordering) and the accumulated length of all values.
// minimum / maximum / totalLength are only meaningful while _hasStats is true.
class StringColumnStatisticsImpl : public ColumnStatisticsImpl {
 private:
  bool _hasStats;
  std::string minimum;
  std::string maximum;
  int64_t totalLength;

 public:
  // Creates empty statistics.
  StringColumnStatisticsImpl() : ColumnStatisticsImpl() { resetInternal(); }

  // Restores statistics from their protobuf form. When correctStats is false
  // the string statistics stored in the message are ignored.
  StringColumnStatisticsImpl(const proto::ColumnStatistics& stats,
                             bool correctStats)
      : ColumnStatisticsImpl(stats) {
    // Put every field into a defined state first. serialize() always creates
    // the stringstatistics sub-message, even when there is nothing to store
    // in it, so "stringstatistics present but no minimum" is a normal input.
    // Without this reset, totalLength would stay uninitialized in that case
    // and a later updateString() would accumulate into an indeterminate value.
    resetInternal();
    if (stats.has_stringstatistics() && correctStats) {
      const proto::StringStatistics& s = stats.stringstatistics();
      _hasStats = s.has_minimum();
      if (_hasStats) {
        minimum = s.minimum();
        maximum = s.maximum();
        totalLength = s.sum();
      }
    }
  }

  virtual ~StringColumnStatisticsImpl() {}

  // Minimum, maximum and total length are always available together.
  bool hasMinimum() const { return _hasStats; }
  bool hasMaximum() const { return _hasStats; }
  bool hasTotalLength() const { return _hasStats; }

  // Returns the smallest value as a C string owned by this object. Reports an
  // internal error through LOG_ERROR when no value has been recorded.
  const char* getMinimum() const {
    if (!_hasStats) {
      LOG_ERROR(ERRCODE_INTERNAL_ERROR, "Minimum is not defined.");
    }
    // Every path returns a value, so the function is well defined even if
    // LOG_ERROR hands control back to the caller.
    return minimum.c_str();
  }

  // Returns the largest value as a C string owned by this object. Reports an
  // internal error through LOG_ERROR when no value has been recorded.
  const char* getMaximum() const {
    if (!_hasStats) {
      LOG_ERROR(ERRCODE_INTERNAL_ERROR, "Maximum is not defined.");
    }
    return maximum.c_str();
  }

  // Returns the accumulated length of all values. Reports an internal error
  // through LOG_ERROR when no value has been recorded.
  int64_t getTotalLength() const {
    if (!_hasStats) {
      LOG_ERROR(ERRCODE_INTERNAL_ERROR, "Total length is not defined.");
    }
    return totalLength;
  }

  // Folds one value, given as len bytes starting at buffer, into the
  // statistics.
  void updateString(const char* buffer, uint64_t len) {
    std::string text(buffer, len);
    if (!_hasStats) {
      _hasStats = true;
      maximum = minimum = text;
    } else if (minimum > text) {
      minimum = text;
    } else if (maximum < text) {
      maximum = text;
    }
    totalLength += len;
  }

  // Merges another StringColumnStatisticsImpl into this one. stats must be of
  // this dynamic type (checked with assert).
  void merge(const ColumnStatistics& stats) override {
    ColumnStatisticsImpl::merge(stats);
    const StringColumnStatisticsImpl* other =
        dynamic_cast<const StringColumnStatisticsImpl*>(&stats);
    assert(other != nullptr);
    if (!other->_hasStats) {
      return;  // nothing to take over
    }
    // Work on the std::string members directly. Going through the
    // const char* accessors would cut values off at the first embedded NUL
    // byte, both when comparing and when copying, although updateString()
    // accepts such values.
    if (!_hasStats) {
      _hasStats = true;
      minimum = other->minimum;
      maximum = other->maximum;
      totalLength = other->totalLength;
      return;
    }
    if (other->minimum < minimum) {
      minimum = other->minimum;
    }
    if (other->maximum > maximum) {
      maximum = other->maximum;
    }
    totalLength += other->totalLength;
  }

  // Writes the generic counters and the string statistics into pb. The
  // stringstatistics sub-message is always created; its fields are only set
  // when values have been recorded.
  void serialize(proto::ColumnStatistics* pb) override {
    assert(pb != nullptr);
    ColumnStatisticsImpl::serialize(pb);
    proto::StringStatistics* stats = pb->mutable_stringstatistics();
    if (_hasStats) {
      stats->set_minimum(minimum);
      stats->set_maximum(maximum);
      stats->set_sum(totalLength);
    }
  }

  // Returns the object to the empty state.
  void reset() override {
    ColumnStatisticsImpl::reset();
    resetInternal();
  }

 private:
  // Clears only the fields owned by this class.
  void resetInternal() {
    _hasStats = false;
    minimum.clear();
    maximum.clear();
    totalLength = 0;
  }
};
// Read-only description of a single stream: its kind, the id of the column it
// belongs to, its offset and its length.
// NOTE(review): offset and length are presumably byte counts - confirm with
// the implementation and its callers.
class StreamInformation {
public:
// Declared here, defined out of line.
virtual ~StreamInformation();
// Get the kind of the stream.
virtual StreamKind getKind() const = 0;
// Get the id of the column the stream belongs to.
virtual uint64_t getColumnId() const = 0;
// Get the offset of the stream.
virtual uint64_t getOffset() const = 0;
// Get the length of the stream.
virtual uint64_t getLength() const = 0;
};
// Read-only description of one stripe: where it starts, how large its
// sections are, how many rows and streams it holds, and access to per-stream
// and per-column details.
class StripeInformation {
public:
// Declared here, defined out of line.
virtual ~StripeInformation();
// Get the byte offset of the start of the stripe.
// @return the bytes from the start of the file
virtual uint64_t getOffset() const = 0;
// Get the total length of the stripe in bytes.
// @return the number of bytes in the stripe
virtual uint64_t getLength() const = 0;
// Get the length of the stripe's indexes.
// @return the number of bytes in the index
virtual uint64_t getIndexLength() const = 0;
// Get the length of the stripe's data.
// @return the number of bytes in the data section
virtual uint64_t getDataLength() const = 0;
// Get the length of the stripe's footer (tail) section.
// @return the number of bytes in the tail
virtual uint64_t getFooterLength() const = 0;
// Get the number of rows in the stripe.
// @return a count of the number of rows
virtual uint64_t getNumberOfRows() const = 0;
// Get the number of streams in the stripe.
// @return a count of the streams
virtual uint64_t getNumberOfStreams() const = 0;
// Get the StreamInformation for the given stream.
// @param streamId the id of the stream
// @return a newly created description owned by the caller
virtual std::unique_ptr<StreamInformation> getStreamInformation(
uint64_t streamId) const = 0;
// Get the dictionary size.
// @param colId the columnId
// @return the size of the dictionary or 0 if there isn't one
virtual uint64_t getDictionarySize(uint64_t colId) const = 0;
// Get the writer timezone.
// @return a reference to the timezone name held by the implementation
virtual const std::string& getWriterTimezone() const = 0;
};
class BinaryColumnStatisticsImpl : public ColumnStatisticsImpl {
public:
BinaryColumnStatisticsImpl() { resetInternal(); }
BinaryColumnStatisticsImpl(const proto::ColumnStatistics& stats,
bool correctStats)
: ColumnStatisticsImpl(stats) {
if (!stats.has_binarystatistics() || !correctStats) {
resetInternal();
} else {
const proto::BinaryStatistics& s = stats.binarystatistics();
_hasStats = s.has_sum();
if (_hasStats) {
totalLength = s.sum();
}
}
}
virtual ~BinaryColumnStatisticsImpl() {}
bool hasTotalLength() const { return _hasStats; }
uint64_t getTotalLength() const {
if (_hasStats) {
return totalLength;
} else {
LOG_ERROR(ERRCODE_INTERNAL_ERROR, "Total length is not defined.");
}
}
void update(size_t length) {
_hasStats = true;
totalLength += length;
}
void merge(const ColumnStatistics& stats) override {
ColumnStatisticsImpl::merge(stats);
const BinaryColumnStatisticsImpl* other =
dynamic_cast<const BinaryColumnStatisticsImpl*>(&stats);
assert(other != nullptr);
if (other->hasTotalLength()) {
if (!_hasStats) {
_hasStats = other->hasTotalLength();
totalLength = other->getTotalLength();
} else {
totalLength += other->getTotalLength();
}
}
}
void serialize(proto::ColumnStatistics* pb) override {
assert(pb != nullptr);
ColumnStatisticsImpl::serialize(pb);
proto::BinaryStatistics* binStats = pb->mutable_binarystatistics();
if (_hasStats) {
binStats->set_sum(totalLength);
}
}
private:
void resetInternal() {
ColumnStatisticsImpl::reset();
_hasStats = false;
totalLength = 0;
}
bool _hasStats;
int64_t totalLength;
};
class BooleanColumnStatisticsImpl : public ColumnStatisticsImpl {
private:
bool _hasCount;
uint64_t trueCount;
public:
BooleanColumnStatisticsImpl() : ColumnStatisticsImpl() { resetInternal(); }
BooleanColumnStatisticsImpl(const proto::ColumnStatistics& stats,
bool correctStats)
: ColumnStatisticsImpl(stats) {
if (!stats.has_bucketstatistics() || !correctStats ||
stats.bucketstatistics().count_size() == 0) {
resetInternal();
} else {
_hasCount = true;
trueCount = stats.bucketstatistics().count(0);
}
}
virtual ~BooleanColumnStatisticsImpl() {}
bool hasCount() const { return _hasCount; }
uint64_t getFalseCount() const {
if (_hasCount) {
return valueCount - trueCount;
} else {
LOG_ERROR(ERRCODE_INTERNAL_ERROR, "False count is not defined.");
}
}
uint64_t getTrueCount() const {
if (_hasCount) {
return trueCount;
} else {
LOG_ERROR(ERRCODE_INTERNAL_ERROR, "True count is not defined.");
}
}
void updateBoolean(bool value) {
if (!_hasCount) _hasCount = true;
if (value) trueCount += 1;
}
void merge(const ColumnStatistics& stats) override {
ColumnStatisticsImpl::merge(stats);
const BooleanColumnStatisticsImpl* other =
dynamic_cast<const BooleanColumnStatisticsImpl*>(&stats);
assert(other != nullptr);
if (other->hasCount()) {
if (!_hasCount) {
_hasCount = true;
trueCount = other->trueCount;
} else {
trueCount += other->trueCount;
}
}
}
void serialize(proto::ColumnStatistics* pb) override {
assert(pb != nullptr);
ColumnStatisticsImpl::serialize(pb);
proto::BucketStatistics* stats = pb->mutable_bucketstatistics();
if (_hasCount) stats->add_count(trueCount);
}
void reset() override {
ColumnStatisticsImpl::reset();
resetInternal();
}
private:
void resetInternal() {
_hasCount = false;
trueCount = 0;
}
};
class DateColumnStatisticsImpl : public ColumnStatisticsImpl {
private:
bool _hasStats;
int32_t minimum;
int32_t maximum;
public:
DateColumnStatisticsImpl() : ColumnStatisticsImpl() { resetInternal(); }
DateColumnStatisticsImpl(const proto::ColumnStatistics& stats,
bool correctStats);
virtual ~DateColumnStatisticsImpl();
bool hasMinimum() const { return _hasStats; }
bool hasMaximum() const { return _hasStats; }
int32_t getMinimum() const {
if (_hasStats) {
return minimum;
} else {
LOG_ERROR(ERRCODE_INTERNAL_ERROR, "Minimum is not defined.");
}
}
int32_t getMaximum() const {
if (_hasStats) {
return maximum;
} else {
LOG_ERROR(ERRCODE_INTERNAL_ERROR, "Maximum is not defined.");
}
}
void updateDate(int64_t value) {
if (!_hasStats) {
_hasStats = true;
minimum = value;
maximum = value;
} else if (value < minimum) {
minimum = value;
} else if (value > maximum) {
maximum = value;
}
}
void merge(const ColumnStatistics& stats) override {
ColumnStatisticsImpl::merge(stats);
const DateColumnStatisticsImpl* other =
dynamic_cast<const DateColumnStatisticsImpl*>(&stats);
assert(other != nullptr);
if (other->hasMinimum()) {
if (!_hasStats) {
_hasStats = other->hasMinimum();
minimum = other->getMinimum();
maximum = other->getMaximum();
} else {
if (other->getMinimum() < minimum) {
minimum = other->getMinimum();
}
if (other->getMaximum() > maximum) {
maximum = other->getMaximum();
}
}
}
}
void serialize(proto::ColumnStatistics* pb) override {
assert(pb != nullptr);
ColumnStatisticsImpl::serialize(pb);
proto::DateStatistics* stats = pb->mutable_datestatistics();
if (_hasStats) {
stats->set_minimum(minimum);
stats->set_maximum(maximum);
}
}
void reset() override {
ColumnStatisticsImpl::reset();
resetInternal();
}
private:
void resetInternal() {
_hasStats = false;
minimum = 0;
maximum = 0;
}
};
class DecimalColumnStatisticsImpl : public ColumnStatisticsImpl {
private:
bool _hasMinimum;
bool _hasMaximum;
bool _hasSum;
std::string minimum;
std::string maximum;
std::string sum;
public:
DecimalColumnStatisticsImpl() : ColumnStatisticsImpl() { resetInternal(); }
DecimalColumnStatisticsImpl(const proto::ColumnStatistics& stats,
bool correctStats);
virtual ~DecimalColumnStatisticsImpl();
bool hasMinimum() const { return _hasMinimum; }
bool hasMaximum() const { return _hasMaximum; }
bool hasSum() const { return _hasSum; }
Decimal getMinimum() const {
if (_hasMinimum) {
return Decimal(minimum);
} else {
LOG_ERROR(ERRCODE_INTERNAL_ERROR, "Minimum is not defined.");
}
}
Decimal getMaximum() const {
if (_hasMaximum) {
return Decimal(maximum);
} else {
LOG_ERROR(ERRCODE_INTERNAL_ERROR, "Maximum is not defined.");
}
}
Decimal getSum() const {
if (_hasSum) {
return Decimal(sum);
} else {
LOG_ERROR(ERRCODE_INTERNAL_ERROR, "Sum is not defined.");
}
}
const char* getMinimumStr() const {
if (_hasMinimum) {
return minimum.c_str();
} else {
LOG_ERROR(ERRCODE_INTERNAL_ERROR, "Minimum is not defined.");
}
}
const char* getMaximumStr() const {
if (_hasMaximum) {
return maximum.c_str();
} else {
LOG_ERROR(ERRCODE_INTERNAL_ERROR, "Maximum is not defined.");
}
}
const char* getSumStr() const {
if (_hasSum) {
return sum.c_str();
} else {
LOG_ERROR(ERRCODE_INTERNAL_ERROR, "Sum is not defined.");
}
}
void updateDecimal(const orc::Decimal& value) {
if (!_hasMinimum) {
_hasMinimum = true;
_hasMaximum = true;
minimum = value.toString();
maximum = value.toString();
} else if (value < this->getMinimum()) {
minimum = value.toString();
} else if (value > this->getMaximum()) {
maximum = value.toString();
}
if (_hasSum) {
updateSum(value);
} else {
_hasSum = true;
sum = value.toString();
}
}
void merge(const ColumnStatistics& stats) override {
ColumnStatisticsImpl::merge(stats);
const DecimalColumnStatisticsImpl* other =
dynamic_cast<const DecimalColumnStatisticsImpl*>(&stats);
assert(other != nullptr);
if (other->hasMinimum()) {
if (!_hasMinimum) {
_hasMinimum = true;
_hasMaximum = true;
minimum = other->getMinimum().toString();
maximum = other->getMaximum().toString();
} else {
if (other->getMinimum() < this->getMinimum()) {
minimum = other->getMinimum().toString();
}
if (other->getMaximum() > this->getMaximum()) {
maximum = other->getMaximum().toString();
}
}
}
if (other->hasSum()) {
if (_hasSum) {
Decimal otherSum = other->getSum();
updateSum(otherSum);
} else {
_hasSum = true;
sum = other->getSum().toString();
}
}
}
void serialize(proto::ColumnStatistics* pb) override {
assert(pb != nullptr);
ColumnStatisticsImpl::serialize(pb);
proto::DecimalStatistics* stats = pb->mutable_decimalstatistics();
if (_hasMinimum) {
stats->set_minimum(minimum);
stats->set_maximum(maximum);
}
if (_hasSum) stats->set_sum(sum);
}
void reset() override {
ColumnStatisticsImpl::reset();
resetInternal();
}
private:
void updateSum(Decimal value);
void resetInternal() {
_hasMaximum = false;
_hasMinimum = false;
_hasSum = false;
minimum.clear();
maximum.clear();
sum.clear();
}
};
class TimestampColumnStatisticsImpl : public ColumnStatisticsImpl {
private:
bool _hasStats;
int64_t minimum;
int64_t maximum;
public:
TimestampColumnStatisticsImpl() : ColumnStatisticsImpl() { resetInternal(); }
TimestampColumnStatisticsImpl(const proto::ColumnStatistics& stats,
bool correctStats);
virtual ~TimestampColumnStatisticsImpl();
bool hasMinimum() const { return _hasStats; }
bool hasMaximum() const { return _hasStats; }
int64_t getMinimum() const {
if (_hasStats) {
return minimum;
} else {
LOG_ERROR(ERRCODE_INTERNAL_ERROR, "Minimum is not defined.");
}
}
int64_t getMaximum() const {
if (_hasStats) {
return maximum;
} else {
LOG_ERROR(ERRCODE_INTERNAL_ERROR, "Maximum is not defined.");
}
}
void updateTimestamp(const int64_t val) {
if (!_hasStats) {
_hasStats = true;
maximum = minimum = val;
} else if (minimum > val) {
minimum = val;
} else if (maximum < val) {
maximum = val;
}
}
void merge(const ColumnStatistics& stats) override {
ColumnStatisticsImpl::merge(stats);
const TimestampColumnStatisticsImpl* other =
dynamic_cast<const TimestampColumnStatisticsImpl*>(&stats);
assert(other != nullptr);
if (other->hasMinimum()) {
if (!_hasStats) {
_hasStats = other->hasMinimum();
minimum = other->getMinimum();
maximum = other->getMaximum();
} else {
if (other->getMinimum() < minimum) {
minimum = other->getMinimum();
}
if (other->getMaximum() > maximum) {
maximum = other->getMaximum();
}
}
}
}
void serialize(proto::ColumnStatistics* pb) override {
assert(pb != nullptr);
ColumnStatisticsImpl::serialize(pb);
proto::TimestampStatistics* stats = pb->mutable_timestampstatistics();
if (_hasStats) {
stats->set_minimum(minimum);
stats->set_maximum(maximum);
}
}
void reset() override {
ColumnStatisticsImpl::reset();
resetInternal();
}
private:
void resetInternal() {
_hasStats = false;
minimum = 0;
maximum = 0;
}
int8_t compare(dbcommon::Timestamp ts1, dbcommon::Timestamp ts2) {
const int64_t val1 = ts1.second;
const int64_t nano1 = ts1.nanosecond;
const int64_t val2 = ts2.second;
const int64_t nano2 = ts2.nanosecond;
if (val1 == val2) {
if (nano1 > nano2)
return 1;
else if (nano1 == nano2)
return 0;
else
return -1;
} else if (val1 > val2) {
return 1;
} else {
return -1;
}
}
};
// StreamInformation backed by plain values captured at construction time.
class StreamInformationImpl : public StreamInformation {
 public:
  // Captures kind, column id and length from the protobuf stream entry; the
  // offset is not part of that entry and is supplied by the caller.
  StreamInformationImpl(uint64_t _offset, const proto::Stream& stream)
      : kind(static_cast<StreamKind>(stream.kind())),
        column(stream.column()),
        offset(_offset),
        length(stream.length()) {}

  ~StreamInformationImpl() {}

  StreamKind getKind() const override { return kind; }

  uint64_t getColumnId() const override { return column; }

  uint64_t getOffset() const override { return offset; }

  uint64_t getLength() const override { return length; }

 private:
  StreamKind kind;
  uint64_t column;
  uint64_t offset;
  uint64_t length;
};
// StripeInformation backed by the section sizes handed to the constructor.
// Queries that need the stripe footer (stream count, dictionary size, writer
// timezone) call ensureStripeFooterLoaded() first, which is defined out of
// line.
class StripeInformationImpl : public StripeInformation {
 public:
  // Stores the stripe layout together with the input stream, memory pool and
  // compression settings passed in. The stream pointer and the pool reference
  // are stored as given; nothing in this class releases them.
  StripeInformationImpl(uint64_t _offset, uint64_t _indexLength,
                        uint64_t _dataLength, uint64_t _footerLength,
                        uint64_t _numRows, InputStream* _stream,
                        dbcommon::MemoryPool& pool,  // NOLINT
                        CompressionKind _compression, uint64_t _blockSize)
      : offset(_offset),
        indexLength(_indexLength),
        dataLength(_dataLength),
        footerLength(_footerLength),
        numRows(_numRows),
        stream(_stream),
        memoryPool(pool),
        compression(_compression),
        blockSize(_blockSize) {}

  virtual ~StripeInformationImpl() {}

  uint64_t getOffset() const override { return offset; }

  // The stripe length is the sum of its index, data and footer sections.
  uint64_t getLength() const override {
    return indexLength + dataLength + footerLength;
  }

  uint64_t getIndexLength() const override { return indexLength; }

  uint64_t getDataLength() const override { return dataLength; }

  uint64_t getFooterLength() const override { return footerLength; }

  uint64_t getNumberOfRows() const override { return numRows; }

  // Number of stream entries recorded in the stripe footer.
  uint64_t getNumberOfStreams() const override {
    ensureStripeFooterLoaded();
    const int streamCount = stripeFooter->streams_size();
    return static_cast<uint64_t>(streamCount);
  }

  // Defined out of line.
  std::unique_ptr<StreamInformation> getStreamInformation(
      uint64_t streamId) const override;

  // Dictionary size recorded in the stripe footer for column colId.
  uint64_t getDictionarySize(uint64_t colId) const override {
    ensureStripeFooterLoaded();
    const int columnIndex = static_cast<int>(colId);
    return static_cast<uint64_t>(
        stripeFooter->columns(columnIndex).dictionarysize());
  }

  // Writer timezone recorded in the stripe footer.
  const std::string& getWriterTimezone() const override {
    ensureStripeFooterLoaded();
    return stripeFooter->writertimezone();
  }

 private:
  uint64_t offset;
  uint64_t indexLength;
  uint64_t dataLength;
  uint64_t footerLength;
  uint64_t numRows;
  InputStream* stream;
  dbcommon::MemoryPool& memoryPool;
  CompressionKind compression;
  uint64_t blockSize;
  // Mutable so that const accessors can trigger loading of the footer.
  mutable std::unique_ptr<proto::StripeFooter> stripeFooter;

  // Makes sure stripeFooter is available (defined out of line).
  void ensureStripeFooterLoaded() const;
};
// Builds a column statistics object from its protobuf form (defined out of
// line). correctStats is forwarded by StatisticsImpl for every column.
// The result is a raw pointer; StatisticsImpl stores these pointers in its
// list. NOTE(review): presumably the caller owns the returned object -
// confirm against the definition and ~StatisticsImpl().
univplan::ColumnStatistics* convertColumnStatistics(
const proto::ColumnStatistics& s, bool correctStats);
// Creates a column statistics object for the given type (defined out of
// line). Ownership of the result is handed to the caller through the
// unique_ptr.
std::unique_ptr<ColumnStatisticsImpl> createColumnStatistics(
const orc::Type* type);
class StatisticsImpl : public univplan::Statistics {
private:
std::list<univplan::ColumnStatistics*> colStats;
// DELIBERATELY NOT IMPLEMENTED
StatisticsImpl(const StatisticsImpl&);
StatisticsImpl& operator=(const StatisticsImpl&);
public:
StatisticsImpl(const proto::StripeStatistics& stripeStats,
bool correctStats) {
for (int i = 0; i < stripeStats.colstats_size(); i++) {
colStats.push_back(
convertColumnStatistics(stripeStats.colstats(i), correctStats));
}
}
StatisticsImpl(const proto::Footer& footer, bool correctStats) {
for (int i = 0; i < footer.statistics_size(); i++) {
colStats.push_back(
convertColumnStatistics(footer.statistics(i), correctStats));
}
}
const univplan::ColumnStatistics* getColumnStatistics(
uint32_t columnId) const override {
std::list<univplan::ColumnStatistics*>::const_iterator it =
colStats.begin();
std::advance(it, static_cast<int64_t>(columnId));
return *it;
}
virtual ~StatisticsImpl();
uint32_t getNumberOfColumns() const override {
return static_cast<uint32_t>(colStats.size());
}
};
} // namespace orc
#endif // STORAGE_SRC_STORAGE_FORMAT_ORC_ORC_PROTO_DEFINITION_H_