| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include "orc/Int128.hh" |
| #include "orc/OrcFile.hh" |
| #include "orc/Reader.hh" |
| |
| #include "Adaptor.hh" |
| #include "ColumnReader.hh" |
| #include "Exceptions.hh" |
| #include "RLE.hh" |
| #include "TypeImpl.hh" |
| |
| #include "wrap/coded-stream-wrapper.h" |
| |
| #include <algorithm> |
| #include <iostream> |
| #include <limits> |
| #include <memory> |
| #include <sstream> |
| #include <string> |
| #include <vector> |
| |
| namespace orc { |
| |
| std::string compressionKindToString(CompressionKind kind) { |
| switch (static_cast<int>(kind)) { |
| case CompressionKind_NONE: |
| return "none"; |
| case CompressionKind_ZLIB: |
| return "zlib"; |
| case CompressionKind_SNAPPY: |
| return "snappy"; |
| case CompressionKind_LZO: |
| return "LZO"; |
| case CompressionKind_LZ4: |
| return "LZ4"; |
| case CompressionKind_ZSTD: |
| return "ZSTD"; |
| } |
| std::stringstream buffer; |
| buffer << "unknown - " << kind; |
| return buffer.str(); |
| } |
| |
| std::string writerVersionToString(WriterVersion version) { |
| switch (static_cast<int>(version)) { |
| case WriterVersion_ORIGINAL: |
| return "original"; |
| case WriterVersion_HIVE_8732: |
| return "HIVE-8732"; |
| case WriterVersion_HIVE_4243: |
| return "HIVE-4243"; |
| } |
| std::stringstream buffer; |
| buffer << "future - " << version; |
| return buffer.str(); |
| } |
| |
| struct ReaderOptionsPrivate { |
| bool setIndexes; |
| bool setNames; |
| std::list<uint64_t> includedColumnIndexes; |
| std::list<std::string> includedColumnNames; |
| uint64_t dataStart; |
| uint64_t dataLength; |
| uint64_t tailLocation; |
| bool throwOnHive11DecimalOverflow; |
| int32_t forcedScaleOnHive11Decimal; |
| std::ostream* errorStream; |
| MemoryPool* memoryPool; |
| std::string serializedTail; |
| |
| ReaderOptionsPrivate() { |
| setIndexes = false; |
| setNames = false; |
| dataStart = 0; |
| dataLength = std::numeric_limits<uint64_t>::max(); |
| tailLocation = std::numeric_limits<uint64_t>::max(); |
| throwOnHive11DecimalOverflow = true; |
| forcedScaleOnHive11Decimal = 6; |
| errorStream = &std::cerr; |
| memoryPool = getDefaultPool(); |
| } |
| }; |
| |
| ReaderOptions::ReaderOptions(): |
| privateBits(std::unique_ptr<ReaderOptionsPrivate> |
| (new ReaderOptionsPrivate())) { |
| // PASS |
| } |
| |
| ReaderOptions::ReaderOptions(const ReaderOptions& rhs): |
| privateBits(std::unique_ptr<ReaderOptionsPrivate> |
| (new ReaderOptionsPrivate(*(rhs.privateBits.get())))) { |
| // PASS |
| } |
| |
| ReaderOptions::ReaderOptions(ReaderOptions& rhs) { |
| // swap privateBits with rhs |
| ReaderOptionsPrivate* l = privateBits.release(); |
| privateBits.reset(rhs.privateBits.release()); |
| rhs.privateBits.reset(l); |
| } |
| |
| ReaderOptions& ReaderOptions::operator=(const ReaderOptions& rhs) { |
| if (this != &rhs) { |
| privateBits.reset(new ReaderOptionsPrivate(*(rhs.privateBits.get()))); |
| } |
| return *this; |
| } |
| |
| ReaderOptions::~ReaderOptions() { |
| // PASS |
| } |
| |
| ReaderOptions& ReaderOptions::include(const std::list<uint64_t>& include) { |
| privateBits->setIndexes = true; |
| privateBits->includedColumnIndexes.assign(include.begin(), include.end()); |
| privateBits->setNames = false; |
| privateBits->includedColumnNames.clear(); |
| return *this; |
| } |
| |
| ReaderOptions& ReaderOptions::include |
| (const std::list<std::string>& include) { |
| privateBits->setNames = true; |
| privateBits->includedColumnNames.assign(include.begin(), include.end()); |
| privateBits->setIndexes = false; |
| privateBits->includedColumnIndexes.clear(); |
| return *this; |
| } |
| |
| ReaderOptions& ReaderOptions::range(uint64_t offset, |
| uint64_t length) { |
| privateBits->dataStart = offset; |
| privateBits->dataLength = length; |
| return *this; |
| } |
| |
| ReaderOptions& ReaderOptions::setTailLocation(uint64_t offset) { |
| privateBits->tailLocation = offset; |
| return *this; |
| } |
| |
| ReaderOptions& ReaderOptions::setMemoryPool(MemoryPool& pool) { |
| privateBits->memoryPool = &pool; |
| return *this; |
| } |
| |
| ReaderOptions& ReaderOptions::setSerializedFileTail(const std::string& value |
| ) { |
| privateBits->serializedTail = value; |
| return *this; |
| } |
| |
| MemoryPool* ReaderOptions::getMemoryPool() const{ |
| return privateBits->memoryPool; |
| } |
| |
| bool ReaderOptions::getIndexesSet() const { |
| return privateBits->setIndexes; |
| } |
| |
| const std::list<uint64_t>& ReaderOptions::getInclude() const { |
| return privateBits->includedColumnIndexes; |
| } |
| |
| bool ReaderOptions::getNamesSet() const { |
| return privateBits->setNames; |
| } |
| |
| const std::list<std::string>& ReaderOptions::getIncludeNames() const { |
| return privateBits->includedColumnNames; |
| } |
| |
| uint64_t ReaderOptions::getOffset() const { |
| return privateBits->dataStart; |
| } |
| |
| uint64_t ReaderOptions::getLength() const { |
| return privateBits->dataLength; |
| } |
| |
| uint64_t ReaderOptions::getTailLocation() const { |
| return privateBits->tailLocation; |
| } |
| |
| ReaderOptions& ReaderOptions::throwOnHive11DecimalOverflow(bool shouldThrow){ |
| privateBits->throwOnHive11DecimalOverflow = shouldThrow; |
| return *this; |
| } |
| |
| bool ReaderOptions::getThrowOnHive11DecimalOverflow() const { |
| return privateBits->throwOnHive11DecimalOverflow; |
| } |
| |
| ReaderOptions& ReaderOptions::forcedScaleOnHive11Decimal(int32_t forcedScale |
| ) { |
| privateBits->forcedScaleOnHive11Decimal = forcedScale; |
| return *this; |
| } |
| |
| int32_t ReaderOptions::getForcedScaleOnHive11Decimal() const { |
| return privateBits->forcedScaleOnHive11Decimal; |
| } |
| |
| ReaderOptions& ReaderOptions::setErrorStream(std::ostream& stream) { |
| privateBits->errorStream = &stream; |
| return *this; |
| } |
| |
| std::ostream* ReaderOptions::getErrorStream() const { |
| return privateBits->errorStream; |
| } |
| |
| std::string ReaderOptions::getSerializedFileTail() const { |
| return privateBits->serializedTail; |
| } |
| |
| StreamInformation::~StreamInformation() { |
| // PASS |
| } |
| |
| StripeInformation::~StripeInformation() { |
| // PASS |
| } |
| |
| class ColumnStatisticsImpl: public ColumnStatistics { |
| private: |
| uint64_t valueCount; |
| |
| public: |
| ColumnStatisticsImpl(const proto::ColumnStatistics& stats); |
| virtual ~ColumnStatisticsImpl(); |
| |
| uint64_t getNumberOfValues() const override { |
| return valueCount; |
| } |
| |
| std::string toString() const override { |
| std::ostringstream buffer; |
| buffer << "Column has " << valueCount << " values" << std::endl; |
| return buffer.str(); |
| } |
| }; |
| |
| class BinaryColumnStatisticsImpl: public BinaryColumnStatistics { |
| private: |
| bool _hasTotalLength; |
| uint64_t valueCount; |
| uint64_t totalLength; |
| |
| public: |
| BinaryColumnStatisticsImpl(const proto::ColumnStatistics& stats, |
| bool correctStats); |
| virtual ~BinaryColumnStatisticsImpl(); |
| |
| bool hasTotalLength() const override { |
| return _hasTotalLength; |
| } |
| uint64_t getNumberOfValues() const override { |
| return valueCount; |
| } |
| |
| uint64_t getTotalLength() const override { |
| if(_hasTotalLength){ |
| return totalLength; |
| }else{ |
| throw ParseError("Total length is not defined."); |
| } |
| } |
| |
| std::string toString() const override { |
| std::ostringstream buffer; |
| buffer << "Data type: Binary" << std::endl |
| << "Values: " << valueCount << std::endl; |
| if(_hasTotalLength){ |
| buffer << "Total length: " << totalLength << std::endl; |
| }else{ |
| buffer << "Total length: not defined" << std::endl; |
| } |
| return buffer.str(); |
| } |
| }; |
| |
| class BooleanColumnStatisticsImpl: public BooleanColumnStatistics { |
| private: |
| bool _hasCount; |
| uint64_t valueCount; |
| uint64_t trueCount; |
| |
| public: |
| BooleanColumnStatisticsImpl(const proto::ColumnStatistics& stats, bool correctStats); |
| virtual ~BooleanColumnStatisticsImpl(); |
| |
| bool hasCount() const override { |
| return _hasCount; |
| } |
| |
| uint64_t getNumberOfValues() const override { |
| return valueCount; |
| } |
| |
| uint64_t getFalseCount() const override { |
| if(_hasCount){ |
| return valueCount - trueCount; |
| }else{ |
| throw ParseError("False count is not defined."); |
| } |
| } |
| |
| uint64_t getTrueCount() const override { |
| if(_hasCount){ |
| return trueCount; |
| }else{ |
| throw ParseError("True count is not defined."); |
| } |
| } |
| |
| std::string toString() const override { |
| std::ostringstream buffer; |
| buffer << "Data type: Boolean" << std::endl |
| << "Values: " << valueCount << std::endl; |
| if(_hasCount){ |
| buffer << "(true: " << trueCount << "; false: " |
| << valueCount - trueCount << ")" << std::endl; |
| } else { |
| buffer << "(true: not defined; false: not defined)" << std::endl; |
| buffer << "True and false count are not defined" << std::endl; |
| } |
| return buffer.str(); |
| } |
| }; |
| |
| class DateColumnStatisticsImpl: public DateColumnStatistics { |
| private: |
| bool _hasMinimum; |
| bool _hasMaximum; |
| uint64_t valueCount; |
| int32_t minimum; |
| int32_t maximum; |
| |
| public: |
| DateColumnStatisticsImpl(const proto::ColumnStatistics& stats, bool correctStats); |
| virtual ~DateColumnStatisticsImpl(); |
| |
| bool hasMinimum() const override { |
| return _hasMinimum; |
| } |
| |
| bool hasMaximum() const override { |
| return _hasMaximum; |
| } |
| |
| uint64_t getNumberOfValues() const override { |
| return valueCount; |
| } |
| |
| int32_t getMinimum() const override { |
| if(_hasMinimum){ |
| return minimum; |
| }else{ |
| throw ParseError("Minimum is not defined."); |
| } |
| } |
| |
| int32_t getMaximum() const override { |
| if(_hasMaximum){ |
| return maximum; |
| }else{ |
| throw ParseError("Maximum is not defined."); |
| } |
| } |
| |
| std::string toString() const override { |
| std::ostringstream buffer; |
| buffer << "Data type: Date" << std::endl |
| << "Values: " << valueCount << std::endl; |
| if(_hasMinimum){ |
| buffer << "Minimum: " << minimum << std::endl; |
| }else{ |
| buffer << "Minimum: not defined" << std::endl; |
| } |
| |
| if(_hasMaximum){ |
| buffer << "Maximum: " << maximum << std::endl; |
| }else{ |
| buffer << "Maximum: not defined" << std::endl; |
| } |
| return buffer.str(); |
| } |
| }; |
| |
| class DecimalColumnStatisticsImpl: public DecimalColumnStatistics { |
| private: |
| bool _hasMinimum; |
| bool _hasMaximum; |
| bool _hasSum; |
| uint64_t valueCount; |
| std::string minimum; |
| std::string maximum; |
| std::string sum; |
| |
| public: |
| DecimalColumnStatisticsImpl(const proto::ColumnStatistics& stats, bool correctStats); |
| virtual ~DecimalColumnStatisticsImpl(); |
| |
| bool hasMinimum() const override { |
| return _hasMinimum; |
| } |
| |
| bool hasMaximum() const override { |
| return _hasMaximum; |
| } |
| |
| bool hasSum() const override { |
| return _hasSum; |
| } |
| |
| uint64_t getNumberOfValues() const override { |
| return valueCount; |
| } |
| |
| Decimal getMinimum() const override { |
| if(_hasMinimum){ |
| return Decimal(minimum); |
| }else{ |
| throw ParseError("Minimum is not defined."); |
| } |
| } |
| |
| Decimal getMaximum() const override { |
| if(_hasMaximum){ |
| return Decimal(maximum); |
| }else{ |
| throw ParseError("Maximum is not defined."); |
| } |
| } |
| |
| Decimal getSum() const override { |
| if(_hasSum){ |
| return Decimal(sum); |
| }else{ |
| throw ParseError("Sum is not defined."); |
| } |
| } |
| |
| std::string toString() const override { |
| std::ostringstream buffer; |
| buffer << "Data type: Decimal" << std::endl |
| << "Values: " << valueCount << std::endl; |
| if(_hasMinimum){ |
| buffer << "Minimum: " << minimum << std::endl; |
| }else{ |
| buffer << "Minimum: not defined" << std::endl; |
| } |
| |
| if(_hasMaximum){ |
| buffer << "Maximum: " << maximum << std::endl; |
| }else{ |
| buffer << "Maximum: not defined" << std::endl; |
| } |
| |
| if(_hasSum){ |
| buffer << "Sum: " << sum << std::endl; |
| }else{ |
| buffer << "Sum: not defined" << std::endl; |
| } |
| |
| return buffer.str(); |
| } |
| }; |
| |
| class DoubleColumnStatisticsImpl: public DoubleColumnStatistics { |
| private: |
| bool _hasMinimum; |
| bool _hasMaximum; |
| bool _hasSum; |
| uint64_t valueCount; |
| double minimum; |
| double maximum; |
| double sum; |
| |
| public: |
| DoubleColumnStatisticsImpl(const proto::ColumnStatistics& stats); |
| virtual ~DoubleColumnStatisticsImpl(); |
| |
| bool hasMinimum() const override { |
| return _hasMinimum; |
| } |
| |
| bool hasMaximum() const override { |
| return _hasMaximum; |
| } |
| |
| bool hasSum() const override { |
| return _hasSum; |
| } |
| |
| uint64_t getNumberOfValues() const override { |
| return valueCount; |
| } |
| |
| double getMinimum() const override { |
| if(_hasMinimum){ |
| return minimum; |
| }else{ |
| throw ParseError("Minimum is not defined."); |
| } |
| } |
| |
| double getMaximum() const override { |
| if(_hasMaximum){ |
| return maximum; |
| }else{ |
| throw ParseError("Maximum is not defined."); |
| } |
| } |
| |
| double getSum() const override { |
| if(_hasSum){ |
| return sum; |
| }else{ |
| throw ParseError("Sum is not defined."); |
| } |
| } |
| |
| std::string toString() const override { |
| std::ostringstream buffer; |
| buffer << "Data type: Double" << std::endl |
| << "Values: " << valueCount << std::endl; |
| if(_hasMinimum){ |
| buffer << "Minimum: " << minimum << std::endl; |
| }else{ |
| buffer << "Minimum: not defined" << std::endl; |
| } |
| |
| if(_hasMaximum){ |
| buffer << "Maximum: " << maximum << std::endl; |
| }else{ |
| buffer << "Maximum: not defined" << std::endl; |
| } |
| |
| if(_hasSum){ |
| buffer << "Sum: " << sum << std::endl; |
| }else{ |
| buffer << "Sum: not defined" << std::endl; |
| } |
| return buffer.str(); |
| } |
| }; |
| |
| class IntegerColumnStatisticsImpl: public IntegerColumnStatistics { |
| private: |
| bool _hasMinimum; |
| bool _hasMaximum; |
| bool _hasSum; |
| uint64_t valueCount; |
| int64_t minimum; |
| int64_t maximum; |
| int64_t sum; |
| |
| public: |
| IntegerColumnStatisticsImpl(const proto::ColumnStatistics& stats); |
| virtual ~IntegerColumnStatisticsImpl(); |
| |
| bool hasMinimum() const override { |
| return _hasMinimum; |
| } |
| |
| bool hasMaximum() const override { |
| return _hasMaximum; |
| } |
| |
| bool hasSum() const override { |
| return _hasSum; |
| } |
| |
| uint64_t getNumberOfValues() const override { |
| return valueCount; |
| } |
| |
| int64_t getMinimum() const override { |
| if(_hasMinimum){ |
| return minimum; |
| }else{ |
| throw ParseError("Minimum is not defined."); |
| } |
| } |
| |
| int64_t getMaximum() const override { |
| if(_hasMaximum){ |
| return maximum; |
| }else{ |
| throw ParseError("Maximum is not defined."); |
| } |
| } |
| |
| int64_t getSum() const override { |
| if(_hasSum){ |
| return sum; |
| }else{ |
| throw ParseError("Sum is not defined."); |
| } |
| } |
| |
| std::string toString() const override { |
| std::ostringstream buffer; |
| buffer << "Data type: Integer" << std::endl |
| << "Values: " << valueCount << std::endl; |
| if(_hasMinimum){ |
| buffer << "Minimum: " << minimum << std::endl; |
| }else{ |
| buffer << "Minimum: not defined" << std::endl; |
| } |
| |
| if(_hasMaximum){ |
| buffer << "Maximum: " << maximum << std::endl; |
| }else{ |
| buffer << "Maximum: not defined" << std::endl; |
| } |
| |
| if(_hasSum){ |
| buffer << "Sum: " << sum << std::endl; |
| }else{ |
| buffer << "Sum: not defined" << std::endl; |
| } |
| return buffer.str(); |
| } |
| }; |
| |
| class StringColumnStatisticsImpl: public StringColumnStatistics { |
| private: |
| bool _hasMinimum; |
| bool _hasMaximum; |
| bool _hasTotalLength; |
| uint64_t valueCount; |
| std::string minimum; |
| std::string maximum; |
| uint64_t totalLength; |
| |
| public: |
| StringColumnStatisticsImpl(const proto::ColumnStatistics& stats, bool correctStats); |
| virtual ~StringColumnStatisticsImpl(); |
| |
| bool hasMinimum() const override { |
| return _hasMinimum; |
| } |
| |
| bool hasMaximum() const override { |
| return _hasMaximum; |
| } |
| |
| bool hasTotalLength() const override { |
| return _hasTotalLength; |
| } |
| |
| uint64_t getNumberOfValues() const override { |
| return valueCount; |
| } |
| |
| std::string getMinimum() const override { |
| if(_hasMinimum){ |
| return minimum; |
| }else{ |
| throw ParseError("Minimum is not defined."); |
| } |
| } |
| |
| std::string getMaximum() const override { |
| if(_hasMaximum){ |
| return maximum; |
| }else{ |
| throw ParseError("Maximum is not defined."); |
| } |
| } |
| |
| uint64_t getTotalLength() const override { |
| if(_hasTotalLength){ |
| return totalLength; |
| }else{ |
| throw ParseError("Total length is not defined."); |
| } |
| } |
| |
| std::string toString() const override { |
| std::ostringstream buffer; |
| buffer << "Data type: String" << std::endl |
| << "Values: " << valueCount << std::endl; |
| if(_hasMinimum){ |
| buffer << "Minimum: " << minimum << std::endl; |
| }else{ |
| buffer << "Minimum is not defined" << std::endl; |
| } |
| |
| if(_hasMaximum){ |
| buffer << "Maximum: " << maximum << std::endl; |
| }else{ |
| buffer << "Maximum is not defined" << std::endl; |
| } |
| |
| if(_hasTotalLength){ |
| buffer << "Total length: " << totalLength << std::endl; |
| }else{ |
| buffer << "Total length is not defined" << std::endl; |
| } |
| return buffer.str(); |
| } |
| }; |
| |
| class TimestampColumnStatisticsImpl: public TimestampColumnStatistics { |
| private: |
| bool _hasMinimum; |
| bool _hasMaximum; |
| uint64_t valueCount; |
| int64_t minimum; |
| int64_t maximum; |
| |
| public: |
| TimestampColumnStatisticsImpl(const proto::ColumnStatistics& stats, |
| bool correctStats); |
| virtual ~TimestampColumnStatisticsImpl(); |
| |
| bool hasMinimum() const override { |
| return _hasMinimum; |
| } |
| |
| bool hasMaximum() const override { |
| return _hasMaximum; |
| } |
| |
| uint64_t getNumberOfValues() const override { |
| return valueCount; |
| } |
| |
| int64_t getMinimum() const override { |
| if(_hasMinimum){ |
| return minimum; |
| }else{ |
| throw ParseError("Minimum is not defined."); |
| } |
| } |
| |
| int64_t getMaximum() const override { |
| if(_hasMaximum){ |
| return maximum; |
| }else{ |
| throw ParseError("Maximum is not defined."); |
| } |
| } |
| |
| std::string toString() const override { |
| std::ostringstream buffer; |
| buffer << "Data type: Timestamp" << std::endl |
| << "Values: " << valueCount << std::endl; |
| if(_hasMinimum){ |
| buffer << "Minimum: " << minimum << std::endl; |
| }else{ |
| buffer << "Minimum is not defined" << std::endl; |
| } |
| |
| if(_hasMaximum){ |
| buffer << "Maximum: " << maximum << std::endl; |
| }else{ |
| buffer << "Maximum is not defined" << std::endl; |
| } |
| return buffer.str(); |
| } |
| }; |
| |
| std::string streamKindToString(StreamKind kind) { |
| switch (static_cast<int>(kind)) { |
| case StreamKind_PRESENT: |
| return "present"; |
| case StreamKind_DATA: |
| return "data"; |
| case StreamKind_LENGTH: |
| return "length"; |
| case StreamKind_DICTIONARY_DATA: |
| return "dictionary"; |
| case StreamKind_DICTIONARY_COUNT: |
| return "dictionary count"; |
| case StreamKind_SECONDARY: |
| return "secondary"; |
| case StreamKind_ROW_INDEX: |
| return "index"; |
| case StreamKind_BLOOM_FILTER: |
| return "bloom"; |
| } |
| std::stringstream buffer; |
| buffer << "unknown - " << kind; |
| return buffer.str(); |
| } |
| |
| std::string columnEncodingKindToString(ColumnEncodingKind kind) { |
| switch (static_cast<int>(kind)) { |
| case ColumnEncodingKind_DIRECT: |
| return "direct"; |
| case ColumnEncodingKind_DICTIONARY: |
| return "dictionary"; |
| case ColumnEncodingKind_DIRECT_V2: |
| return "direct rle2"; |
| case ColumnEncodingKind_DICTIONARY_V2: |
| return "dictionary rle2"; |
| } |
| std::stringstream buffer; |
| buffer << "unknown - " << kind; |
| return buffer.str(); |
| } |
| |
| class StreamInformationImpl: public StreamInformation { |
| private: |
| StreamKind kind; |
| uint64_t column; |
| uint64_t offset; |
| uint64_t length; |
| public: |
| StreamInformationImpl(uint64_t _offset, |
| const proto::Stream& stream |
| ): kind(static_cast<StreamKind>(stream.kind())), |
| column(stream.column()), |
| offset(_offset), |
| length(stream.length()) { |
| // PASS |
| } |
| |
| ~StreamInformationImpl(); |
| |
| StreamKind getKind() const override { |
| return kind; |
| } |
| |
| uint64_t getColumnId() const override { |
| return column; |
| } |
| |
| uint64_t getOffset() const override { |
| return offset; |
| } |
| |
| uint64_t getLength() const override { |
| return length; |
| } |
| }; |
| |
| StreamInformationImpl::~StreamInformationImpl() { |
| // PASS |
| } |
| |
| class StripeInformationImpl : public StripeInformation { |
| uint64_t offset; |
| uint64_t indexLength; |
| uint64_t dataLength; |
| uint64_t footerLength; |
| uint64_t numRows; |
| InputStream* stream; |
| MemoryPool& memory; |
| CompressionKind compression; |
| uint64_t blockSize; |
| mutable std::unique_ptr<proto::StripeFooter> stripeFooter; |
| void ensureStripeFooterLoaded() const; |
| public: |
| |
| StripeInformationImpl(uint64_t _offset, |
| uint64_t _indexLength, |
| uint64_t _dataLength, |
| uint64_t _footerLength, |
| uint64_t _numRows, |
| InputStream* _stream, |
| MemoryPool& _memory, |
| CompressionKind _compression, |
| uint64_t _blockSize |
| ) : offset(_offset), |
| indexLength(_indexLength), |
| dataLength(_dataLength), |
| footerLength(_footerLength), |
| numRows(_numRows), |
| stream(_stream), |
| memory(_memory), |
| compression(_compression), |
| blockSize(_blockSize) { |
| // PASS |
| } |
| |
| virtual ~StripeInformationImpl() { |
| // PASS |
| } |
| |
| uint64_t getOffset() const override { |
| return offset; |
| } |
| |
| uint64_t getLength() const override { |
| return indexLength + dataLength + footerLength; |
| } |
| uint64_t getIndexLength() const override { |
| return indexLength; |
| } |
| |
| uint64_t getDataLength()const override { |
| return dataLength; |
| } |
| |
| uint64_t getFooterLength() const override { |
| return footerLength; |
| } |
| |
| uint64_t getNumberOfRows() const override { |
| return numRows; |
| } |
| |
| uint64_t getNumberOfStreams() const override { |
| ensureStripeFooterLoaded(); |
| return static_cast<uint64_t>(stripeFooter->streams_size()); |
| } |
| |
| std::unique_ptr<StreamInformation> getStreamInformation(uint64_t streamId |
| ) const override; |
| |
| ColumnEncodingKind getColumnEncoding(uint64_t colId) const override { |
| ensureStripeFooterLoaded(); |
| return static_cast<ColumnEncodingKind>(stripeFooter-> |
| columns(static_cast<int>(colId)) |
| .kind()); |
| } |
| |
| uint64_t getDictionarySize(uint64_t colId) const override { |
| ensureStripeFooterLoaded(); |
| return static_cast<ColumnEncodingKind>(stripeFooter-> |
| columns(static_cast<int>(colId)) |
| .dictionarysize()); |
| } |
| |
| const std::string& getWriterTimezone() const override { |
| ensureStripeFooterLoaded(); |
| return stripeFooter->writertimezone(); |
| } |
| }; |
| |
| void StripeInformationImpl::ensureStripeFooterLoaded() const { |
| if (stripeFooter.get() == nullptr) { |
| std::unique_ptr<SeekableInputStream> pbStream = |
| createDecompressor(compression, |
| std::unique_ptr<SeekableInputStream> |
| (new SeekableFileInputStream(stream, |
| offset + |
| indexLength + |
| dataLength, |
| footerLength, |
| memory)), |
| blockSize, |
| memory); |
| stripeFooter.reset(new proto::StripeFooter()); |
| if (!stripeFooter->ParseFromZeroCopyStream(pbStream.get())) { |
| throw ParseError("Failed to parse the stripe footer"); |
| } |
| } |
| } |
| |
| std::unique_ptr<StreamInformation> |
| StripeInformationImpl::getStreamInformation(uint64_t streamId) const { |
| ensureStripeFooterLoaded(); |
| uint64_t streamOffset = offset; |
| for(uint64_t s=0; s < streamId; ++s) { |
| streamOffset += stripeFooter->streams(static_cast<int>(s)).length(); |
| } |
| return ORC_UNIQUE_PTR<StreamInformation> |
| (new StreamInformationImpl(streamOffset, |
| stripeFooter-> |
| streams(static_cast<int>(streamId)))); |
| } |
| |
| ColumnStatistics* convertColumnStatistics(const proto::ColumnStatistics& s, |
| bool correctStats) { |
| if (s.has_intstatistics()) { |
| return new IntegerColumnStatisticsImpl(s); |
| } else if (s.has_doublestatistics()) { |
| return new DoubleColumnStatisticsImpl(s); |
| } else if (s.has_stringstatistics()) { |
| return new StringColumnStatisticsImpl(s, correctStats); |
| } else if (s.has_bucketstatistics()) { |
| return new BooleanColumnStatisticsImpl(s, correctStats); |
| } else if (s.has_decimalstatistics()) { |
| return new DecimalColumnStatisticsImpl(s, correctStats); |
| } else if (s.has_timestampstatistics()) { |
| return new TimestampColumnStatisticsImpl(s, correctStats); |
| } else if (s.has_datestatistics()) { |
| return new DateColumnStatisticsImpl(s, correctStats); |
| } else if (s.has_binarystatistics()) { |
| return new BinaryColumnStatisticsImpl(s, correctStats); |
| } else { |
| return new ColumnStatisticsImpl(s); |
| } |
| } |
| |
| Statistics::~Statistics() { |
| // PASS |
| } |
| |
| class StatisticsImpl: public Statistics { |
| private: |
| std::list<ColumnStatistics*> colStats; |
| |
| // DELIBERATELY NOT IMPLEMENTED |
| StatisticsImpl(const StatisticsImpl&); |
| StatisticsImpl& operator=(const StatisticsImpl&); |
| |
| public: |
| StatisticsImpl(const proto::StripeStatistics& stripeStats, bool correctStats) { |
| for(int i = 0; i < stripeStats.colstats_size(); i++) { |
| colStats.push_back(convertColumnStatistics |
| (stripeStats.colstats(i), correctStats)); |
| } |
| } |
| |
| StatisticsImpl(const proto::Footer& footer, bool correctStats) { |
| for(int i = 0; i < footer.statistics_size(); i++) { |
| colStats.push_back(convertColumnStatistics |
| (footer.statistics(i), correctStats)); |
| } |
| } |
| |
| virtual const ColumnStatistics* getColumnStatistics(uint32_t columnId |
| ) const override { |
| std::list<ColumnStatistics*>::const_iterator it = colStats.begin(); |
| std::advance(it, static_cast<int64_t>(columnId)); |
| return *it; |
| } |
| |
| virtual ~StatisticsImpl(); |
| |
| uint32_t getNumberOfColumns() const override { |
| return static_cast<uint32_t>(colStats.size()); |
| } |
| }; |
| |
| StatisticsImpl::~StatisticsImpl() { |
| for(std::list<ColumnStatistics*>::iterator ptr = colStats.begin(); |
| ptr != colStats.end(); |
| ++ptr) { |
| delete *ptr; |
| } |
| } |
| |
| Reader::~Reader() { |
| // PASS |
| } |
| |
| static const uint64_t DIRECTORY_SIZE_GUESS = 16 * 1024; |
| |
| class ReaderImpl : public Reader { |
| private: |
| const Timezone& localTimezone; |
| |
| // inputs |
| std::unique_ptr<InputStream> stream; |
| ReaderOptions options; |
| const uint64_t fileLength; |
| const uint64_t postscriptLength; |
| std::vector<bool> selectedColumns; |
| |
| // custom memory pool |
| MemoryPool& memoryPool; |
| |
| // postscript |
| std::unique_ptr<proto::PostScript> postscript; |
| const uint64_t blockSize; |
| const CompressionKind compression; |
| |
| // footer |
| std::unique_ptr<proto::Footer> footer; |
| DataBuffer<uint64_t> firstRowOfStripe; |
| uint64_t numberOfStripes; |
| std::unique_ptr<Type> schema; |
| mutable std::unique_ptr<Type> selectedSchema; |
| |
| // metadata |
| mutable std::unique_ptr<proto::Metadata> metadata; |
| mutable bool isMetadataLoaded; |
| |
| // reading state |
| uint64_t previousRow; |
| uint64_t firstStripe; |
| uint64_t currentStripe; |
| uint64_t lastStripe; // the stripe AFTER the last one |
| uint64_t currentRowInStripe; |
| uint64_t rowsInCurrentStripe; |
| proto::StripeInformation currentStripeInfo; |
| proto::StripeFooter currentStripeFooter; |
| std::unique_ptr<ColumnReader> reader; |
| |
| // internal methods |
| proto::StripeFooter getStripeFooter(const proto::StripeInformation& info); |
| void startNextStripe(); |
| void checkOrcVersion(); |
| void selectType(const Type& type); |
| void readMetadata() const; |
| void updateSelected(const std::list<uint64_t>& fieldIds); |
| void updateSelected(const std::list<std::string>& fieldNames); |
| |
| public: |
| /** |
| * Constructor that lets the user specify additional options. |
| * @param stream the stream to read from |
| * @param options options for reading |
| * @param postscript the postscript for the file |
| * @param footer the footer for the file |
| * @param fileLength the length of the file in bytes |
| * @param postscriptLength the length of the postscript in bytes |
| */ |
| ReaderImpl(std::unique_ptr<InputStream> stream, |
| const ReaderOptions& options, |
| std::unique_ptr<proto::PostScript> postscript, |
| std::unique_ptr<proto::Footer> footer, |
| uint64_t fileLength, |
| uint64_t postscriptLength); |
| |
| const ReaderOptions& getReaderOptions() const; |
| |
| CompressionKind getCompression() const override; |
| |
| std::string getFormatVersion() const override; |
| |
| WriterVersion getWriterVersion() const override; |
| |
| uint64_t getNumberOfRows() const override; |
| |
| uint64_t getRowIndexStride() const override; |
| |
| const std::string& getStreamName() const override; |
| |
| std::list<std::string> getMetadataKeys() const override; |
| |
| std::string getMetadataValue(const std::string& key) const override; |
| |
| bool hasMetadataValue(const std::string& key) const override; |
| |
| uint64_t getCompressionSize() const override; |
| |
| uint64_t getNumberOfStripes() const override; |
| |
| std::unique_ptr<StripeInformation> getStripe(uint64_t |
| ) const override; |
| |
| uint64_t getNumberOfStripeStatistics() const override; |
| |
| std::unique_ptr<Statistics> |
| getStripeStatistics(uint64_t stripeIndex) const override; |
| |
| |
| uint64_t getContentLength() const override; |
| uint64_t getStripeStatisticsLength() const override; |
| uint64_t getFileFooterLength() const override; |
| uint64_t getFilePostscriptLength() const override; |
| uint64_t getFileLength() const override; |
| |
| std::unique_ptr<Statistics> getStatistics() const override; |
| |
| std::unique_ptr<ColumnStatistics> getColumnStatistics(uint32_t columnId |
| ) const override; |
| |
| const Type& getType() const override; |
| |
| const Type& getSelectedType() const override; |
| |
| const std::vector<bool> getSelectedColumns() const override; |
| |
| std::unique_ptr<ColumnVectorBatch> createRowBatch(uint64_t size |
| ) const override; |
| |
| bool next(ColumnVectorBatch& data) override; |
| |
| uint64_t getRowNumber() const override; |
| |
| void seekToRow(uint64_t rowNumber) override; |
| |
| MemoryPool* getMemoryPool() const ; |
| |
| bool hasCorrectStatistics() const override; |
| |
| std::string getSerializedFileTail() const override; |
| |
| uint64_t getMemoryUse(int stripeIx = -1) override; |
| }; |
| |
| InputStream::~InputStream() { |
| // PASS |
| }; |
| |
| uint64_t getCompressionBlockSize(const proto::PostScript& ps) { |
| if (ps.has_compressionblocksize()) { |
| return ps.compressionblocksize(); |
| } else { |
| return 256 * 1024; |
| } |
| } |
| |
| CompressionKind convertCompressionKind(const proto::PostScript& ps) { |
| if (ps.has_compression()) { |
| return static_cast<CompressionKind>(ps.compression()); |
| } else { |
| throw ParseError("Unknown compression type"); |
| } |
| } |
| |
| ReaderImpl::ReaderImpl(std::unique_ptr<InputStream> input, |
| const ReaderOptions& opts, |
| std::unique_ptr<proto::PostScript> _postscript, |
| std::unique_ptr<proto::Footer> _footer, |
| uint64_t _fileLength, |
| uint64_t _postscriptLength |
| ): localTimezone(getLocalTimezone()), |
| stream(std::move(input)), |
| options(opts), |
| fileLength(_fileLength), |
| postscriptLength(_postscriptLength), |
| memoryPool(*opts.getMemoryPool()), |
| postscript(std::move(_postscript)), |
| blockSize(getCompressionBlockSize(*postscript)), |
| compression(convertCompressionKind(*postscript)), |
| footer(std::move(_footer)), |
| firstRowOfStripe(memoryPool, 0) { |
| isMetadataLoaded = false; |
| checkOrcVersion(); |
| numberOfStripes = static_cast<uint64_t>(footer->stripes_size()); |
| currentStripe = static_cast<uint64_t>(footer->stripes_size()); |
| lastStripe = 0; |
| currentRowInStripe = 0; |
| uint64_t rowTotal = 0; |
| |
| firstRowOfStripe.resize(static_cast<uint64_t>(footer->stripes_size())); |
| for(size_t i=0; i < static_cast<size_t>(footer->stripes_size()); ++i) { |
| firstRowOfStripe[i] = rowTotal; |
| proto::StripeInformation stripeInfo = |
| footer->stripes(static_cast<int>(i)); |
| rowTotal += stripeInfo.numberofrows(); |
| bool isStripeInRange = stripeInfo.offset() >= opts.getOffset() && |
| stripeInfo.offset() < opts.getOffset() + opts.getLength(); |
| if (isStripeInRange) { |
| if (i < currentStripe) { |
| currentStripe = i; |
| } |
| if (i >= lastStripe) { |
| lastStripe = i + 1; |
| } |
| } |
| } |
| firstStripe = currentStripe; |
| |
| if (currentStripe == 0) { |
| previousRow = (std::numeric_limits<uint64_t>::max)(); |
| } else if (currentStripe == |
| static_cast<uint64_t>(footer->stripes_size())) { |
| previousRow = footer->numberofrows(); |
| } else { |
| previousRow = firstRowOfStripe[firstStripe]-1; |
| } |
| |
| schema = convertType(footer->types(0), *footer); |
| |
| selectedColumns.assign(static_cast<size_t>(footer->types_size()), false); |
| if (schema->getKind() == STRUCT && options.getIndexesSet()) { |
| updateSelected(options.getInclude()); |
| } else if (schema->getKind() == STRUCT && options.getNamesSet()) { |
| updateSelected(options.getIncludeNames()); |
| } else { |
| std::fill(selectedColumns.begin(), selectedColumns.end(), true); |
| } |
| selectedColumns[0] = true; |
| } |
| |
| void ReaderImpl::selectType(const Type& type) { |
| if (!selectedColumns[static_cast<size_t>(type.getColumnId())]) { |
| selectedColumns[static_cast<size_t>(type.getColumnId())] = true; |
| for (uint64_t i=0; i < type.getSubtypeCount(); i++) { |
| selectType(*type.getSubtype(i)); |
| } |
| } |
| } |
| |
| std::string ReaderImpl::getSerializedFileTail() const { |
| proto::FileTail tail; |
| proto::PostScript *mutable_ps = tail.mutable_postscript(); |
| mutable_ps->CopyFrom(*postscript); |
| proto::Footer *mutableFooter = tail.mutable_footer(); |
| mutableFooter->CopyFrom(*footer); |
| tail.set_filelength(fileLength); |
| tail.set_postscriptlength(postscriptLength); |
| std::string result; |
| if (!tail.SerializeToString(&result)) { |
| throw ParseError("Failed to serialize file tail"); |
| } |
| return result; |
| } |
| |
| const ReaderOptions& ReaderImpl::getReaderOptions() const { |
| return options; |
| } |
| |
| CompressionKind ReaderImpl::getCompression() const { |
| return compression; |
| } |
| |
| uint64_t ReaderImpl::getCompressionSize() const { |
| return blockSize; |
| } |
| |
| uint64_t ReaderImpl::getNumberOfStripes() const { |
| return numberOfStripes; |
| } |
| |
| uint64_t ReaderImpl::getNumberOfStripeStatistics() const { |
| if (!isMetadataLoaded) { |
| readMetadata(); |
| } |
| return metadata.get() == nullptr ? 0 : |
| static_cast<uint64_t>(metadata->stripestats_size()); |
| } |
| |
| std::unique_ptr<StripeInformation> |
| ReaderImpl::getStripe(uint64_t stripeIndex) const { |
| if (stripeIndex > getNumberOfStripes()) { |
| throw std::logic_error("stripe index out of range"); |
| } |
| proto::StripeInformation stripeInfo = |
| footer->stripes(static_cast<int>(stripeIndex)); |
| |
| return std::unique_ptr<StripeInformation> |
| (new StripeInformationImpl |
| (stripeInfo.offset(), |
| stripeInfo.indexlength(), |
| stripeInfo.datalength(), |
| stripeInfo.footerlength(), |
| stripeInfo.numberofrows(), |
| stream.get(), |
| memoryPool, |
| compression, |
| blockSize)); |
| } |
| |
| std::string ReaderImpl::getFormatVersion() const { |
| std::stringstream result; |
| for(int i=0; i < postscript->version_size(); ++i) { |
| if (i != 0) { |
| result << "."; |
| } |
| result << postscript->version(i); |
| } |
| return result.str(); |
| } |
| |
| uint64_t ReaderImpl::getNumberOfRows() const { |
| return footer->numberofrows(); |
| } |
| |
| WriterVersion ReaderImpl::getWriterVersion() const { |
| if (!postscript->has_writerversion()) { |
| return WriterVersion_ORIGINAL; |
| } |
| return static_cast<WriterVersion>(postscript->writerversion()); |
| } |
| |
| uint64_t ReaderImpl::getContentLength() const { |
| return footer->contentlength(); |
| } |
| |
| uint64_t ReaderImpl::getStripeStatisticsLength() const { |
| return postscript->metadatalength(); |
| } |
| |
| uint64_t ReaderImpl::getFileFooterLength() const { |
| return postscript->footerlength(); |
| } |
| |
| uint64_t ReaderImpl::getFilePostscriptLength() const { |
| return postscriptLength; |
| } |
| |
| uint64_t ReaderImpl::getFileLength() const { |
| return fileLength; |
| } |
| |
| uint64_t ReaderImpl::getRowIndexStride() const { |
| return footer->rowindexstride(); |
| } |
| |
| const std::string& ReaderImpl::getStreamName() const { |
| return stream->getName(); |
| } |
| |
| std::list<std::string> ReaderImpl::getMetadataKeys() const { |
| std::list<std::string> result; |
| for(int i=0; i < footer->metadata_size(); ++i) { |
| result.push_back(footer->metadata(i).name()); |
| } |
| return result; |
| } |
| |
| std::string ReaderImpl::getMetadataValue(const std::string& key) const { |
| for(int i=0; i < footer->metadata_size(); ++i) { |
| if (footer->metadata(i).name() == key) { |
| return footer->metadata(i).value(); |
| } |
| } |
| throw std::range_error("key not found"); |
| } |
| |
| bool ReaderImpl::hasMetadataValue(const std::string& key) const { |
| for(int i=0; i < footer->metadata_size(); ++i) { |
| if (footer->metadata(i).name() == key) { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| const std::vector<bool> ReaderImpl::getSelectedColumns() const { |
| return selectedColumns; |
| } |
| |
| const Type& ReaderImpl::getType() const { |
| return *(schema.get()); |
| } |
| |
| const Type& ReaderImpl::getSelectedType() const { |
| if (selectedSchema.get() == nullptr) { |
| selectedSchema = buildSelectedType(schema.get(), |
| selectedColumns); |
| } |
| return *(selectedSchema.get()); |
| } |
| |
| uint64_t ReaderImpl::getRowNumber() const { |
| return previousRow; |
| } |
| |
| std::unique_ptr<Statistics> ReaderImpl::getStatistics() const { |
| return std::unique_ptr<Statistics> |
| (new StatisticsImpl(*footer, |
| hasCorrectStatistics())); |
| } |
| |
| std::unique_ptr<ColumnStatistics> |
| ReaderImpl::getColumnStatistics(uint32_t index) const { |
| if (index >= static_cast<uint64_t>(footer->statistics_size())) { |
| throw std::logic_error("column index out of range"); |
| } |
| proto::ColumnStatistics col = |
| footer->statistics(static_cast<int32_t>(index)); |
| return std::unique_ptr<ColumnStatistics> (convertColumnStatistics |
| (col, hasCorrectStatistics())); |
| } |
| |
| void ReaderImpl::readMetadata() const { |
| uint64_t metadataSize = postscript->metadatalength(); |
| uint64_t metadataStart = fileLength - metadataSize |
| - postscript->footerlength() - postscriptLength - 1; |
| if (metadataSize != 0) { |
| std::unique_ptr<SeekableInputStream> pbStream = |
| createDecompressor(compression, |
| std::unique_ptr<SeekableInputStream> |
| (new SeekableFileInputStream(stream.get(), |
| metadataStart, |
| metadataSize, |
| memoryPool)), |
| blockSize, |
| memoryPool); |
| metadata.reset(new proto::Metadata()); |
| if (!metadata->ParseFromZeroCopyStream(pbStream.get())) { |
| throw ParseError("Failed to parse the metadata"); |
| } |
| } |
| isMetadataLoaded = true; |
| } |
| |
| std::unique_ptr<Statistics> |
| ReaderImpl::getStripeStatistics(uint64_t stripeIndex) const { |
| if (!isMetadataLoaded) { |
| readMetadata(); |
| } |
| if (metadata.get() == nullptr) { |
| throw std::logic_error("No stripe statistics in file"); |
| } |
| return std::unique_ptr<Statistics> |
| (new StatisticsImpl(metadata->stripestats |
| (static_cast<int>(stripeIndex)), |
| hasCorrectStatistics())); |
| } |
| |
| |
| void ReaderImpl::seekToRow(uint64_t rowNumber) { |
| // Empty file |
| if (lastStripe == 0) { |
| return; |
| } |
| |
| // If we are reading only a portion of the file |
| // (bounded by firstStripe and lastStripe), |
| // seeking before or after the portion of interest should return no data. |
| // Implement this by setting previousRow to the number of rows in the file. |
| |
| // seeking past lastStripe |
| if ( (lastStripe == static_cast<uint64_t>(footer->stripes_size()) |
| && rowNumber >= footer->numberofrows()) || |
| (lastStripe < static_cast<uint64_t>(footer->stripes_size()) |
| && rowNumber >= firstRowOfStripe[lastStripe]) ) { |
| currentStripe = static_cast<uint64_t>(footer->stripes_size()); |
| previousRow = footer->numberofrows(); |
| return; |
| } |
| |
| uint64_t seekToStripe = 0; |
| while (seekToStripe+1 < lastStripe && |
| firstRowOfStripe[seekToStripe+1] <= rowNumber) { |
| seekToStripe++; |
| } |
| |
| // seeking before the first stripe |
| if (seekToStripe < firstStripe) { |
| currentStripe = static_cast<uint64_t>(footer->stripes_size()); |
| previousRow = footer->numberofrows(); |
| return; |
| } |
| |
| currentStripe = seekToStripe; |
| currentRowInStripe = rowNumber - firstRowOfStripe[currentStripe]; |
| previousRow = rowNumber; |
| startNextStripe(); |
| reader->skip(currentRowInStripe); |
| } |
| |
| bool ReaderImpl::hasCorrectStatistics() const { |
| return getWriterVersion() != WriterVersion_ORIGINAL; |
| } |
| |
| proto::StripeFooter ReaderImpl::getStripeFooter |
| (const proto::StripeInformation& info) { |
| uint64_t stripeFooterStart = info.offset() + info.indexlength() + |
| info.datalength(); |
| uint64_t stripeFooterLength = info.footerlength(); |
| std::unique_ptr<SeekableInputStream> pbStream = |
| createDecompressor(compression, |
| std::unique_ptr<SeekableInputStream> |
| (new SeekableFileInputStream(stream.get(), |
| stripeFooterStart, |
| stripeFooterLength, |
| memoryPool)), |
| blockSize, |
| memoryPool); |
| proto::StripeFooter result; |
| if (!result.ParseFromZeroCopyStream(pbStream.get())) { |
| throw ParseError(std::string("bad StripeFooter from ") + |
| pbStream->getName()); |
| } |
| return result; |
| } |
| |
| class StripeStreamsImpl: public StripeStreams { |
| private: |
| const ReaderImpl& reader; |
| const proto::StripeFooter& footer; |
| const uint64_t stripeStart; |
| InputStream& input; |
| MemoryPool& memoryPool; |
| const Timezone& writerTimezone; |
| |
| public: |
| StripeStreamsImpl(const ReaderImpl& reader, |
| const proto::StripeFooter& footer, |
| uint64_t stripeStart, |
| InputStream& input, |
| MemoryPool& memoryPool, |
| const Timezone& writerTimezone); |
| |
| virtual ~StripeStreamsImpl(); |
| |
| virtual const ReaderOptions& getReaderOptions() const override; |
| |
| virtual const std::vector<bool> getSelectedColumns() const override; |
| |
| virtual proto::ColumnEncoding getEncoding(uint64_t columnId |
| ) const override; |
| |
| virtual std::unique_ptr<SeekableInputStream> |
| getStream(uint64_t columnId, |
| proto::Stream_Kind kind, |
| bool shouldStream) const override; |
| |
| MemoryPool& getMemoryPool() const override; |
| |
| const Timezone& getWriterTimezone() const override; |
| }; |
| |
| uint64_t maxStreamsForType(const proto::Type& type) { |
| switch (static_cast<int64_t>(type.kind())) { |
| case proto::Type_Kind_STRUCT: |
| return 1; |
| case proto::Type_Kind_INT: |
| case proto::Type_Kind_LONG: |
| case proto::Type_Kind_SHORT: |
| case proto::Type_Kind_FLOAT: |
| case proto::Type_Kind_DOUBLE: |
| case proto::Type_Kind_BOOLEAN: |
| case proto::Type_Kind_BYTE: |
| case proto::Type_Kind_DATE: |
| case proto::Type_Kind_LIST: |
| case proto::Type_Kind_MAP: |
| case proto::Type_Kind_UNION: |
| return 2; |
| case proto::Type_Kind_BINARY: |
| case proto::Type_Kind_DECIMAL: |
| case proto::Type_Kind_TIMESTAMP: |
| return 3; |
| case proto::Type_Kind_CHAR: |
| case proto::Type_Kind_STRING: |
| case proto::Type_Kind_VARCHAR: |
| return 4; |
| default: |
| return 0; |
| } |
| } |
| |
| uint64_t ReaderImpl::getMemoryUse(int stripeIx) { |
| uint64_t maxDataLength = 0; |
| |
| if (stripeIx >= 0 && stripeIx < footer->stripes_size()) { |
| uint64_t stripe = footer->stripes(stripeIx).datalength(); |
| if (maxDataLength < stripe) { |
| maxDataLength = stripe; |
| } |
| } else { |
| for (int i=0; i < footer->stripes_size(); i++) { |
| uint64_t stripe = footer->stripes(i).datalength(); |
| if (maxDataLength < stripe) { |
| maxDataLength = stripe; |
| } |
| } |
| } |
| |
| bool hasStringColumn = false; |
| uint64_t nSelectedStreams = 0; |
| for (int i=0; !hasStringColumn && i < footer->types_size(); i++) { |
| if (selectedColumns[static_cast<size_t>(i)]) { |
| const proto::Type& type = footer->types(i); |
| nSelectedStreams += maxStreamsForType(type) ; |
| switch (static_cast<int64_t>(type.kind())) { |
| case proto::Type_Kind_CHAR: |
| case proto::Type_Kind_STRING: |
| case proto::Type_Kind_VARCHAR: |
| case proto::Type_Kind_BINARY: { |
| hasStringColumn = true; |
| break; |
| } |
| default: { |
| break; |
| } |
| } |
| } |
| } |
| |
| /* If a string column is read, use stripe datalength as a memory estimate |
| * because we don't know the dictionary size. Multiply by 2 because |
| * a string column requires two buffers: |
| * in the input stream and in the seekable input stream. |
| * If no string column is read, estimate from the number of streams. |
| */ |
| uint64_t memory = hasStringColumn ? 2 * maxDataLength : |
| std::min(uint64_t(maxDataLength), |
| nSelectedStreams * stream->getNaturalReadSize()); |
| |
| // Do we need even more memory to read the footer or the metadata? |
| if (memory < postscript->footerlength() + DIRECTORY_SIZE_GUESS) { |
| memory = postscript->footerlength() + DIRECTORY_SIZE_GUESS; |
| } |
| if (memory < postscript->metadatalength()) { |
| memory = postscript->metadatalength(); |
| } |
| |
| // Account for firstRowOfStripe. |
| memory += firstRowOfStripe.capacity() * sizeof(uint64_t); |
| |
| // Decompressors need buffers for each stream |
| uint64_t decompressorMemory = 0; |
| if (compression != CompressionKind_NONE) { |
| for (int i=0; i < footer->types_size(); i++) { |
| if (selectedColumns[static_cast<size_t>(i)]) { |
| const proto::Type& type = footer->types(i); |
| decompressorMemory += maxStreamsForType(type) * blockSize; |
| } |
| } |
| if (compression == CompressionKind_SNAPPY) { |
| decompressorMemory *= 2; // Snappy decompressor uses a second buffer |
| } |
| } |
| |
| return memory + decompressorMemory ; |
| } |
| |
| StripeStreamsImpl::StripeStreamsImpl(const ReaderImpl& _reader, |
| const proto::StripeFooter& _footer, |
| uint64_t _stripeStart, |
| InputStream& _input, |
| MemoryPool& _memoryPool, |
| const Timezone& _writerTimezone |
| ): reader(_reader), |
| footer(_footer), |
| stripeStart(_stripeStart), |
| input(_input), |
| memoryPool(_memoryPool), |
| writerTimezone(_writerTimezone) { |
| // PASS |
| } |
| |
| StripeStreamsImpl::~StripeStreamsImpl() { |
| // PASS |
| } |
| |
| const ReaderOptions& StripeStreamsImpl::getReaderOptions() const { |
| return reader.getReaderOptions(); |
| } |
| |
| const std::vector<bool> StripeStreamsImpl::getSelectedColumns() const { |
| return reader.getSelectedColumns(); |
| } |
| |
| proto::ColumnEncoding StripeStreamsImpl::getEncoding(uint64_t columnId |
| ) const { |
| return footer.columns(static_cast<int>(columnId)); |
| } |
| |
| const Timezone& StripeStreamsImpl::getWriterTimezone() const { |
| return writerTimezone; |
| } |
| |
| std::unique_ptr<SeekableInputStream> |
| StripeStreamsImpl::getStream(uint64_t columnId, |
| proto::Stream_Kind kind, |
| bool shouldStream) const { |
| uint64_t offset = stripeStart; |
| for(int i = 0; i < footer.streams_size(); ++i) { |
| const proto::Stream& stream = footer.streams(i); |
| if (stream.has_kind() && |
| stream.kind() == kind && |
| stream.column() == static_cast<uint64_t>(columnId)) { |
| uint64_t myBlock = shouldStream ? input.getNaturalReadSize(): |
| stream.length(); |
| return createDecompressor(reader.getCompression(), |
| std::unique_ptr<SeekableInputStream> |
| (new SeekableFileInputStream |
| (&input, |
| offset, |
| stream.length(), |
| memoryPool, |
| myBlock)), |
| reader.getCompressionSize(), |
| memoryPool); |
| } |
| offset += stream.length(); |
| } |
| return std::unique_ptr<SeekableInputStream>(); |
| } |
| |
| MemoryPool& StripeStreamsImpl::getMemoryPool() const { |
| return memoryPool; |
| } |
| |
| void ReaderImpl::startNextStripe() { |
| reader.reset(); // ColumnReaders use lots of memory; free old memory first |
| currentStripeInfo = footer->stripes(static_cast<int>(currentStripe)); |
| currentStripeFooter = getStripeFooter(currentStripeInfo); |
| rowsInCurrentStripe = currentStripeInfo.numberofrows(); |
| const Timezone& writerTimezone = |
| currentStripeFooter.has_writertimezone() ? |
| getTimezoneByName(currentStripeFooter.writertimezone()) : |
| localTimezone; |
| StripeStreamsImpl stripeStreams(*this, currentStripeFooter, |
| currentStripeInfo.offset(), |
| *(stream.get()), |
| memoryPool, |
| writerTimezone); |
| reader = buildReader(*(schema.get()), stripeStreams); |
| } |
| |
| void ReaderImpl::checkOrcVersion() { |
| std::string version = getFormatVersion(); |
| if (version != "0.11" && version != "0.12") { |
| *(options.getErrorStream()) |
| << "Warning: ORC file " << stream->getName() |
| << " was written in an unknown format version " |
| << version << "\n"; |
| } |
| } |
| |
| bool ReaderImpl::next(ColumnVectorBatch& data) { |
| if (currentStripe >= lastStripe) { |
| data.numElements = 0; |
| if (lastStripe > 0) { |
| previousRow = firstRowOfStripe[lastStripe - 1] + |
| footer->stripes(static_cast<int>(lastStripe - 1)).numberofrows(); |
| } else { |
| previousRow = 0; |
| } |
| return false; |
| } |
| if (currentRowInStripe == 0) { |
| startNextStripe(); |
| } |
| uint64_t rowsToRead = |
| std::min(static_cast<uint64_t>(data.capacity), |
| rowsInCurrentStripe - currentRowInStripe); |
| data.numElements = rowsToRead; |
| reader->next(data, rowsToRead, 0); |
| // update row number |
| previousRow = firstRowOfStripe[currentStripe] + currentRowInStripe; |
| currentRowInStripe += rowsToRead; |
| if (currentRowInStripe >= rowsInCurrentStripe) { |
| currentStripe += 1; |
| currentRowInStripe = 0; |
| } |
| return rowsToRead != 0; |
| } |
| |
| std::unique_ptr<ColumnVectorBatch> ReaderImpl::createRowBatch |
| (uint64_t capacity) const { |
| return getSelectedType().createRowBatch(capacity, memoryPool); |
| } |
| |
| void ensureOrcFooter(InputStream* stream, |
| DataBuffer<char> *buffer, |
| uint64_t postscriptLength) { |
| |
| const std::string MAGIC("ORC"); |
| const uint64_t magicLength = MAGIC.length(); |
| const char * const bufferStart = buffer->data(); |
| const uint64_t bufferLength = buffer->size(); |
| |
| if (postscriptLength < magicLength || bufferLength < magicLength) { |
| throw ParseError("Invalid ORC postscript length"); |
| } |
| const char* magicStart = bufferStart + bufferLength - 1 - magicLength; |
| |
| // Look for the magic string at the end of the postscript. |
| if (memcmp(magicStart, MAGIC.c_str(), magicLength) != 0) { |
| // If there is no magic string at the end, check the beginning. |
| // Only files written by Hive 0.11.0 don't have the tail ORC string. |
| char *frontBuffer = new char[magicLength]; |
| stream->read(frontBuffer, magicLength, 0); |
| bool foundMatch = memcmp(frontBuffer, MAGIC.c_str(), magicLength) == 0; |
| delete[] frontBuffer; |
| if (!foundMatch) { |
| throw ParseError("Not an ORC file"); |
| } |
| } |
| } |
| |
| /** |
| * Read the file's postscript from the given buffer. |
| * @param stream the file stream |
| * @param buffer the buffer with the tail of the file. |
| * @param postscriptSize the length of postscript in bytes |
| */ |
| std::unique_ptr<proto::PostScript> readPostscript(InputStream *stream, |
| DataBuffer<char> *buffer, |
| uint64_t postscriptSize) { |
| char *ptr = buffer->data(); |
| uint64_t readSize = buffer->size(); |
| |
| ensureOrcFooter(stream, buffer, postscriptSize); |
| |
| std::unique_ptr<proto::PostScript> postscript = |
| std::unique_ptr<proto::PostScript>(new proto::PostScript()); |
| if (!postscript->ParseFromArray(ptr + readSize - 1 - postscriptSize, |
| static_cast<int>(postscriptSize))) { |
| throw ParseError("Failed to parse the postscript from " + |
| stream->getName()); |
| } |
| return REDUNDANT_MOVE(postscript); |
| } |
| |
| /** |
| * Parse the footer from the given buffer. |
| * @param stream the file's stream |
| * @param buffer the buffer to parse the footer from |
| * @param footerOffset the offset within the buffer that contains the footer |
| * @param ps the file's postscript |
| * @param memoryPool the memory pool to use |
| */ |
| std::unique_ptr<proto::Footer> readFooter(InputStream* stream, |
| DataBuffer<char> *&buffer, |
| uint64_t footerOffset, |
| const proto::PostScript& ps, |
| MemoryPool& memoryPool) { |
| char *footerPtr = buffer->data() + footerOffset; |
| |
| std::unique_ptr<SeekableInputStream> pbStream = |
| createDecompressor(convertCompressionKind(ps), |
| std::unique_ptr<SeekableInputStream> |
| (new SeekableArrayInputStream(footerPtr, |
| ps.footerlength())), |
| getCompressionBlockSize(ps), |
| memoryPool); |
| |
| std::unique_ptr<proto::Footer> footer = |
| std::unique_ptr<proto::Footer>(new proto::Footer()); |
| if (!footer->ParseFromZeroCopyStream(pbStream.get())) { |
| throw ParseError("Failed to parse the footer from " + |
| stream->getName()); |
| } |
| return REDUNDANT_MOVE(footer); |
| } |
| |
| std::unique_ptr<Reader> createReader(std::unique_ptr<InputStream> stream, |
| const ReaderOptions& options) { |
| MemoryPool *memoryPool = options.getMemoryPool(); |
| std::unique_ptr<proto::PostScript> ps; |
| std::unique_ptr<proto::Footer> footer; |
| std::string serializedFooter = options.getSerializedFileTail(); |
| uint64_t fileLength; |
| uint64_t postscriptLength; |
| if (serializedFooter.length() != 0) { |
| // Parse the file tail from the serialized one. |
| proto::FileTail tail; |
| if (!tail.ParseFromString(serializedFooter)) { |
| throw ParseError("Failed to parse the file tail from string"); |
| } |
| ps.reset(new proto::PostScript(tail.postscript())); |
| footer.reset(new proto::Footer(tail.footer())); |
| fileLength = tail.filelength(); |
| postscriptLength = tail.postscriptlength(); |
| } else { |
| // figure out the size of the file using the option or filesystem |
| fileLength = std::min(options.getTailLocation(), |
| static_cast<uint64_t>(stream->getLength())); |
| |
| //read last bytes into buffer to get PostScript |
| uint64_t readSize = std::min(fileLength, DIRECTORY_SIZE_GUESS); |
| if (readSize < 4) { |
| throw ParseError("File size too small"); |
| } |
| DataBuffer<char> *buffer = new DataBuffer<char>(*memoryPool, readSize); |
| stream->read(buffer->data(), readSize, fileLength - readSize); |
| |
| postscriptLength = buffer->data()[readSize - 1] & 0xff; |
| ps = readPostscript(stream.get(), buffer, postscriptLength); |
| uint64_t footerSize = ps->footerlength(); |
| uint64_t tailSize = 1 + postscriptLength + footerSize; |
| uint64_t footerOffset; |
| |
| if (tailSize > readSize) { |
| buffer->resize(footerSize); |
| stream->read(buffer->data(), footerSize, fileLength - tailSize); |
| footerOffset = 0; |
| } else { |
| footerOffset = readSize - tailSize; |
| } |
| |
| footer = readFooter(stream.get(), buffer, footerOffset, *ps, |
| *memoryPool); |
| delete buffer; |
| } |
| return std::unique_ptr<Reader>(new ReaderImpl(std::move(stream), |
| options, |
| std::move(ps), |
| std::move(footer), |
| fileLength, |
| postscriptLength)); |
| } |
| |
| ColumnStatistics::~ColumnStatistics() { |
| // PASS |
| } |
| |
| BinaryColumnStatistics::~BinaryColumnStatistics() { |
| // PASS |
| } |
| |
| BooleanColumnStatistics::~BooleanColumnStatistics() { |
| // PASS |
| } |
| |
| DateColumnStatistics::~DateColumnStatistics() { |
| // PASS |
| } |
| |
| DecimalColumnStatistics::~DecimalColumnStatistics() { |
| // PASS |
| } |
| |
| DoubleColumnStatistics::~DoubleColumnStatistics() { |
| // PASS |
| } |
| |
| IntegerColumnStatistics::~IntegerColumnStatistics() { |
| // PASS |
| } |
| |
| StringColumnStatistics::~StringColumnStatistics() { |
| // PASS |
| } |
| |
| TimestampColumnStatistics::~TimestampColumnStatistics() { |
| // PASS |
| } |
| |
| ColumnStatisticsImpl::~ColumnStatisticsImpl() { |
| // PASS |
| } |
| |
| BinaryColumnStatisticsImpl::~BinaryColumnStatisticsImpl() { |
| // PASS |
| } |
| |
| BooleanColumnStatisticsImpl::~BooleanColumnStatisticsImpl() { |
| // PASS |
| } |
| |
| DateColumnStatisticsImpl::~DateColumnStatisticsImpl() { |
| // PASS |
| } |
| |
| DecimalColumnStatisticsImpl::~DecimalColumnStatisticsImpl() { |
| // PASS |
| } |
| |
| DoubleColumnStatisticsImpl::~DoubleColumnStatisticsImpl() { |
| // PASS |
| } |
| |
| IntegerColumnStatisticsImpl::~IntegerColumnStatisticsImpl() { |
| // PASS |
| } |
| |
| StringColumnStatisticsImpl::~StringColumnStatisticsImpl() { |
| // PASS |
| } |
| |
| TimestampColumnStatisticsImpl::~TimestampColumnStatisticsImpl() { |
| // PASS |
| } |
| |
| ColumnStatisticsImpl::ColumnStatisticsImpl |
| (const proto::ColumnStatistics& pb) { |
| valueCount = pb.numberofvalues(); |
| } |
| |
| BinaryColumnStatisticsImpl::BinaryColumnStatisticsImpl |
| (const proto::ColumnStatistics& pb, bool correctStats){ |
| valueCount = pb.numberofvalues(); |
| if (!pb.has_binarystatistics() || !correctStats) { |
| _hasTotalLength = false; |
| |
| totalLength = 0; |
| }else{ |
| _hasTotalLength = pb.binarystatistics().has_sum(); |
| totalLength = static_cast<uint64_t>(pb.binarystatistics().sum()); |
| } |
| } |
| |
| BooleanColumnStatisticsImpl::BooleanColumnStatisticsImpl |
| (const proto::ColumnStatistics& pb, bool correctStats){ |
| valueCount = pb.numberofvalues(); |
| if (!pb.has_bucketstatistics() || !correctStats) { |
| _hasCount = false; |
| trueCount = 0; |
| }else{ |
| _hasCount = true; |
| trueCount = pb.bucketstatistics().count(0); |
| } |
| } |
| |
| DateColumnStatisticsImpl::DateColumnStatisticsImpl |
| (const proto::ColumnStatistics& pb, bool correctStats){ |
| valueCount = pb.numberofvalues(); |
| if (!pb.has_datestatistics() || !correctStats) { |
| _hasMinimum = false; |
| _hasMaximum = false; |
| |
| minimum = 0; |
| maximum = 0; |
| } else { |
| _hasMinimum = pb.datestatistics().has_minimum(); |
| _hasMaximum = pb.datestatistics().has_maximum(); |
| minimum = pb.datestatistics().minimum(); |
| maximum = pb.datestatistics().maximum(); |
| } |
| } |
| |
| DecimalColumnStatisticsImpl::DecimalColumnStatisticsImpl |
| (const proto::ColumnStatistics& pb, bool correctStats){ |
| valueCount = pb.numberofvalues(); |
| if (!pb.has_decimalstatistics() || !correctStats) { |
| _hasMinimum = false; |
| _hasMaximum = false; |
| _hasSum = false; |
| }else{ |
| const proto::DecimalStatistics& stats = pb.decimalstatistics(); |
| _hasMinimum = stats.has_minimum(); |
| _hasMaximum = stats.has_maximum(); |
| _hasSum = stats.has_sum(); |
| |
| minimum = stats.minimum(); |
| maximum = stats.maximum(); |
| sum = stats.sum(); |
| } |
| } |
| |
| DoubleColumnStatisticsImpl::DoubleColumnStatisticsImpl |
| (const proto::ColumnStatistics& pb){ |
| valueCount = pb.numberofvalues(); |
| if (!pb.has_doublestatistics()) { |
| _hasMinimum = false; |
| _hasMaximum = false; |
| _hasSum = false; |
| |
| minimum = 0; |
| maximum = 0; |
| sum = 0; |
| }else{ |
| const proto::DoubleStatistics& stats = pb.doublestatistics(); |
| _hasMinimum = stats.has_minimum(); |
| _hasMaximum = stats.has_maximum(); |
| _hasSum = stats.has_sum(); |
| |
| minimum = stats.minimum(); |
| maximum = stats.maximum(); |
| sum = stats.sum(); |
| } |
| } |
| |
| IntegerColumnStatisticsImpl::IntegerColumnStatisticsImpl |
| (const proto::ColumnStatistics& pb){ |
| valueCount = pb.numberofvalues(); |
| if (!pb.has_intstatistics()) { |
| _hasMinimum = false; |
| _hasMaximum = false; |
| _hasSum = false; |
| |
| minimum = 0; |
| maximum = 0; |
| sum = 0; |
| }else{ |
| const proto::IntegerStatistics& stats = pb.intstatistics(); |
| _hasMinimum = stats.has_minimum(); |
| _hasMaximum = stats.has_maximum(); |
| _hasSum = stats.has_sum(); |
| |
| minimum = stats.minimum(); |
| maximum = stats.maximum(); |
| sum = stats.sum(); |
| } |
| } |
| |
| StringColumnStatisticsImpl::StringColumnStatisticsImpl |
| (const proto::ColumnStatistics& pb, bool correctStats){ |
| valueCount = pb.numberofvalues(); |
| if (!pb.has_stringstatistics() || !correctStats) { |
| _hasMinimum = false; |
| _hasMaximum = false; |
| _hasTotalLength = false; |
| |
| totalLength = 0; |
| }else{ |
| const proto::StringStatistics& stats = pb.stringstatistics(); |
| _hasMinimum = stats.has_minimum(); |
| _hasMaximum = stats.has_maximum(); |
| _hasTotalLength = stats.has_sum(); |
| |
| minimum = stats.minimum(); |
| maximum = stats.maximum(); |
| totalLength = static_cast<uint64_t>(stats.sum()); |
| } |
| } |
| |
| TimestampColumnStatisticsImpl::TimestampColumnStatisticsImpl |
| (const proto::ColumnStatistics& pb, bool correctStats) { |
| valueCount = pb.numberofvalues(); |
| if (!pb.has_timestampstatistics() || !correctStats) { |
| _hasMinimum = false; |
| _hasMaximum = false; |
| minimum = 0; |
| maximum = 0; |
| }else{ |
| const proto::TimestampStatistics& stats = pb.timestampstatistics(); |
| _hasMinimum = stats.has_minimum(); |
| _hasMaximum = stats.has_maximum(); |
| |
| minimum = stats.minimum(); |
| maximum = stats.maximum(); |
| } |
| } |
| |
| void ReaderImpl::updateSelected(const std::list<uint64_t>& fieldIds) { |
| uint64_t childCount = schema->getSubtypeCount(); |
| for(std::list<uint64_t>::const_iterator i = fieldIds.begin(); |
| i != fieldIds.end(); ++i) { |
| if (*i >= childCount) { |
| std::stringstream buffer; |
| buffer << "Invalid column selected " << *i << " out of " |
| << childCount; |
| throw ParseError(buffer.str()); |
| } |
| const Type& child = *schema->getSubtype(*i); |
| for(size_t c = child.getColumnId(); |
| c <= child.getMaximumColumnId(); ++c){ |
| selectedColumns[c] = true; |
| } |
| } |
| } |
| |
| void ReaderImpl::updateSelected(const std::list<std::string>& fieldNames) { |
| uint64_t childCount = schema->getSubtypeCount(); |
| for(std::list<std::string>::const_iterator i = fieldNames.begin(); |
| i != fieldNames.end(); ++i) { |
| bool foundMatch = false; |
| for(size_t field=0; field < childCount; ++field) { |
| if (schema->getFieldName(field) == *i) { |
| const Type& child = *schema->getSubtype(field); |
| for(size_t c = child.getColumnId(); |
| c <= child.getMaximumColumnId(); ++c){ |
| selectedColumns[c] = true; |
| } |
| foundMatch = true; |
| break; |
| } |
| } |
| if (!foundMatch) { |
| throw ParseError("Invalid column selected " + *i); |
| } |
| } |
| } |
| }// namespace |