| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| #ifndef STORAGE_SRC_STORAGE_FORMAT_ORC_READER_H_ |
| #define STORAGE_SRC_STORAGE_FORMAT_ORC_READER_H_ |
| |
| #include <algorithm> |
| #include <list> |
| #include <map> |
| #include <memory> |
| #include <string> |
| #include <vector> |
| |
| #include "univplan/common/statistics.h" |
| |
| #include "storage/common/bloom-filter.h" |
| #include "storage/format/orc/byte-rle.h" |
| #include "storage/format/orc/orc-proto-definition.h" |
| #include "storage/format/orc/reader.h" |
| #include "storage/format/orc/rle.h" |
| #include "storage/format/orc/seekable-input-stream.h" |
| #include "storage/format/orc/timezone.h" |
| #include "storage/format/orc/type.h" |
| #include "storage/format/orc/vector.h" |
| |
| #include "storage/format/orc/orc_proto.pb.h" |
| |
| namespace orc { |
| |
| // classes that hold data members so we can maintain binary compatibility |
| struct ReaderOptionsPrivate; |
| |
| // Options for creating a Reader. |
| class ReaderOptions { |
| private: |
| std::unique_ptr<ReaderOptionsPrivate> privateBits; |
| |
| public: |
| ReaderOptions(); |
| ReaderOptions(const ReaderOptions&); |
| ReaderOptions(ReaderOptions&); |
| ReaderOptions& operator=(const ReaderOptions&); |
| virtual ~ReaderOptions(); |
| |
| // For files that have structs as the top-level object, select the fields |
| // to read. The first field is 0, the second 1, and so on. By default, |
| // all columns are read. This option clears any previous setting of |
| // the selected columns. |
| // @param include a list of fields to read |
| // @return this |
| ReaderOptions& include(const std::list<uint64_t>& include); |
| |
| // For files that have structs as the top-level object, select the fields |
| // to read by name. By default, all columns are read. This option clears |
| // any previous setting of the selected columns. |
| // @param include a list of fields to read |
| // @return this |
| ReaderOptions& include(const std::list<std::string>& include); |
| |
| // Selects which type ids to read. The root type is always 0 and the |
| // rest of the types are labeled in a preorder traversal of the tree. |
| // The parent types are automatically selected, but the children are not. |
| // |
| // This option clears any previous setting of the selected columns or |
| // types. |
| // @param types a list of the type ids to read |
| // @return this |
| ReaderOptions& includeTypes(const std::list<uint64_t>& types); |
| |
| // Set the section of the file to process. |
| // @param offset the starting byte offset |
| // @param length the number of bytes to read |
| // @return this |
| ReaderOptions& range(uint64_t offset, uint64_t length); |
| |
| // For Hive 0.11 (and 0.12) decimals, the precision was unlimited |
| // and thus may overflow the 38 digits that is supported. If one |
| // of the Hive 0.11 decimals is too large, the reader may either convert |
| // the value to NULL or throw an exception. That choice is controlled |
| // by this setting. |
| // |
| // Defaults to true. |
| // |
| // @param shouldThrow should the reader throw a ParseError? |
| // @return returns *this |
| ReaderOptions& throwOnHive11DecimalOverflow(bool shouldThrow); |
| |
| // For Hive 0.11 (and 0.12) written decimals, which have unlimited |
| // scale and precision, the reader forces the scale to a consistent |
| // number that is configured. This setting changes the scale that is |
| // forced upon these old decimals. See also throwOnHive11DecimalOverflow. |
| // |
| // Defaults to 6. |
| // |
| // @param forcedScale the scale that will be forced on Hive 0.11 decimals |
| // @return returns *this |
| ReaderOptions& forcedScaleOnHive11Decimal(int32_t forcedScale); |
| |
| // Set the location of the tail as defined by the logical length of the |
| // file. |
| ReaderOptions& setTailLocation(uint64_t offset); |
| |
| // Set the stream to use for printing warning or error messages. |
| ReaderOptions& setErrorStream(std::ostream& stream); // NOLINT |
| |
| // Open the file used a serialized copy of the file tail. |
| // |
| // When one process opens the file and other processes need to read |
| // the rows, we want to enable clients to just read the tail once. |
| // By passing the string returned by Reader.getSerializedFileTail(), to |
| // this function, the second reader will not need to read the file tail |
| // from disk. |
| // |
| // @param serialization the bytes of the serialized tail to use |
| ReaderOptions& setSerializedFileTail(const std::string& serialization); |
| |
| // Get the memory allocator. |
| dbcommon::MemoryPool* getMemoryPool() const; |
| |
| // Were the field ids set? |
| bool getIndexesSet() const; |
| |
| // Were the type ids set? |
| bool getTypeIdsSet() const; |
| |
| // Get the list of selected field or type ids to read. |
| const std::list<uint64_t>& getInclude() const; |
| |
| // Were the include names set? |
| bool getNamesSet() const; |
| |
| // Get the list of selected columns to read. All children of the selected |
| // columns are also selected. |
| const std::list<std::string>& getIncludeNames() const; |
| |
| // Get the start of the range for the data being processed. |
| // @return if not set, return 0 |
| uint64_t getOffset() const; |
| |
| // Get the end of the range for the data being processed. |
| // @return if not set, return the maximum long |
| uint64_t getLength() const; |
| |
| // Get the desired tail location. |
| // @return if not set, return the maximum long. |
| uint64_t getTailLocation() const; |
| |
| // Should the reader throw a ParseError when a Hive 0.11 decimal is |
| // larger than the supported 38 digits of precision? Otherwise, the |
| // data item is replaced by a NULL. |
| bool getThrowOnHive11DecimalOverflow() const; |
| |
| // What scale should all Hive 0.11 decimals be normalized to? |
| int32_t getForcedScaleOnHive11Decimal() const; |
| |
| // Get the stream to write warnings or errors to. |
| std::ostream* getErrorStream() const; |
| |
| // Get the serialized file tail that the user passed in. |
| std::string getSerializedFileTail() const; |
| |
| void setPredicateExprs(const univplan::UnivPlanExprPolyList* predicateExprs); |
| const univplan::UnivPlanExprPolyList* getPredicateExprs() const; |
| |
| void setTupleDesc(const dbcommon::TupleDesc* td); |
| const dbcommon::TupleDesc* getTupleDesc() const; |
| |
| void setReadStatsOnlyFlag(bool readStatsOnly); |
| bool readStatsOnly() const; |
| }; |
| |
| class StripeStreams { |
| public: |
| virtual ~StripeStreams(); |
| |
| // Get the reader options. |
| // @return Reader options |
| virtual const ReaderOptions& getReaderOptions() const = 0; |
| |
| // Get the array of booleans for which columns are selected. |
| // @return the address of an array which contains true at the index of |
| // each columnId is selected. |
| virtual const std::vector<bool> getSelectedColumns() const = 0; |
| |
| // Get the encoding for the given column for this stripe. |
| // @param columnId The column id |
| // @return The column encoding |
| virtual proto::ColumnEncoding getEncoding(uint64_t columnId) const = 0; |
| |
| // Get the stream for the given column/kind in this stripe. |
| // @param columnId The id of the column |
| // @param kind The kind of the stream |
| // @param shouldStream Should the reading page the stream in |
| // @return The new stream |
| virtual std::unique_ptr<SeekableInputStream> getStream( |
| uint64_t columnId, proto::Stream_Kind kind, bool shouldStream) const = 0; |
| |
| virtual std::unique_ptr<SeekableInputStream> getStreamForBloomFilter( |
| uint64_t columnId, proto::Stream_Kind kind, bool shouldStream) const = 0; |
| |
| // Get the memory pool for this reader. |
| // @return The memory pool |
| virtual dbcommon::MemoryPool& getMemoryPool() const = 0; |
| |
| // Get the writer's timezone, so that we can convert their dates correctly. |
| // @return The timezone |
| virtual const Timezone& getWriterTimezone() const = 0; |
| }; |
| |
| // The interface for reading ORC data types. |
| class ColumnReader { |
| protected: |
| std::unique_ptr<ByteRleDecoder> notNullDecoder; // it is exact a |
| // BooleanRleDecoderImpl, pay |
| // attention to the vitual |
| // function call |
| uint64_t columnId; |
| dbcommon::MemoryPool& memoryPool; |
| |
| public: |
| ColumnReader(const Type& type, StripeStreams& stipe); // NOLINT |
| |
| virtual ~ColumnReader(); |
| |
| // Skip number of specified rows. |
| // @param numValues the number of values to skip |
| // @return the number of non-null values skipped |
| virtual uint64_t skip(uint64_t numValues); |
| |
| // Read the next group of values into this rowBatch. |
| // @param rowBatch the memory to read into. |
| // @param numValues the number of values to read |
| // @param notNull if null, all values are not null. Otherwise, it is |
| // a mask (with at least numValues bytes) for which values to |
| // set. |
| virtual void next(ColumnVectorBatch& rowBatch, uint64_t numValues, // NOLINT |
| char* notNull); |
| }; |
| |
| class BooleanColumnReader : public ColumnReader { |
| private: |
| std::unique_ptr<orc::ByteRleDecoder> rle; |
| |
| public: |
| BooleanColumnReader(const Type& type, StripeStreams& stipe); // NOLINT |
| ~BooleanColumnReader(); |
| |
| uint64_t skip(uint64_t numValues) override; |
| |
| void next(ColumnVectorBatch& rowBatch, uint64_t numValues, |
| char* notNull) override; // NOLINT |
| }; |
| |
| class ByteColumnReader : public ColumnReader { |
| private: |
| std::unique_ptr<orc::ByteRleDecoder> rle; |
| |
| public: |
| ByteColumnReader(const Type& type, StripeStreams& stipe); // NOLINT |
| ~ByteColumnReader(); |
| |
| uint64_t skip(uint64_t numValues) override; |
| |
| void next(ColumnVectorBatch& rowBatch, uint64_t numValues, |
| char* notNull) override; // NOLINT |
| }; |
| |
| extern RleVersion convertRleVersion(proto::ColumnEncoding_Kind kind); |
| |
| template <class IntType> |
| class IntegerColumnReader : public ColumnReader { |
| protected: |
| std::unique_ptr<orc::RleDecoder> rle; |
| |
| public: |
| IntegerColumnReader(const Type& type, StripeStreams& stripe); // NOLINT |
| |
| // : //NOLINT |
| // ColumnReader(type, stripe) { //NOLINT |
| // RleVersion vers = |
| // convertRleVersion(stripe.getEncoding(columnId).kind()); |
| // rle = createRleDecoder<IntType>( |
| // stripe.getStream(columnId, proto::Stream_Kind_DATA, true), true, |
| // vers, |
| // memoryPool); |
| // } |
| |
| ~IntegerColumnReader() {} |
| |
| uint64_t skip(uint64_t numValues) override; |
| |
| void next(ColumnVectorBatch& rowBatch, uint64_t numValues, |
| char* notNull) override; // NOLINT |
| }; |
| |
| class LongColumnReader : public IntegerColumnReader<int64_t> { |
| public: |
| LongColumnReader(const Type& type, StripeStreams& stripe) // NOLINT |
| : IntegerColumnReader<int64_t>(type, stripe) { |
| RleVersion vers = convertRleVersion(stripe.getEncoding(columnId).kind()); |
| rle = createRleDecoder( |
| stripe.getStream(columnId, proto::Stream_Kind_DATA, true), true, vers, |
| memoryPool, LONG); |
| } |
| ~LongColumnReader() {} |
| }; |
| |
| class IntColumnReader : public IntegerColumnReader<int32_t> { |
| public: |
| IntColumnReader(const Type& type, StripeStreams& stripe) // NOLINT |
| : IntegerColumnReader<int32_t>(type, stripe) { |
| RleVersion vers = convertRleVersion(stripe.getEncoding(columnId).kind()); |
| rle = createRleDecoder( |
| stripe.getStream(columnId, proto::Stream_Kind_DATA, true), true, vers, |
| memoryPool, INT); |
| } |
| ~IntColumnReader() {} |
| }; |
| |
| class ShortColumnReader : public IntegerColumnReader<int16_t> { |
| public: |
| ShortColumnReader(const Type& type, StripeStreams& stripe) // NOLINT |
| : IntegerColumnReader<int16_t>(type, stripe) { |
| RleVersion vers = convertRleVersion(stripe.getEncoding(columnId).kind()); |
| rle = createRleDecoder( |
| stripe.getStream(columnId, proto::Stream_Kind_DATA, true), true, vers, |
| memoryPool, SHORT); |
| } |
| ~ShortColumnReader() {} |
| }; |
| |
| class DateColumnReader : public ColumnReader { |
| private: |
| std::unique_ptr<orc::RleDecoder> rle; |
| |
| public: |
| DateColumnReader(const Type& type, StripeStreams& stripe); // NOLINT |
| |
| ~DateColumnReader() {} |
| |
| uint64_t skip(uint64_t numValues) override; |
| |
| void next(ColumnVectorBatch& rowBatch, uint64_t numValues, |
| char* notNull) override; // NOLINT |
| }; |
| |
| class TimeColumnReader : public ColumnReader { |
| private: |
| std::unique_ptr<orc::RleDecoder> rle; |
| |
| public: |
| TimeColumnReader(const Type& type, StripeStreams& stripe); // NOLINT |
| |
| ~TimeColumnReader() {} |
| |
| uint64_t skip(uint64_t numValues) override; |
| |
| void next(ColumnVectorBatch& rowBatch, uint64_t numValues, |
| char* notNull) override; // NOLINT |
| }; |
| |
| class TimestampColumnReader : public ColumnReader { |
| private: |
| std::unique_ptr<orc::RleDecoder> secondsRle; |
| std::unique_ptr<orc::RleDecoder> nanoRle; |
| const Timezone& writerTimezone; |
| const int64_t epochOffset; |
| |
| public: |
| TimestampColumnReader(const Type& type, StripeStreams& stripe); // NOLINT |
| ~TimestampColumnReader(); |
| |
| uint64_t skip(uint64_t numValues) override; |
| |
| void next(ColumnVectorBatch& rowBatch, uint64_t numValues, |
| char* notNull) override; // NOLINT |
| }; |
| |
| class DoubleColumnReader : public ColumnReader { |
| public: |
| DoubleColumnReader(const Type& type, StripeStreams& stripe); // NOLINT |
| ~DoubleColumnReader(); |
| |
| uint64_t skip(uint64_t numValues) override; |
| |
| void next(ColumnVectorBatch& rowBatch, uint64_t numValues, |
| char* notNull) override; // NOLINT |
| |
| private: |
| std::unique_ptr<SeekableInputStream> inputStream; |
| ORCTypeKind columnKind; |
| const uint64_t bytesPerValue; |
| const char* bufferPointer; |
| const char* bufferEnd; |
| const char* bufferS = nullptr; |
| const char* bufferE = nullptr; |
| |
| void nextBuffer() { |
| int bufferLength = 0; |
| const void* bufferPointer = nullptr; |
| bool result = inputStream->Next(&bufferPointer, &bufferLength); |
| if (!result) { |
| LOG_ERROR(ERRCODE_INTERNAL_ERROR, "bad read in nextBuffer"); |
| } |
| bufferS = static_cast<const char*>(bufferPointer); |
| bufferE = bufferS + bufferLength; |
| } |
| |
| void readData(char* data, uint64_t numValues) { |
| uint64_t i = 0; |
| uint64_t count; |
| if (columnKind == FLOAT) |
| count = numValues * sizeof(float); |
| else |
| count = numValues * sizeof(double); |
| while (i < count) { |
| if (bufferS == bufferE) { |
| nextBuffer(); |
| } |
| uint64_t copyBytes = |
| std::min(count - i, static_cast<uint64_t>(bufferE - bufferS)); |
| memcpy(data, bufferS, copyBytes); |
| bufferS += copyBytes; |
| data += copyBytes; |
| i += copyBytes; |
| } |
| } |
| }; |
| |
| class StringDictionaryColumnReader : public ColumnReader { |
| private: |
| DataBuffer<char> dictionaryBlob; |
| DataBuffer<int64_t> dictionaryOffset; |
| std::unique_ptr<RleDecoder> rle; |
| uint64_t dictionaryCount; |
| |
| public: |
| StringDictionaryColumnReader(const Type& type, |
| StripeStreams& stipe); // NOLINT |
| ~StringDictionaryColumnReader(); |
| |
| uint64_t skip(uint64_t numValues) override; |
| |
| void next(ColumnVectorBatch& rowBatch, uint64_t numValues, |
| char* notNull) override; // NOLINT |
| }; |
| |
| class StringDirectColumnReader : public ColumnReader { |
| private: |
| DataBuffer<char> blobBuffer; |
| std::unique_ptr<RleDecoder> lengthRle; |
| std::unique_ptr<SeekableInputStream> blobStream; |
| const char* lastBuffer; |
| size_t lastBufferLength; |
| |
| // Compute the total length of the values. |
| // @param lengths the array of lengths |
| // @param notNull the array of notNull flags |
| // @param numValues the lengths of the arrays |
| // @return the total number of bytes for the non-null values |
| size_t computeSize(const int64_t* lengths, const char* notNull, |
| uint64_t numValues); |
| |
| public: |
| StringDirectColumnReader(const Type& type, StripeStreams& stipe); // NOLINT |
| ~StringDirectColumnReader(); |
| |
| uint64_t skip(uint64_t numValues) override; |
| |
| void next(ColumnVectorBatch& rowBatch, uint64_t numValues, |
| char* notNull) override; // NOLINT |
| }; |
| |
| class StructColumnReader : public ColumnReader { |
| private: |
| std::vector<ColumnReader*> children; |
| |
| public: |
| StructColumnReader(const Type& type, StripeStreams& stipe); // NOLINT |
| ~StructColumnReader(); |
| |
| uint64_t skip(uint64_t numValues) override; |
| |
| void next(ColumnVectorBatch& rowBatch, uint64_t numValues, |
| char* notNull) override; // NOLINT |
| }; |
| |
| class ListColumnReader : public ColumnReader { |
| private: |
| std::unique_ptr<ColumnReader> child; |
| std::unique_ptr<RleDecoder> rle; |
| |
| public: |
| ListColumnReader(const Type& type, StripeStreams& stipe); // NOLINT |
| ~ListColumnReader(); |
| |
| uint64_t skip(uint64_t numValues) override; |
| |
| void next(ColumnVectorBatch& rowBatch, uint64_t numValues, |
| char* notNull) override; // NOLINT |
| }; |
| |
| class MapColumnReader : public ColumnReader { |
| private: |
| std::unique_ptr<ColumnReader> keyReader; |
| std::unique_ptr<ColumnReader> elementReader; |
| std::unique_ptr<RleDecoder> rle; |
| |
| public: |
| MapColumnReader(const Type& type, StripeStreams& stipe); // NOLINT |
| ~MapColumnReader(); |
| |
| uint64_t skip(uint64_t numValues) override; |
| |
| void next(ColumnVectorBatch& rowBatch, uint64_t numValues, |
| char* notNull) override; // NOLINT |
| }; |
| |
| class UnionColumnReader : public ColumnReader { |
| private: |
| std::unique_ptr<ByteRleDecoder> rle; |
| std::vector<ColumnReader*> childrenReader; |
| std::vector<int64_t> childrenCounts; |
| uint64_t numChildren; |
| |
| public: |
| UnionColumnReader(const Type& type, StripeStreams& stipe); // NOLINT |
| ~UnionColumnReader(); |
| |
| uint64_t skip(uint64_t numValues) override; |
| |
| void next(ColumnVectorBatch& rowBatch, uint64_t numValues, |
| char* notNull) override; // NOLINT |
| }; |
| |
| class Decimal64ColumnReader : public ColumnReader { |
| public: |
| static const uint32_t MAX_PRECISION_64 = 18; |
| static const uint32_t MAX_PRECISION_128 = 38; |
| static const int64_t POWERS_OF_TEN[MAX_PRECISION_64 + 1]; |
| |
| protected: |
| std::unique_ptr<SeekableInputStream> valueStream; |
| int32_t precision; |
| int32_t scale; |
| const char* buffer; |
| const char* bufferEnd; |
| |
| std::unique_ptr<RleDecoder> scaleDecoder; |
| |
| // Read the valueStream for more bytes. |
| void readBuffer() { |
| while (buffer == bufferEnd) { |
| int length; |
| if (!valueStream->Next(reinterpret_cast<const void**>(&buffer), |
| &length)) { |
| LOG_ERROR(ERRCODE_INTERNAL_ERROR, |
| "Read past end of stream in Decimal64ColumnReader %s", |
| valueStream->getName().c_str()); |
| } |
| bufferEnd = buffer + length; |
| } |
| } |
| |
| void readInt64(int64_t& value, int32_t currentScale) { // NOLINT |
| value = 0; |
| size_t offset = 0; |
| while (true) { |
| readBuffer(); |
| unsigned char ch = static_cast<unsigned char>(*(buffer++)); |
| value |= static_cast<uint64_t>(ch & 0x7f) << offset; |
| offset += 7; |
| if (!(ch & 0x80)) { |
| break; |
| } |
| } |
| value = unZigZag(static_cast<uint64_t>(value)); |
| if (scale > currentScale) { |
| value *= POWERS_OF_TEN[scale - currentScale]; |
| } else if (scale < currentScale) { |
| value /= POWERS_OF_TEN[currentScale - scale]; |
| } |
| } |
| |
| public: |
| Decimal64ColumnReader(const Type& type, StripeStreams& stipe); // NOLINT |
| ~Decimal64ColumnReader(); |
| |
| uint64_t skip(uint64_t numValues) override; |
| |
| void next(ColumnVectorBatch& rowBatch, uint64_t numValues, |
| char* notNull) override; // NOLINT |
| }; |
| |
| extern void unZigZagInt128(Int128& value); // NOLINT |
| extern void scaleInt128(Int128& value, uint32_t scale, // NOLINT |
| uint32_t currentScale); |
| |
| class Decimal128ColumnReader : public Decimal64ColumnReader { |
| public: |
| Decimal128ColumnReader(const Type& type, StripeStreams& stipe); // NOLINT |
| ~Decimal128ColumnReader(); |
| |
| void next(ColumnVectorBatch& rowBatch, uint64_t numValues, |
| char* notNull) override; // NOLINT |
| |
| private: |
| void readInt128(Int128& value, int32_t currentScale) { // NOLINT |
| value = 0; |
| Int128 work; |
| uint32_t offset = 0; |
| while (true) { |
| readBuffer(); |
| unsigned char ch = static_cast<unsigned char>(*(buffer++)); |
| work = ch & 0x7f; |
| work <<= offset; |
| value |= work; |
| offset += 7; |
| if (!(ch & 0x80)) { |
| break; |
| } |
| } |
| unZigZagInt128(value); |
| scaleInt128(value, static_cast<uint32_t>(scale), |
| static_cast<uint32_t>(currentScale)); |
| } |
| }; |
| |
| class DecimalHive11ColumnReader : public Decimal64ColumnReader { |
| private: |
| bool throwOnOverflow; |
| std::ostream* errorStream; |
| |
| // Read an Int128 from the stream and correct it to the desired scale. |
| bool readInt128(Int128& value, int32_t currentScale) { // NOLINT |
| // -/+ 99999999999999999999999999999999999999 |
| static const Int128 MIN_VALUE(-0x4b3b4ca85a86c47b, 0xf675ddc000000001); |
| static const Int128 MAX_VALUE(0x4b3b4ca85a86c47a, 0x098a223fffffffff); |
| |
| value = 0; |
| Int128 work; |
| uint32_t offset = 0; |
| bool result = true; |
| while (true) { |
| readBuffer(); |
| unsigned char ch = static_cast<unsigned char>(*(buffer++)); |
| work = ch & 0x7f; |
| // If we have read more than 128 bits, we flag the error, but keep |
| // reading bytes so the stream isn't thrown off. |
| if (offset > 128 || (offset == 126 && work > 3)) { |
| result = false; |
| } |
| work <<= offset; |
| value |= work; |
| offset += 7; |
| if (!(ch & 0x80)) { |
| break; |
| } |
| } |
| |
| if (!result) { |
| return result; |
| } |
| unZigZagInt128(value); |
| scaleInt128(value, static_cast<uint32_t>(scale), |
| static_cast<uint32_t>(currentScale)); |
| return value >= MIN_VALUE && value <= MAX_VALUE; |
| } |
| |
| public: |
| DecimalHive11ColumnReader(const Type& type, StripeStreams& stipe); // NOLINT |
| ~DecimalHive11ColumnReader(); |
| |
| void next(ColumnVectorBatch& rowBatch, uint64_t numValues, |
| char* notNull) override; // NOLINT |
| }; |
| |
| // The interface for reading ORC files. |
| // This is an an abstract class that will subclassed as necessary. |
| class Reader { |
| public: |
| virtual ~Reader(); |
| |
| // Get the format version of the file. Currently known values are: |
| // "0.11" and "0.12" |
| // @return the version string |
| virtual std::string getFormatVersion() const = 0; |
| |
| // Get the number of rows in the file. |
| // @return the number of rows |
| virtual uint64_t getNumberOfRows() const = 0; |
| |
| // Get the user metadata keys. |
| // @return the set of metadata keys |
| virtual std::list<std::string> getMetadataKeys() const = 0; |
| |
| // Get a user metadata value. |
| // @param key a key given by the user |
| // @return the bytes associated with the given key |
| virtual std::string getMetadataValue(const std::string& key) const = 0; |
| |
| // Did the user set the given metadata value. |
| // @param key the key to check |
| // @return true if the metadata value was set |
| virtual bool hasMetadataValue(const std::string& key) const = 0; |
| |
| // Get the compression kind. |
| // @return the kind of compression in the file |
| virtual CompressionKind getCompression() const = 0; |
| |
| // Get the buffer size for the compression. |
| // @return number of bytes to buffer for the compression codec. |
| virtual uint64_t getCompressionSize() const = 0; |
| |
| // Get the version of the writer. |
| // @return the version of the writer. |
| virtual WriterVersion getWriterVersion() const = 0; |
| |
| // Get the number of rows per a entry in the row index. |
| // @return the number of rows per an entry in the row index or 0 if there |
| // is no row index. |
| virtual uint64_t getRowIndexStride() const = 0; |
| |
| // Get the number of stripes in the file. |
| // @return the number of stripes |
| virtual uint64_t getNumberOfStripes() const = 0; |
| |
| // Get the information about a stripe. |
| // @param stripeIndex the stripe 0 to N-1 to get information about |
| // @return the information about that stripe |
| virtual std::unique_ptr<StripeInformation> getStripe( |
| uint64_t stripeIndex) const = 0; |
| |
| // Get the number of stripe statistics in the file. |
| // @return the number of stripe statistics |
| virtual uint64_t getNumberOfStripeStatistics() const = 0; |
| |
| // Get the statistics about a stripe. |
| // @param stripeIndex the stripe 0 to N-1 to get statistics about |
| // @return the statistics about that stripe |
| virtual std::unique_ptr<univplan::Statistics> getStripeStatistics( |
| uint64_t stripeIndex) const = 0; |
| |
| // Get the length of the data stripes in the file. |
| // @return the number of bytes in stripes |
| virtual uint64_t getContentLength() const = 0; |
| |
| // Get the length of the file stripe statistics |
| // @return the number of compressed bytes in the file stripe statistics |
| virtual uint64_t getStripeStatisticsLength() const = 0; |
| |
| // Get the length of the file footer |
| // @return the number of compressed bytes in the file footer |
| virtual uint64_t getFileFooterLength() const = 0; |
| |
| // Get the length of the file postscript |
| // @return the number of bytes in the file postscript |
| virtual uint64_t getFilePostscriptLength() const = 0; |
| |
| // Get the total length of the file. |
| // @return the number of bytes in the file |
| virtual uint64_t getFileLength() const = 0; |
| |
| // Get the statistics about the columns in the file. |
| // @return the information about the column |
| virtual std::unique_ptr<univplan::Statistics> getStatistics() const = 0; |
| |
| // Get the statistics about a single column in the file. |
| // @return the information about the column |
| virtual std::unique_ptr<univplan::ColumnStatistics> getColumnStatistics( |
| uint32_t columnId) const = 0; |
| |
| // Get the type of the rows in the file. The top level is typically a |
| // struct. |
| // @return the root type |
| virtual const Type& getType() const = 0; |
| |
| // Get the selected type of the rows in the file. The file's row type |
| // is projected down to just the selected columns. Thus, if the file's |
| // type is struct<col0:int,col1:double,col2:string> and the selected |
| // columns are "col0,col2" the selected type would be |
| // struct<col0:int,col2:string>. |
| // @return the root type |
| virtual const Type& getSelectedType() const = 0; |
| |
| // Get the selected columns of the file. |
| virtual const std::vector<bool> getSelectedColumns() const = 0; |
| |
| // Create a row batch for reading the selected columns of this file. |
| // @param size the number of rows to read |
| // @return a new ColumnVectorBatch to read into |
| virtual std::unique_ptr<ColumnVectorBatch> createRowBatch( |
| uint64_t size) const = 0; |
| |
| // Read the next row batch from the current position. |
| // Caller must look at numElements in the row batch to determine how |
| // many rows were read. |
| // @param data the row batch to read into. |
| // @return true if a non-zero number of rows were read or false if the |
| // end of the file was reached. |
| virtual bool next(ColumnVectorBatch& data) = 0; // NOLINT |
| |
| // Get the row number of the first row in the previously read batch. |
| // @return the row number of the previous batch. |
| virtual uint64_t getRowNumber() const = 0; |
| |
| // Seek to a given row. |
| // @param rowNumber the next row the reader should return |
| virtual void seekToRow(uint64_t rowNumber) = 0; |
| |
| // Get the name of the input stream. |
| virtual const std::string& getStreamName() const = 0; |
| |
| // check file has correct column statistics |
| virtual bool hasCorrectStatistics() const = 0; |
| |
| // Get the serialized file tail. |
| // Usefull if another reader of the same file wants to avoid re-reading |
| // the file tail. See ReaderOptions.setSerializedFileTail(). |
| // @return a string of bytes with the file tail |
| virtual std::string getSerializedFileTail() const = 0; |
| |
| // Estimate an upper bound on heap memory allocation by the Reader |
| // based on the information in the file footer. |
| // The bound is less tight if only few columns are read or compression is |
| // used. |
| // @param stripeIx index of the stripe to be read (if not specified, |
| // all stripes are considered). |
| // @return upper bound on memory use |
| virtual uint64_t getMemoryUse(int stripeIx = -1) = 0; |
| |
| virtual void collectPredicateStats(uint32_t* scanned, uint32_t* skipped) = 0; |
| |
| virtual std::unique_ptr<orc::InputStream> ownInputStream() = 0; |
| }; |
| |
| // Create a reader to the for the ORC file. |
| // @param stream the stream to read |
| // @param options the options for reading the file |
| std::unique_ptr<Reader> createReader(std::unique_ptr<InputStream> stream, |
| const ReaderOptions& options); |
| |
| class StripeStreamsImpl; |
| class ReaderImpl : public Reader { |
| private: |
| const Timezone& localTimezone; |
| |
| // inputs |
| std::unique_ptr<InputStream> stream; |
| ReaderOptions options; |
| const uint64_t fileLength; |
| const uint64_t postscriptLength; |
| std::vector<bool> selectedColumns; |
| |
| // custom memory pool |
| dbcommon::MemoryPool& memoryPool; |
| |
| // postscript |
| std::unique_ptr<proto::PostScript> postscript; |
| const uint64_t blockSize; |
| const CompressionKind compression; |
| |
| // footer |
| std::unique_ptr<proto::Footer> footer; |
| DataBuffer<uint64_t> firstRowOfStripe; |
| uint64_t numberOfStripes; |
| std::unique_ptr<Type> schema; |
| mutable std::unique_ptr<Type> selectedSchema; |
| |
| // metadata |
| mutable std::unique_ptr<proto::Metadata> metadata; |
| mutable bool isMetadataLoaded; |
| |
| // reading state |
| uint64_t previousRow; |
| uint64_t firstStripe; |
| uint64_t currentStripe; |
| uint64_t lastStripe; // the stripe AFTER the last one |
| uint64_t currentRowInStripe; |
| uint64_t rowsInCurrentStripe; |
| proto::StripeInformation currentStripeInfo; |
| std::unique_ptr<StripeStreamsImpl> currentStripeStream = nullptr; |
| std::vector<proto::StripeFooter> stripeFooters; |
| std::unique_ptr<ColumnReader> curReader; |
| std::map<std::string, uint64_t> nameIdMap; |
| std::map<uint64_t, const Type*> idTypeMap; |
| |
| // count for filter push down |
| uint32_t skippedStripe = 0; |
| uint32_t scannedStripe = 0; |
| |
| // for read stats only |
| std::unique_ptr<univplan::Statistics> currentStripeStats; |
| |
| // internal methods |
| proto::StripeFooter getStripeFooter( |
| const proto::StripeInformation& info) const; |
| void startNextStripe(); |
| void checkOrcVersion(); |
| void readMetadata() const; |
| bool notIncludeType(ColumnVectorBatch* data, orc::ORCTypeKind typekind); |
| |
| // build map from type name and id, id to Type |
| void buildTypeNameIdMap(const Type* type, |
| std::vector<std::string>& columns); // NOLINT |
| std::string toDotColumnPath(const std::vector<std::string>& columns); |
| |
| // Select the columns from the options object |
| void updateSelected(); |
| |
| // Select a field by name |
| void updateSelectedByName(const std::string& name); |
| // Select a field by id |
| void updateSelectedByFieldId(uint64_t fieldId); |
| // Select a type by id |
| void updateSelectedByTypeId(uint64_t typeId); |
| |
| // Select all of the recursive children of the given type. |
| void selectChildren(const Type& type); |
| |
| // For each child of type, select it if one of its children |
| // is selected. |
| bool selectParents(const Type& type); |
| |
| public: |
| // Constructor that lets the user specify additional options. |
| // @param stream the stream to read from |
| // @param options options for reading |
| // @param postscript the postscript for the file |
| // @param footer the footer for the file |
| // @param fileLength the length of the file in bytes |
| // @param postscriptLength the length of the postscript in bytes |
| ReaderImpl(std::unique_ptr<InputStream> stream, const ReaderOptions& options, |
| std::unique_ptr<proto::PostScript> postscript, |
| std::unique_ptr<proto::Footer> footer, uint64_t fileLength, |
| uint64_t postscriptLength); |
| |
| const ReaderOptions& getReaderOptions() const; |
| |
| CompressionKind getCompression() const override; |
| |
| std::string getFormatVersion() const override; |
| |
| WriterVersion getWriterVersion() const override; |
| |
| uint64_t getNumberOfRows() const override; |
| |
| uint64_t getRowIndexStride() const override; |
| |
| const std::string& getStreamName() const override; |
| |
| std::list<std::string> getMetadataKeys() const override; |
| |
| std::string getMetadataValue(const std::string& key) const override; |
| |
| bool hasMetadataValue(const std::string& key) const override; |
| |
| uint64_t getCompressionSize() const override; |
| |
| uint64_t getNumberOfStripes() const override; |
| |
| std::unique_ptr<StripeInformation> getStripe(uint64_t) const override; |
| |
| uint64_t getNumberOfStripeStatistics() const override; |
| |
| std::unique_ptr<univplan::Statistics> getStripeStatistics( |
| uint64_t stripeIndex) const override; |
| |
| uint64_t getContentLength() const override; |
| uint64_t getStripeStatisticsLength() const override; |
| uint64_t getFileFooterLength() const override; |
| uint64_t getFilePostscriptLength() const override; |
| uint64_t getFileLength() const override; |
| |
| std::unique_ptr<univplan::Statistics> getStatistics() const override; |
| |
| std::unique_ptr<univplan::ColumnStatistics> getColumnStatistics( |
| uint32_t columnId) const override; |
| |
| const Type& getType() const override; |
| |
| const Type& getSelectedType() const override; |
| |
| const std::vector<bool> getSelectedColumns() const override; |
| |
| std::unique_ptr<ColumnVectorBatch> createRowBatch( |
| uint64_t size) const override; |
| |
| bool next(ColumnVectorBatch& data) override; |
| |
| uint64_t getRowNumber() const override; |
| |
| void seekToRow(uint64_t rowNumber) override; |
| |
| bool hasCorrectStatistics() const override; |
| |
| std::string getSerializedFileTail() const override; |
| |
| uint64_t getMemoryUse(int stripeIx = -1) override; |
| |
| void collectPredicateStats(uint32_t* scanned, uint32_t* skipped) override; |
| |
| std::unique_ptr<orc::InputStream> ownInputStream() override; |
| |
| proto::BloomFilterIndex rebuildBloomFilter(uint32_t colId); |
| |
| bool doReadStatsOnly(ColumnVectorBatch* data); |
| }; |
| |
| // Create a reader for the given stripe. |
| // @param type The reader type |
| // @param stripe The strip stream |
| std::unique_ptr<ColumnReader> buildReader(const Type& type, |
| StripeStreams& stripe); // NOLINT |
| |
| class StripeStreamsImpl : public StripeStreams { |
| private: |
| const ReaderImpl& reader; |
| const proto::StripeFooter& footer; |
| const uint64_t stripeStart; |
| InputStream& input; |
| dbcommon::MemoryPool& memoryPool; |
| const Timezone& writerTimezone; |
| |
| public: |
| StripeStreamsImpl(const ReaderImpl& reader, const proto::StripeFooter& footer, |
| uint64_t stripeStart, |
| InputStream& input, // NOLINT |
| dbcommon::MemoryPool& memoryPool, // NOLINT |
| const Timezone& writerTimezone); |
| |
| virtual ~StripeStreamsImpl(); |
| |
| const ReaderOptions& getReaderOptions() const override; |
| |
| const std::vector<bool> getSelectedColumns() const override; |
| |
| proto::ColumnEncoding getEncoding(uint64_t columnId) const override; |
| |
| std::unique_ptr<SeekableInputStream> getStream( |
| uint64_t columnId, proto::Stream_Kind kind, |
| bool shouldStream) const override; |
| |
| std::unique_ptr<SeekableInputStream> getStreamForBloomFilter( |
| uint64_t columnId, proto::Stream_Kind kind, |
| bool shouldStream) const override; |
| |
| dbcommon::MemoryPool& getMemoryPool() const override; |
| |
| const Timezone& getWriterTimezone() const override; |
| }; |
| |
| } // end of namespace orc |
| |
| #endif // STORAGE_SRC_STORAGE_FORMAT_ORC_READER_H_ |