/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "orc/Int128.hh"
#include "Adaptor.hh"
#include "ByteRLE.hh"
#include "ColumnReader.hh"
#include "ConvertColumnReader.hh"
#include "RLE.hh"
#include "SchemaEvolution.hh"
#include "orc/Exceptions.hh"
#include <math.h>
#include <iostream>
namespace orc {
StripeStreams::~StripeStreams() {
// PASS
}
inline RleVersion convertRleVersion(proto::ColumnEncoding_Kind kind) {
switch (static_cast<int64_t>(kind)) {
case proto::ColumnEncoding_Kind_DIRECT:
case proto::ColumnEncoding_Kind_DICTIONARY:
return RleVersion_1;
case proto::ColumnEncoding_Kind_DIRECT_V2:
case proto::ColumnEncoding_Kind_DICTIONARY_V2:
return RleVersion_2;
default:
throw ParseError("Unknown encoding in convertRleVersion");
}
}
ColumnReader::ColumnReader(const Type& type, StripeStreams& stripe)
: columnId(type.getColumnId()),
memoryPool(stripe.getMemoryPool()),
metrics(stripe.getReaderMetrics()) {
std::unique_ptr<SeekableInputStream> stream =
stripe.getStream(columnId, proto::Stream_Kind_PRESENT, true);
if (stream.get()) {
notNullDecoder = createBooleanRleDecoder(std::move(stream), metrics);
}
}
ColumnReader::~ColumnReader() {
// PASS
}
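// Skip numValues rows in the PRESENT stream (if any) and return how many of
// them are non-null, i.e. how many entries the subclass still has to skip in
// its own data streams.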
uint64_t ColumnReader::skip(uint64_t numValues) {
ByteRleDecoder* decoder = notNullDecoder.get();
if (decoder) {
// page through the values that we want to skip
// and count how many are non-null
const size_t MAX_BUFFER_SIZE = 32768;
size_t bufferSize = std::min(MAX_BUFFER_SIZE, static_cast<size_t>(numValues));
char buffer[MAX_BUFFER_SIZE];
uint64_t remaining = numValues;
while (remaining > 0) {
uint64_t chunkSize = std::min(remaining, static_cast<uint64_t>(bufferSize));
decoder->next(buffer, chunkSize, nullptr);
remaining -= chunkSize;
for (uint64_t i = 0; i < chunkSize; ++i) {
if (!buffer[i]) {
numValues -= 1;
}
}
}
}
return numValues;
}
void ColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* incomingMask) {
if (numValues > rowBatch.capacity) {
rowBatch.resize(numValues);
}
rowBatch.numElements = numValues;
ByteRleDecoder* decoder = notNullDecoder.get();
if (decoder) {
char* notNullArray = rowBatch.notNull.data();
decoder->next(notNullArray, numValues, incomingMask);
// check to see if there are nulls in this batch
for (uint64_t i = 0; i < numValues; ++i) {
if (!notNullArray[i]) {
rowBatch.hasNulls = true;
return;
}
}
} else if (incomingMask) {
// If we don't have a notNull stream, copy the incomingMask
rowBatch.hasNulls = true;
memcpy(rowBatch.notNull.data(), incomingMask, numValues);
return;
}
rowBatch.hasNulls = false;
}
void ColumnReader::seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions) {
if (notNullDecoder.get()) {
notNullDecoder->seek(positions.at(columnId));
}
}
/**
* Expand an array of bytes in place to the corresponding array of integers.
* Has to work backwards so that the data isn't clobbered during the
* expansion.
* @param buffer the array of chars and array of longs that need to be
* expanded
* @param numValues the number of bytes to convert to longs
*/
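// Example: with T = int64_t and numValues = 3, the decoder left three raw
// bytes packed at the front of the buffer; walking from index 2 down to 0
// widens each byte into its own int64_t slot without overwriting bytes that
// have not been expanded yet.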
template <typename T>
void expandBytesToIntegers(T* buffer, uint64_t numValues) {
if (sizeof(T) == sizeof(int8_t)) {
return;
}
for (uint64_t i = 0UL; i < numValues; ++i) {
buffer[numValues - 1 - i] = reinterpret_cast<int8_t*>(buffer)[numValues - 1 - i];
}
}
template <typename BatchType>
class BooleanColumnReader : public ColumnReader {
private:
std::unique_ptr<orc::ByteRleDecoder> rle_;
public:
BooleanColumnReader(const Type& type, StripeStreams& stripe);
~BooleanColumnReader() override;
uint64_t skip(uint64_t numValues) override;
void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override;
void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions) override;
};
template <typename BatchType>
BooleanColumnReader<BatchType>::BooleanColumnReader(const Type& type, StripeStreams& stripe)
: ColumnReader(type, stripe) {
std::unique_ptr<SeekableInputStream> stream =
stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
if (stream == nullptr) throw ParseError("DATA stream not found in Boolean column");
rle_ = createBooleanRleDecoder(std::move(stream), metrics);
}
template <typename BatchType>
BooleanColumnReader<BatchType>::~BooleanColumnReader() {
// PASS
}
template <typename BatchType>
uint64_t BooleanColumnReader<BatchType>::skip(uint64_t numValues) {
numValues = ColumnReader::skip(numValues);
rle_->skip(numValues);
return numValues;
}
template <typename BatchType>
void BooleanColumnReader<BatchType>::next(ColumnVectorBatch& rowBatch, uint64_t numValues,
char* notNull) {
ColumnReader::next(rowBatch, numValues, notNull);
// Since the byte RLE decoder writes its output into a char* while BatchType
// may be a LongVectorBatch whose data array is int64_t*, we cheat and decode
// into the wider array, then expand the bytes in a second pass.
auto* ptr = dynamic_cast<BatchType&>(rowBatch).data.data();
rle_->next(reinterpret_cast<char*>(ptr), numValues,
rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr);
expandBytesToIntegers(ptr, numValues);
}
template <typename BatchType>
void BooleanColumnReader<BatchType>::seekToRowGroup(
std::unordered_map<uint64_t, PositionProvider>& positions) {
ColumnReader::seekToRowGroup(positions);
rle_->seek(positions.at(columnId));
}
template <typename BatchType>
class ByteColumnReader : public ColumnReader {
private:
std::unique_ptr<orc::ByteRleDecoder> rle_;
public:
ByteColumnReader(const Type& type, StripeStreams& stripe) : ColumnReader(type, stripe) {
std::unique_ptr<SeekableInputStream> stream =
stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
if (stream == nullptr) throw ParseError("DATA stream not found in Byte column");
rle_ = createByteRleDecoder(std::move(stream), metrics);
}
~ByteColumnReader() override = default;
uint64_t skip(uint64_t numValues) override {
numValues = ColumnReader::skip(numValues);
rle_->skip(numValues);
return numValues;
}
void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override {
ColumnReader::next(rowBatch, numValues, notNull);
// Since the byte rle places the output in a char* instead of long*,
// we cheat here and use the long* and then expand it in a second pass.
auto* ptr = dynamic_cast<BatchType&>(rowBatch).data.data();
rle_->next(reinterpret_cast<char*>(ptr), numValues,
rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr);
expandBytesToIntegers(ptr, numValues);
}
void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions) override {
ColumnReader::seekToRowGroup(positions);
rle_->seek(positions.at(columnId));
}
};
template <typename BatchType>
class IntegerColumnReader : public ColumnReader {
protected:
std::unique_ptr<orc::RleDecoder> rle;
public:
IntegerColumnReader(const Type& type, StripeStreams& stripe) : ColumnReader(type, stripe) {
RleVersion vers = convertRleVersion(stripe.getEncoding(columnId).kind());
std::unique_ptr<SeekableInputStream> stream =
stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
if (stream == nullptr) throw ParseError("DATA stream not found in Integer column");
rle = createRleDecoder(std::move(stream), true, vers, memoryPool, metrics);
}
~IntegerColumnReader() override {
// PASS
}
uint64_t skip(uint64_t numValues) override {
numValues = ColumnReader::skip(numValues);
rle->skip(numValues);
return numValues;
}
void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override {
ColumnReader::next(rowBatch, numValues, notNull);
rle->next(dynamic_cast<BatchType&>(rowBatch).data.data(), numValues,
rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr);
}
void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions) override {
ColumnReader::seekToRowGroup(positions);
rle->seek(positions.at(columnId));
}
};
class TimestampColumnReader : public ColumnReader {
private:
std::unique_ptr<orc::RleDecoder> secondsRle_;
std::unique_ptr<orc::RleDecoder> nanoRle_;
const Timezone* writerTimezone_;
const Timezone* readerTimezone_;
const int64_t epochOffset_;
const bool sameTimezone_;
public:
TimestampColumnReader(const Type& type, StripeStreams& stripe, bool isInstantType);
~TimestampColumnReader() override;
uint64_t skip(uint64_t numValues) override;
void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override;
void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions) override;
};
TimestampColumnReader::TimestampColumnReader(const Type& type, StripeStreams& stripe,
bool isInstantType)
: ColumnReader(type, stripe),
writerTimezone_(isInstantType ? &getTimezoneByName("GMT") : &stripe.getWriterTimezone()),
readerTimezone_(isInstantType ? &getTimezoneByName("GMT") : &stripe.getReaderTimezone()),
epochOffset_(writerTimezone_->getEpoch()),
sameTimezone_(writerTimezone_ == readerTimezone_) {
RleVersion vers = convertRleVersion(stripe.getEncoding(columnId).kind());
std::unique_ptr<SeekableInputStream> stream =
stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
if (stream == nullptr) throw ParseError("DATA stream not found in Timestamp column");
secondsRle_ = createRleDecoder(std::move(stream), true, vers, memoryPool, metrics);
stream = stripe.getStream(columnId, proto::Stream_Kind_SECONDARY, true);
if (stream == nullptr) throw ParseError("SECONDARY stream not found in Timestamp column");
nanoRle_ = createRleDecoder(std::move(stream), false, vers, memoryPool, metrics);
}
TimestampColumnReader::~TimestampColumnReader() {
// PASS
}
uint64_t TimestampColumnReader::skip(uint64_t numValues) {
numValues = ColumnReader::skip(numValues);
secondsRle_->skip(numValues);
nanoRle_->skip(numValues);
return numValues;
}
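// The DATA stream holds seconds relative to the ORC epoch (2015-01-01 in the
// writer's time zone; epochOffset_ converts them to Unix seconds). The
// SECONDARY stream holds nanoseconds with trailing decimal zeros compressed
// away: when the low 3 bits are non-zero, the remaining bits are multiplied
// by 10^(low3 + 1), e.g. a stored 0x0B decodes to 1 * 10^4 = 10000 ns.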
void TimestampColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) {
ColumnReader::next(rowBatch, numValues, notNull);
notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
TimestampVectorBatch& timestampBatch = dynamic_cast<TimestampVectorBatch&>(rowBatch);
int64_t* secsBuffer = timestampBatch.data.data();
secondsRle_->next(secsBuffer, numValues, notNull);
int64_t* nanoBuffer = timestampBatch.nanoseconds.data();
nanoRle_->next(nanoBuffer, numValues, notNull);
// Construct the values
for (uint64_t i = 0; i < numValues; i++) {
if (notNull == nullptr || notNull[i]) {
uint64_t zeros = nanoBuffer[i] & 0x7;
nanoBuffer[i] >>= 3;
if (zeros != 0) {
for (uint64_t j = 0; j <= zeros; ++j) {
nanoBuffer[i] *= 10;
}
}
int64_t writerTime = secsBuffer[i] + epochOffset_;
if (!sameTimezone_) {
// adjust the timestamp to the same wall-clock time when the writer and
// reader time zones follow different rules, as required by Apache ORC.
const auto& wv = writerTimezone_->getVariant(writerTime);
const auto& rv = readerTimezone_->getVariant(writerTime);
if (!wv.hasSameTzRule(rv)) {
// If the timezone adjustment moves the value across a DST boundary,
// we need to reevaluate the offsets.
int64_t adjustedTime = writerTime + wv.gmtOffset - rv.gmtOffset;
const auto& adjustedReader = readerTimezone_->getVariant(adjustedTime);
writerTime = writerTime + wv.gmtOffset - adjustedReader.gmtOffset;
}
}
secsBuffer[i] = writerTime;
if (secsBuffer[i] < 0 && nanoBuffer[i] > 999999) {
secsBuffer[i] -= 1;
}
}
}
}
void TimestampColumnReader::seekToRowGroup(
std::unordered_map<uint64_t, PositionProvider>& positions) {
ColumnReader::seekToRowGroup(positions);
secondsRle_->seek(positions.at(columnId));
nanoRle_->seek(positions.at(columnId));
}
template <TypeKind columnKind, bool isLittleEndian, typename ValueType, typename BatchType>
class DoubleColumnReader : public ColumnReader {
public:
DoubleColumnReader(const Type& type, StripeStreams& stripe);
~DoubleColumnReader() override {}
uint64_t skip(uint64_t numValues) override;
void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override;
void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions) override;
private:
std::unique_ptr<SeekableInputStream> inputStream_;
const uint64_t bytesPerValue_ = (columnKind == FLOAT) ? 4 : 8;
const char* bufferPointer_;
const char* bufferEnd_;
unsigned char readByte() {
if (bufferPointer_ == bufferEnd_) {
int length;
if (!inputStream_->Next(reinterpret_cast<const void**>(&bufferPointer_), &length)) {
throw ParseError("bad read in DoubleColumnReader::next()");
}
bufferEnd_ = bufferPointer_ + length;
}
return static_cast<unsigned char>(*(bufferPointer_++));
}
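// FLOAT and DOUBLE values are stored as raw little-endian IEEE 754 bytes in
// the DATA stream. On little-endian hosts a whole value can be copied
// directly; otherwise (or when a value straddles a buffer boundary) its bytes
// are reassembled one at a time, least-significant byte first.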
template <typename FloatType>
FloatType readDouble() {
int64_t bits = 0;
if (bufferEnd_ - bufferPointer_ >= 8) {
if (isLittleEndian) {
memcpy(&bits, bufferPointer_, sizeof(bits));
} else {
bits = static_cast<int64_t>(static_cast<unsigned char>(bufferPointer_[0]));
bits |= static_cast<int64_t>(static_cast<unsigned char>(bufferPointer_[1])) << 8;
bits |= static_cast<int64_t>(static_cast<unsigned char>(bufferPointer_[2])) << 16;
bits |= static_cast<int64_t>(static_cast<unsigned char>(bufferPointer_[3])) << 24;
bits |= static_cast<int64_t>(static_cast<unsigned char>(bufferPointer_[4])) << 32;
bits |= static_cast<int64_t>(static_cast<unsigned char>(bufferPointer_[5])) << 40;
bits |= static_cast<int64_t>(static_cast<unsigned char>(bufferPointer_[6])) << 48;
bits |= static_cast<int64_t>(static_cast<unsigned char>(bufferPointer_[7])) << 56;
}
bufferPointer_ += 8;
} else {
for (uint64_t i = 0; i < 8; i++) {
bits |= static_cast<int64_t>(readByte()) << (i * 8);
}
}
FloatType* result = reinterpret_cast<FloatType*>(&bits);
return *result;
}
template <typename FloatType>
FloatType readFloat() {
int32_t bits = 0;
if (bufferEnd_ - bufferPointer_ >= 4) {
if (isLittleEndian) {
bits = *(reinterpret_cast<const int32_t*>(bufferPointer_));
} else {
bits = static_cast<unsigned char>(bufferPointer_[0]);
bits |= static_cast<unsigned char>(bufferPointer_[1]) << 8;
bits |= static_cast<unsigned char>(bufferPointer_[2]) << 16;
bits |= static_cast<unsigned char>(bufferPointer_[3]) << 24;
}
bufferPointer_ += 4;
} else {
for (uint64_t i = 0; i < 4; i++) {
bits |= readByte() << (i * 8);
}
}
float* result = reinterpret_cast<float*>(&bits);
return static_cast<FloatType>(*result);
}
};
template <TypeKind columnKind, bool isLittleEndian, typename ValueType, typename BatchType>
DoubleColumnReader<columnKind, isLittleEndian, ValueType, BatchType>::DoubleColumnReader(
const Type& type, StripeStreams& stripe)
: ColumnReader(type, stripe), bufferPointer_(nullptr), bufferEnd_(nullptr) {
inputStream_ = stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
if (inputStream_ == nullptr) throw ParseError("DATA stream not found in Double column");
}
template <TypeKind columnKind, bool isLittleEndian, typename ValueType, typename BatchType>
uint64_t DoubleColumnReader<columnKind, isLittleEndian, ValueType, BatchType>::skip(
uint64_t numValues) {
numValues = ColumnReader::skip(numValues);
if (static_cast<size_t>(bufferEnd_ - bufferPointer_) >= bytesPerValue_ * numValues) {
bufferPointer_ += bytesPerValue_ * numValues;
} else {
size_t sizeToSkip =
bytesPerValue_ * numValues - static_cast<size_t>(bufferEnd_ - bufferPointer_);
const size_t cap = static_cast<size_t>(std::numeric_limits<int>::max());
while (sizeToSkip != 0) {
size_t step = sizeToSkip > cap ? cap : sizeToSkip;
inputStream_->Skip(static_cast<int>(step));
sizeToSkip -= step;
}
bufferEnd_ = nullptr;
bufferPointer_ = nullptr;
}
return numValues;
}
template <TypeKind columnKind, bool isLittleEndian, typename ValueType, typename BatchType>
void DoubleColumnReader<columnKind, isLittleEndian, ValueType, BatchType>::next(
ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) {
ColumnReader::next(rowBatch, numValues, notNull);
// update the notNull from the parent class
notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
ValueType* outArray =
reinterpret_cast<ValueType*>(dynamic_cast<BatchType&>(rowBatch).data.data());
if constexpr (columnKind == FLOAT) {
if (notNull) {
for (size_t i = 0; i < numValues; ++i) {
if (notNull[i]) {
outArray[i] = readFloat<ValueType>();
}
}
} else {
for (size_t i = 0; i < numValues; ++i) {
outArray[i] = readFloat<ValueType>();
}
}
} else {
if (notNull) {
for (size_t i = 0; i < numValues; ++i) {
if (notNull[i]) {
outArray[i] = readDouble<ValueType>();
}
}
} else {
// Number of values in the buffer that we can copy directly.
// Only viable when the machine is little-endian.
uint64_t bufferNum = 0;
if (isLittleEndian) {
bufferNum = std::min(numValues,
static_cast<size_t>(bufferEnd_ - bufferPointer_) / bytesPerValue_);
uint64_t bufferBytes = bufferNum * bytesPerValue_;
if (bufferBytes > 0) {
memcpy(outArray, bufferPointer_, bufferBytes);
bufferPointer_ += bufferBytes;
}
}
for (size_t i = bufferNum; i < numValues; ++i) {
outArray[i] = readDouble<ValueType>();
}
}
}
}
template <TypeKind columnKind, bool isLittleEndian, typename ValueType, typename BatchType>
void DoubleColumnReader<columnKind, isLittleEndian, ValueType, BatchType>::seekToRowGroup(
std::unordered_map<uint64_t, PositionProvider>& positions) {
ColumnReader::seekToRowGroup(positions);
inputStream_->seek(positions.at(columnId));
// clear buffer state after seek
bufferEnd_ = nullptr;
bufferPointer_ = nullptr;
}
void readFully(char* buffer, int64_t bufferSize, SeekableInputStream* stream) {
int64_t posn = 0;
while (posn < bufferSize) {
const void* chunk;
int length;
if (!stream->Next(&chunk, &length)) {
throw ParseError("bad read in readFully");
}
if (posn + length > bufferSize) {
throw ParseError("Corrupt dictionary blob in StringDictionaryColumn");
}
memcpy(buffer + posn, chunk, static_cast<size_t>(length));
posn += length;
}
}
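// Dictionary-encoded strings use three streams: LENGTH holds the byte length
// of every dictionary entry, DICTIONARY_DATA holds the concatenated entry
// bytes, and DATA holds one dictionary index per row. The constructor turns
// the lengths into a cumulative offset array, so entry i occupies
// [dictionaryOffset[i], dictionaryOffset[i + 1]) of the blob.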
class StringDictionaryColumnReader : public ColumnReader {
private:
std::shared_ptr<StringDictionary> dictionary_;
std::unique_ptr<RleDecoder> rle_;
public:
StringDictionaryColumnReader(const Type& type, StripeStreams& stripe);
~StringDictionaryColumnReader() override;
uint64_t skip(uint64_t numValues) override;
void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override;
void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override;
void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions) override;
};
StringDictionaryColumnReader::StringDictionaryColumnReader(const Type& type,
StripeStreams& stripe)
: ColumnReader(type, stripe), dictionary_(new StringDictionary(stripe.getMemoryPool())) {
RleVersion rleVersion = convertRleVersion(stripe.getEncoding(columnId).kind());
uint32_t dictSize = stripe.getEncoding(columnId).dictionary_size();
std::unique_ptr<SeekableInputStream> stream =
stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
if (stream == nullptr) {
throw ParseError("DATA stream not found in StringDictionaryColumn");
}
rle_ = createRleDecoder(std::move(stream), false, rleVersion, memoryPool, metrics);
stream = stripe.getStream(columnId, proto::Stream_Kind_LENGTH, false);
if (dictSize > 0 && stream == nullptr) {
throw ParseError("LENGTH stream not found in StringDictionaryColumn");
}
std::unique_ptr<RleDecoder> lengthDecoder =
createRleDecoder(std::move(stream), false, rleVersion, memoryPool, metrics);
dictionary_->dictionaryOffset.resize(dictSize + 1);
int64_t* lengthArray = dictionary_->dictionaryOffset.data();
lengthDecoder->next(lengthArray + 1, dictSize, nullptr);
lengthArray[0] = 0;
for (uint32_t i = 1; i < dictSize + 1; ++i) {
if (lengthArray[i] < 0) {
throw ParseError("Negative dictionary entry length");
}
lengthArray[i] += lengthArray[i - 1];
}
int64_t blobSize = lengthArray[dictSize];
dictionary_->dictionaryBlob.resize(static_cast<uint64_t>(blobSize));
std::unique_ptr<SeekableInputStream> blobStream =
stripe.getStream(columnId, proto::Stream_Kind_DICTIONARY_DATA, false);
if (blobSize > 0 && blobStream == nullptr) {
throw ParseError("DICTIONARY_DATA stream not found in StringDictionaryColumn");
}
readFully(dictionary_->dictionaryBlob.data(), blobSize, blobStream.get());
}
StringDictionaryColumnReader::~StringDictionaryColumnReader() {
// PASS
}
uint64_t StringDictionaryColumnReader::skip(uint64_t numValues) {
numValues = ColumnReader::skip(numValues);
rle_->skip(numValues);
return numValues;
}
void StringDictionaryColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues,
char* notNull) {
ColumnReader::next(rowBatch, numValues, notNull);
// update the notNull from the parent class
notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
StringVectorBatch& byteBatch = dynamic_cast<StringVectorBatch&>(rowBatch);
char* blob = dictionary_->dictionaryBlob.data();
int64_t* dictionaryOffsets = dictionary_->dictionaryOffset.data();
char** outputStarts = byteBatch.data.data();
int64_t* outputLengths = byteBatch.length.data();
rle_->next(outputLengths, numValues, notNull);
uint64_t dictionaryCount = dictionary_->dictionaryOffset.size() - 1;
if (notNull) {
for (uint64_t i = 0; i < numValues; ++i) {
if (notNull[i]) {
int64_t entry = outputLengths[i];
if (entry < 0 || static_cast<uint64_t>(entry) >= dictionaryCount) {
throw ParseError("Entry index out of range in StringDictionaryColumn");
}
outputStarts[i] = blob + dictionaryOffsets[entry];
outputLengths[i] = dictionaryOffsets[entry + 1] - dictionaryOffsets[entry];
}
}
} else {
for (uint64_t i = 0; i < numValues; ++i) {
int64_t entry = outputLengths[i];
if (entry < 0 || static_cast<uint64_t>(entry) >= dictionaryCount) {
throw ParseError("Entry index out of range in StringDictionaryColumn");
}
outputStarts[i] = blob + dictionaryOffsets[entry];
outputLengths[i] = dictionaryOffsets[entry + 1] - dictionaryOffsets[entry];
}
}
}
void StringDictionaryColumnReader::nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues,
char* notNull) {
ColumnReader::next(rowBatch, numValues, notNull);
notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
rowBatch.isEncoded = true;
EncodedStringVectorBatch& batch = dynamic_cast<EncodedStringVectorBatch&>(rowBatch);
batch.dictionary = this->dictionary_;
// The index buffer receives the dictionary entry ids decoded from the DATA stream
rle_->next(batch.index.data(), numValues, notNull);
}
void StringDictionaryColumnReader::seekToRowGroup(
std::unordered_map<uint64_t, PositionProvider>& positions) {
ColumnReader::seekToRowGroup(positions);
rle_->seek(positions.at(columnId));
}
class StringDirectColumnReader : public ColumnReader {
private:
std::unique_ptr<RleDecoder> lengthRle_;
std::unique_ptr<SeekableInputStream> blobStream_;
const char* lastBuffer_;
size_t lastBufferLength_;
/**
* Compute the total length of the values.
* @param lengths the array of lengths
* @param notNull the array of notNull flags
* @param numValues the length of the arrays
* @return the total number of bytes for the non-null values
*/
size_t computeSize(const int64_t* lengths, const char* notNull, uint64_t numValues);
public:
StringDirectColumnReader(const Type& type, StripeStreams& stripe);
~StringDirectColumnReader() override;
uint64_t skip(uint64_t numValues) override;
void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override;
void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions) override;
};
StringDirectColumnReader::StringDirectColumnReader(const Type& type, StripeStreams& stripe)
: ColumnReader(type, stripe) {
RleVersion rleVersion = convertRleVersion(stripe.getEncoding(columnId).kind());
std::unique_ptr<SeekableInputStream> stream =
stripe.getStream(columnId, proto::Stream_Kind_LENGTH, true);
if (stream == nullptr) throw ParseError("LENGTH stream not found in StringDirectColumn");
lengthRle_ = createRleDecoder(std::move(stream), false, rleVersion, memoryPool, metrics);
blobStream_ = stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
if (blobStream_ == nullptr) throw ParseError("DATA stream not found in StringDirectColumn");
lastBuffer_ = nullptr;
lastBufferLength_ = 0;
}
StringDirectColumnReader::~StringDirectColumnReader() {
// PASS
}
uint64_t StringDirectColumnReader::skip(uint64_t numValues) {
const size_t BUFFER_SIZE = 1024;
numValues = ColumnReader::skip(numValues);
int64_t buffer[BUFFER_SIZE];
uint64_t done = 0;
size_t totalBytes = 0;
// read the lengths, so we know how many bytes to skip
while (done < numValues) {
uint64_t step = std::min(BUFFER_SIZE, static_cast<size_t>(numValues - done));
lengthRle_->next(buffer, step, nullptr);
totalBytes += computeSize(buffer, nullptr, step);
done += step;
}
if (totalBytes <= lastBufferLength_) {
// subtract the needed bytes from the ones left over
lastBufferLength_ -= totalBytes;
lastBuffer_ += totalBytes;
} else {
// move the stream forward after accounting for the buffered bytes
totalBytes -= lastBufferLength_;
const size_t cap = static_cast<size_t>(std::numeric_limits<int>::max());
while (totalBytes != 0) {
size_t step = totalBytes > cap ? cap : totalBytes;
blobStream_->Skip(static_cast<int>(step));
totalBytes -= step;
}
lastBufferLength_ = 0;
lastBuffer_ = nullptr;
}
return numValues;
}
size_t StringDirectColumnReader::computeSize(const int64_t* lengths, const char* notNull,
uint64_t numValues) {
size_t totalLength = 0;
if (notNull) {
for (size_t i = 0; i < numValues; ++i) {
if (notNull[i]) {
totalLength += static_cast<size_t>(lengths[i]);
}
}
} else {
for (size_t i = 0; i < numValues; ++i) {
totalLength += static_cast<size_t>(lengths[i]);
}
}
return totalLength;
}
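// Direct-encoded strings are materialized in two steps: the LENGTH stream
// supplies every row's byte length, then exactly that many bytes are copied
// from the DATA stream into byteBatch.blob and each row's pointer is set to
// its slice of the blob.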
void StringDirectColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues,
char* notNull) {
ColumnReader::next(rowBatch, numValues, notNull);
// update the notNull from the parent class
notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
StringVectorBatch& byteBatch = dynamic_cast<StringVectorBatch&>(rowBatch);
char** startPtr = byteBatch.data.data();
int64_t* lengthPtr = byteBatch.length.data();
// read the length vector
lengthRle_->next(lengthPtr, numValues, notNull);
// figure out the total length of data we need from the blob stream
const size_t totalLength = computeSize(lengthPtr, notNull, numValues);
// Load data from the blob stream into our buffer until we have enough
// to get the rest directly out of the stream's buffer.
size_t bytesBuffered = 0;
byteBatch.blob.resize(totalLength);
char* ptr = byteBatch.blob.data();
while (bytesBuffered + lastBufferLength_ < totalLength) {
memcpy(ptr + bytesBuffered, lastBuffer_, lastBufferLength_);
bytesBuffered += lastBufferLength_;
const void* readBuffer;
int readLength;
if (!blobStream_->Next(&readBuffer, &readLength)) {
throw ParseError("failed to read in StringDirectColumnReader.next");
}
lastBuffer_ = static_cast<const char*>(readBuffer);
lastBufferLength_ = static_cast<size_t>(readLength);
}
if (bytesBuffered < totalLength) {
size_t moreBytes = totalLength - bytesBuffered;
memcpy(ptr + bytesBuffered, lastBuffer_, moreBytes);
lastBuffer_ += moreBytes;
lastBufferLength_ -= moreBytes;
}
size_t filledSlots = 0;
ptr = byteBatch.blob.data();
if (notNull) {
while (filledSlots < numValues) {
if (notNull[filledSlots]) {
startPtr[filledSlots] = const_cast<char*>(ptr);
ptr += lengthPtr[filledSlots];
}
filledSlots += 1;
}
} else {
while (filledSlots < numValues) {
startPtr[filledSlots] = const_cast<char*>(ptr);
ptr += lengthPtr[filledSlots];
filledSlots += 1;
}
}
}
void StringDirectColumnReader::seekToRowGroup(
std::unordered_map<uint64_t, PositionProvider>& positions) {
ColumnReader::seekToRowGroup(positions);
blobStream_->seek(positions.at(columnId));
lengthRle_->seek(positions.at(columnId));
// clear buffer state after seek
lastBuffer_ = nullptr;
lastBufferLength_ = 0;
}
class StructColumnReader : public ColumnReader {
private:
std::vector<std::unique_ptr<ColumnReader>> children_;
public:
StructColumnReader(const Type& type, StripeStreams& stripe, bool useTightNumericVector = false,
bool throwOnSchemaEvolutionOverflow = false);
uint64_t skip(uint64_t numValues) override;
void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override;
void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override;
void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions) override;
private:
template <bool encoded>
void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull);
};
StructColumnReader::StructColumnReader(const Type& type, StripeStreams& stripe,
bool useTightNumericVector,
bool throwOnSchemaEvolutionOverflow)
: ColumnReader(type, stripe) {
// build readers for the selected sub-columns
const std::vector<bool> selectedColumns = stripe.getSelectedColumns();
switch (static_cast<int64_t>(stripe.getEncoding(columnId).kind())) {
case proto::ColumnEncoding_Kind_DIRECT:
for (unsigned int i = 0; i < type.getSubtypeCount(); ++i) {
const Type& child = *type.getSubtype(i);
if (selectedColumns[static_cast<uint64_t>(child.getColumnId())]) {
children_.push_back(
buildReader(child, stripe, useTightNumericVector, throwOnSchemaEvolutionOverflow));
}
}
break;
case proto::ColumnEncoding_Kind_DIRECT_V2:
case proto::ColumnEncoding_Kind_DICTIONARY:
case proto::ColumnEncoding_Kind_DICTIONARY_V2:
default:
throw ParseError("Unknown encoding for StructColumnReader");
}
}
uint64_t StructColumnReader::skip(uint64_t numValues) {
numValues = ColumnReader::skip(numValues);
for (auto& ptr : children_) {
ptr->skip(numValues);
}
return numValues;
}
void StructColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) {
nextInternal<false>(rowBatch, numValues, notNull);
}
void StructColumnReader::nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues,
char* notNull) {
nextInternal<true>(rowBatch, numValues, notNull);
}
template <bool encoded>
void StructColumnReader::nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues,
char* notNull) {
ColumnReader::next(rowBatch, numValues, notNull);
uint64_t i = 0;
notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
for (auto iter = children_.begin(); iter != children_.end(); ++iter, ++i) {
if (encoded) {
(*iter)->nextEncoded(*(dynamic_cast<StructVectorBatch&>(rowBatch).fields[i]), numValues,
notNull);
} else {
(*iter)->next(*(dynamic_cast<StructVectorBatch&>(rowBatch).fields[i]), numValues, notNull);
}
}
}
void StructColumnReader::seekToRowGroup(
std::unordered_map<uint64_t, PositionProvider>& positions) {
ColumnReader::seekToRowGroup(positions);
for (auto& ptr : children_) {
ptr->seekToRowGroup(positions);
}
}
class ListColumnReader : public ColumnReader {
private:
std::unique_ptr<ColumnReader> child_;
std::unique_ptr<RleDecoder> rle_;
public:
ListColumnReader(const Type& type, StripeStreams& stripe, bool useTightNumericVector = false,
bool throwOnSchemaEvolutionOverflow = false);
~ListColumnReader() override;
uint64_t skip(uint64_t numValues) override;
void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override;
void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override;
void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions) override;
private:
template <bool encoded>
void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull);
};
ListColumnReader::ListColumnReader(const Type& type, StripeStreams& stripe,
bool useTightNumericVector,
bool throwOnSchemaEvolutionOverflow)
: ColumnReader(type, stripe) {
// determine whether the child column is selected
const std::vector<bool> selectedColumns = stripe.getSelectedColumns();
RleVersion vers = convertRleVersion(stripe.getEncoding(columnId).kind());
std::unique_ptr<SeekableInputStream> stream =
stripe.getStream(columnId, proto::Stream_Kind_LENGTH, true);
if (stream == nullptr) throw ParseError("LENGTH stream not found in List column");
rle_ = createRleDecoder(std::move(stream), false, vers, memoryPool, metrics);
const Type& childType = *type.getSubtype(0);
if (selectedColumns[static_cast<uint64_t>(childType.getColumnId())]) {
child_ =
buildReader(childType, stripe, useTightNumericVector, throwOnSchemaEvolutionOverflow);
}
}
ListColumnReader::~ListColumnReader() {
// PASS
}
uint64_t ListColumnReader::skip(uint64_t numValues) {
numValues = ColumnReader::skip(numValues);
ColumnReader* childReader = child_.get();
if (childReader) {
const uint64_t BUFFER_SIZE = 1024;
int64_t buffer[BUFFER_SIZE];
uint64_t childrenElements = 0;
uint64_t lengthsRead = 0;
while (lengthsRead < numValues) {
uint64_t chunk = std::min(numValues - lengthsRead, BUFFER_SIZE);
rle_->next(buffer, chunk, nullptr);
for (size_t i = 0; i < chunk; ++i) {
childrenElements += static_cast<size_t>(buffer[i]);
}
lengthsRead += chunk;
}
childReader->skip(childrenElements);
} else {
rle_->skip(numValues);
}
return numValues;
}
void ListColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) {
nextInternal<false>(rowBatch, numValues, notNull);
}
void ListColumnReader::nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues,
char* notNull) {
nextInternal<true>(rowBatch, numValues, notNull);
}
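// The LENGTH stream stores the element count of each list. The decoded counts
// are converted in place into cumulative offsets, so row i's elements occupy
// [offsets[i], offsets[i + 1]) of the child batch, and the grand total is
// then read from the child reader in a single call.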
template <bool encoded>
void ListColumnReader::nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues,
char* notNull) {
ColumnReader::next(rowBatch, numValues, notNull);
ListVectorBatch& listBatch = dynamic_cast<ListVectorBatch&>(rowBatch);
int64_t* offsets = listBatch.offsets.data();
notNull = listBatch.hasNulls ? listBatch.notNull.data() : nullptr;
rle_->next(offsets, numValues, notNull);
uint64_t totalChildren = 0;
if (notNull) {
for (size_t i = 0; i < numValues; ++i) {
if (notNull[i]) {
uint64_t tmp = static_cast<uint64_t>(offsets[i]);
offsets[i] = static_cast<int64_t>(totalChildren);
totalChildren += tmp;
} else {
offsets[i] = static_cast<int64_t>(totalChildren);
}
}
} else {
for (size_t i = 0; i < numValues; ++i) {
uint64_t tmp = static_cast<uint64_t>(offsets[i]);
offsets[i] = static_cast<int64_t>(totalChildren);
totalChildren += tmp;
}
}
offsets[numValues] = static_cast<int64_t>(totalChildren);
ColumnReader* childReader = child_.get();
if (childReader) {
if (encoded) {
childReader->nextEncoded(*(listBatch.elements.get()), totalChildren, nullptr);
} else {
childReader->next(*(listBatch.elements.get()), totalChildren, nullptr);
}
}
}
void ListColumnReader::seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions) {
ColumnReader::seekToRowGroup(positions);
rle_->seek(positions.at(columnId));
if (child_.get()) {
child_->seekToRowGroup(positions);
}
}
class MapColumnReader : public ColumnReader {
private:
std::unique_ptr<ColumnReader> keyReader_;
std::unique_ptr<ColumnReader> elementReader_;
std::unique_ptr<RleDecoder> rle_;
public:
MapColumnReader(const Type& type, StripeStreams& stripe, bool useTightNumericVector = false,
bool throwOnSchemaEvolutionOverflow = false);
~MapColumnReader() override;
uint64_t skip(uint64_t numValues) override;
void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override;
void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override;
void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions) override;
private:
template <bool encoded>
void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull);
};
MapColumnReader::MapColumnReader(const Type& type, StripeStreams& stripe,
bool useTightNumericVector, bool throwOnSchemaEvolutionOverflow)
: ColumnReader(type, stripe) {
// Determine if the key and/or value columns are selected
const std::vector<bool> selectedColumns = stripe.getSelectedColumns();
RleVersion vers = convertRleVersion(stripe.getEncoding(columnId).kind());
std::unique_ptr<SeekableInputStream> stream =
stripe.getStream(columnId, proto::Stream_Kind_LENGTH, true);
if (stream == nullptr) throw ParseError("LENGTH stream not found in Map column");
rle_ = createRleDecoder(std::move(stream), false, vers, memoryPool, metrics);
const Type& keyType = *type.getSubtype(0);
if (selectedColumns[static_cast<uint64_t>(keyType.getColumnId())]) {
keyReader_ =
buildReader(keyType, stripe, useTightNumericVector, throwOnSchemaEvolutionOverflow);
}
const Type& elementType = *type.getSubtype(1);
if (selectedColumns[static_cast<uint64_t>(elementType.getColumnId())]) {
elementReader_ =
buildReader(elementType, stripe, useTightNumericVector, throwOnSchemaEvolutionOverflow);
}
}
MapColumnReader::~MapColumnReader() {
// PASS
}
uint64_t MapColumnReader::skip(uint64_t numValues) {
numValues = ColumnReader::skip(numValues);
ColumnReader* rawKeyReader = keyReader_.get();
ColumnReader* rawElementReader = elementReader_.get();
if (rawKeyReader || rawElementReader) {
const uint64_t BUFFER_SIZE = 1024;
int64_t buffer[BUFFER_SIZE];
uint64_t childrenElements = 0;
uint64_t lengthsRead = 0;
while (lengthsRead < numValues) {
uint64_t chunk = std::min(numValues - lengthsRead, BUFFER_SIZE);
rle_->next(buffer, chunk, nullptr);
for (size_t i = 0; i < chunk; ++i) {
childrenElements += static_cast<size_t>(buffer[i]);
}
lengthsRead += chunk;
}
if (rawKeyReader) {
rawKeyReader->skip(childrenElements);
}
if (rawElementReader) {
rawElementReader->skip(childrenElements);
}
} else {
rle_->skip(numValues);
}
return numValues;
}
void MapColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) {
nextInternal<false>(rowBatch, numValues, notNull);
}
void MapColumnReader::nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues,
char* notNull) {
nextInternal<true>(rowBatch, numValues, notNull);
}
template <bool encoded>
void MapColumnReader::nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues,
char* notNull) {
ColumnReader::next(rowBatch, numValues, notNull);
MapVectorBatch& mapBatch = dynamic_cast<MapVectorBatch&>(rowBatch);
int64_t* offsets = mapBatch.offsets.data();
notNull = mapBatch.hasNulls ? mapBatch.notNull.data() : nullptr;
rle_->next(offsets, numValues, notNull);
uint64_t totalChildren = 0;
if (notNull) {
for (size_t i = 0; i < numValues; ++i) {
if (notNull[i]) {
uint64_t tmp = static_cast<uint64_t>(offsets[i]);
offsets[i] = static_cast<int64_t>(totalChildren);
totalChildren += tmp;
} else {
offsets[i] = static_cast<int64_t>(totalChildren);
}
}
} else {
for (size_t i = 0; i < numValues; ++i) {
uint64_t tmp = static_cast<uint64_t>(offsets[i]);
offsets[i] = static_cast<int64_t>(totalChildren);
totalChildren += tmp;
}
}
offsets[numValues] = static_cast<int64_t>(totalChildren);
ColumnReader* rawKeyReader = keyReader_.get();
if (rawKeyReader) {
if (encoded) {
rawKeyReader->nextEncoded(*(mapBatch.keys.get()), totalChildren, nullptr);
} else {
rawKeyReader->next(*(mapBatch.keys.get()), totalChildren, nullptr);
}
}
ColumnReader* rawElementReader = elementReader_.get();
if (rawElementReader) {
if (encoded) {
rawElementReader->nextEncoded(*(mapBatch.elements.get()), totalChildren, nullptr);
} else {
rawElementReader->next(*(mapBatch.elements.get()), totalChildren, nullptr);
}
}
}
void MapColumnReader::seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions) {
ColumnReader::seekToRowGroup(positions);
rle_->seek(positions.at(columnId));
if (keyReader_.get()) {
keyReader_->seekToRowGroup(positions);
}
if (elementReader_.get()) {
elementReader_->seekToRowGroup(positions);
}
}
class UnionColumnReader : public ColumnReader {
private:
std::unique_ptr<ByteRleDecoder> rle_;
std::vector<std::unique_ptr<ColumnReader>> childrenReader_;
std::vector<int64_t> childrenCounts_;
uint64_t numChildren_;
public:
UnionColumnReader(const Type& type, StripeStreams& stripe, bool useTightNumericVector = false,
bool throwOnSchemaEvolutionOverflow = false);
uint64_t skip(uint64_t numValues) override;
void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override;
void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override;
void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions) override;
private:
template <bool encoded>
void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull);
};
UnionColumnReader::UnionColumnReader(const Type& type, StripeStreams& stripe,
bool useTightNumericVector,
bool throwOnSchemaEvolutionOverflow)
: ColumnReader(type, stripe) {
numChildren_ = type.getSubtypeCount();
childrenReader_.resize(numChildren_);
childrenCounts_.resize(numChildren_);
std::unique_ptr<SeekableInputStream> stream =
stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
if (stream == nullptr) throw ParseError("LENGTH stream not found in Union column");
rle_ = createByteRleDecoder(std::move(stream), metrics);
// figure out which types are selected
const std::vector<bool> selectedColumns = stripe.getSelectedColumns();
for (unsigned int i = 0; i < numChildren_; ++i) {
const Type& child = *type.getSubtype(i);
if (selectedColumns[static_cast<size_t>(child.getColumnId())]) {
childrenReader_[i] =
buildReader(child, stripe, useTightNumericVector, throwOnSchemaEvolutionOverflow);
}
}
}
uint64_t UnionColumnReader::skip(uint64_t numValues) {
numValues = ColumnReader::skip(numValues);
const uint64_t BUFFER_SIZE = 1024;
char buffer[BUFFER_SIZE];
uint64_t lengthsRead = 0;
int64_t* counts = childrenCounts_.data();
memset(counts, 0, sizeof(int64_t) * numChildren_);
while (lengthsRead < numValues) {
uint64_t chunk = std::min(numValues - lengthsRead, BUFFER_SIZE);
rle_->next(buffer, chunk, nullptr);
for (size_t i = 0; i < chunk; ++i) {
counts[static_cast<size_t>(buffer[i])] += 1;
}
lengthsRead += chunk;
}
for (size_t i = 0; i < numChildren_; ++i) {
if (counts[i] != 0 && childrenReader_[i] != nullptr) {
childrenReader_[i]->skip(static_cast<uint64_t>(counts[i]));
}
}
return numValues;
}
void UnionColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) {
nextInternal<false>(rowBatch, numValues, notNull);
}
void UnionColumnReader::nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues,
char* notNull) {
nextInternal<true>(rowBatch, numValues, notNull);
}
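// The DATA stream stores one tag byte per row naming the child that holds the
// row's value. offsets[i] becomes the running count of earlier rows with the
// same tag, i.e. the row's position inside that child's batch, and each child
// reader is then asked for exactly its share of the values.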
template <bool encoded>
void UnionColumnReader::nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues,
char* notNull) {
ColumnReader::next(rowBatch, numValues, notNull);
UnionVectorBatch& unionBatch = dynamic_cast<UnionVectorBatch&>(rowBatch);
uint64_t* offsets = unionBatch.offsets.data();
int64_t* counts = childrenCounts_.data();
memset(counts, 0, sizeof(int64_t) * numChildren_);
unsigned char* tags = unionBatch.tags.data();
notNull = unionBatch.hasNulls ? unionBatch.notNull.data() : nullptr;
rle_->next(reinterpret_cast<char*>(tags), numValues, notNull);
// set the offsets for each row
if (notNull) {
for (size_t i = 0; i < numValues; ++i) {
if (notNull[i]) {
offsets[i] = static_cast<uint64_t>(counts[static_cast<size_t>(tags[i])]++);
}
}
} else {
for (size_t i = 0; i < numValues; ++i) {
offsets[i] = static_cast<uint64_t>(counts[static_cast<size_t>(tags[i])]++);
}
}
// read the right number of each child column
for (size_t i = 0; i < numChildren_; ++i) {
if (childrenReader_[i] != nullptr) {
if (encoded) {
childrenReader_[i]->nextEncoded(*(unionBatch.children[i]),
static_cast<uint64_t>(counts[i]), nullptr);
} else {
childrenReader_[i]->next(*(unionBatch.children[i]), static_cast<uint64_t>(counts[i]),
nullptr);
}
}
}
}
void UnionColumnReader::seekToRowGroup(
std::unordered_map<uint64_t, PositionProvider>& positions) {
ColumnReader::seekToRowGroup(positions);
rle_->seek(positions.at(columnId));
for (size_t i = 0; i < numChildren_; ++i) {
if (childrenReader_[i] != nullptr) {
childrenReader_[i]->seekToRowGroup(positions);
}
}
}
/**
* Destructively convert the number from zigzag encoding to the
* natural signed representation.
*/
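// Zigzag encoding folds the sign into the low bit, mapping 0, -1, 1, -2, 2,
// ... to 0, 1, 2, 3, 4, ...; an odd low bit therefore marks a negative value,
// recovered here as -(n >> 1) - 1.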
void unZigZagInt128(Int128& value) {
bool needsNegate = value.getLowBits() & 1;
value >>= 1;
if (needsNegate) {
value.negate();
value -= 1;
}
}
class Decimal64ColumnReader : public ColumnReader {
public:
static const uint32_t MAX_PRECISION_64 = 18;
static const uint32_t MAX_PRECISION_128 = 38;
static const int64_t POWERS_OF_TEN[MAX_PRECISION_64 + 1];
protected:
std::unique_ptr<SeekableInputStream> valueStream;
int32_t precision;
int32_t scale;
const char* buffer;
const char* bufferEnd;
std::unique_ptr<RleDecoder> scaleDecoder;
/**
* Read more bytes from the valueStream into the buffer.
*/
void readBuffer() {
while (buffer == bufferEnd) {
int length;
if (!valueStream->Next(reinterpret_cast<const void**>(&buffer), &length)) {
throw ParseError("Read past end of stream in Decimal64ColumnReader " +
valueStream->getName());
}
bufferEnd = buffer + length;
}
}
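// Decimal values in the DATA stream are zigzag-encoded base-128 varints: each
// byte contributes its low 7 bits, least-significant group first, and the
// high bit marks continuation. For example the bytes {0xC8, 0x01} decode to
// the unsigned value 200, which un-zigzags to 100. The result is then
// rescaled from the stored scale to the column's declared scale.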
void readInt64(int64_t& value, int32_t currentScale) {
value = 0;
size_t offset = 0;
while (true) {
readBuffer();
unsigned char ch = static_cast<unsigned char>(*(buffer++));
value |= static_cast<uint64_t>(ch & 0x7f) << offset;
offset += 7;
if (!(ch & 0x80)) {
break;
}
}
value = unZigZag(static_cast<uint64_t>(value));
if (scale > currentScale && static_cast<uint64_t>(scale - currentScale) <= MAX_PRECISION_64) {
value *= POWERS_OF_TEN[scale - currentScale];
} else if (scale < currentScale &&
static_cast<uint64_t>(currentScale - scale) <= MAX_PRECISION_64) {
value /= POWERS_OF_TEN[currentScale - scale];
} else if (scale != currentScale) {
throw ParseError("Decimal scale out of range");
}
}
public:
Decimal64ColumnReader(const Type& type, StripeStreams& stripe);
~Decimal64ColumnReader() override;
uint64_t skip(uint64_t numValues) override;
void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override;
void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions) override;
};
const uint32_t Decimal64ColumnReader::MAX_PRECISION_64;
const uint32_t Decimal64ColumnReader::MAX_PRECISION_128;
const int64_t Decimal64ColumnReader::POWERS_OF_TEN[MAX_PRECISION_64 + 1] = {1,
10,
100,
1000,
10000,
100000,
1000000,
10000000,
100000000,
1000000000,
10000000000,
100000000000,
1000000000000,
10000000000000,
100000000000000,
1000000000000000,
10000000000000000,
100000000000000000,
1000000000000000000};
Decimal64ColumnReader::Decimal64ColumnReader(const Type& type, StripeStreams& stripe)
: ColumnReader(type, stripe) {
scale = static_cast<int32_t>(type.getScale());
precision = static_cast<int32_t>(type.getPrecision());
valueStream = stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
if (valueStream == nullptr) throw ParseError("DATA stream not found in Decimal64Column");
buffer = nullptr;
bufferEnd = nullptr;
RleVersion vers = convertRleVersion(stripe.getEncoding(columnId).kind());
std::unique_ptr<SeekableInputStream> stream =
stripe.getStream(columnId, proto::Stream_Kind_SECONDARY, true);
if (stream == nullptr) throw ParseError("SECONDARY stream not found in Decimal64Column");
scaleDecoder = createRleDecoder(std::move(stream), true, vers, memoryPool, metrics);
}
Decimal64ColumnReader::~Decimal64ColumnReader() {
// PASS
}
uint64_t Decimal64ColumnReader::skip(uint64_t numValues) {
numValues = ColumnReader::skip(numValues);
uint64_t skipped = 0;
while (skipped < numValues) {
readBuffer();
if (!(0x80 & *(buffer++))) {
skipped += 1;
}
}
scaleDecoder->skip(numValues);
return numValues;
}
void Decimal64ColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) {
ColumnReader::next(rowBatch, numValues, notNull);
notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
Decimal64VectorBatch& batch = dynamic_cast<Decimal64VectorBatch&>(rowBatch);
int64_t* values = batch.values.data();
// read the next group of scales
int64_t* scaleBuffer = batch.readScales.data();
scaleDecoder->next(scaleBuffer, numValues, notNull);
batch.precision = precision;
batch.scale = scale;
if (notNull) {
for (size_t i = 0; i < numValues; ++i) {
if (notNull[i]) {
readInt64(values[i], static_cast<int32_t>(scaleBuffer[i]));
}
}
} else {
for (size_t i = 0; i < numValues; ++i) {
readInt64(values[i], static_cast<int32_t>(scaleBuffer[i]));
}
}
}
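// Rescale an Int128 from currentScale to the requested scale by repeatedly
// multiplying or dividing by powers of ten, stepping by at most 10^18 per
// iteration (the largest entry in POWERS_OF_TEN). Downscaling discards the
// remainder.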
void scaleInt128(Int128& value, uint32_t scale, uint32_t currentScale) {
if (scale > currentScale) {
while (scale > currentScale) {
uint32_t scaleAdjust =
std::min(Decimal64ColumnReader::MAX_PRECISION_64, scale - currentScale);
value *= Decimal64ColumnReader::POWERS_OF_TEN[scaleAdjust];
currentScale += scaleAdjust;
}
} else if (scale < currentScale) {
Int128 remainder;
while (currentScale > scale) {
uint32_t scaleAdjust =
std::min(Decimal64ColumnReader::MAX_PRECISION_64, currentScale - scale);
value = value.divide(Decimal64ColumnReader::POWERS_OF_TEN[scaleAdjust], remainder);
currentScale -= scaleAdjust;
}
}
}
void Decimal64ColumnReader::seekToRowGroup(
std::unordered_map<uint64_t, PositionProvider>& positions) {
ColumnReader::seekToRowGroup(positions);
valueStream->seek(positions.at(columnId));
scaleDecoder->seek(positions.at(columnId));
// clear buffer state after seek
buffer = nullptr;
bufferEnd = nullptr;
}
class Decimal128ColumnReader : public Decimal64ColumnReader {
public:
Decimal128ColumnReader(const Type& type, StripeStreams& stripe);
~Decimal128ColumnReader() override;
void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override;
private:
void readInt128(Int128& value, int32_t currentScale) {
value = 0;
Int128 work;
uint32_t offset = 0;
while (true) {
readBuffer();
unsigned char ch = static_cast<unsigned char>(*(buffer++));
work = ch & 0x7f;
work <<= offset;
value |= work;
offset += 7;
if (!(ch & 0x80)) {
break;
}
}
unZigZagInt128(value);
scaleInt128(value, static_cast<uint32_t>(scale), static_cast<uint32_t>(currentScale));
}
};
Decimal128ColumnReader::Decimal128ColumnReader(const Type& type, StripeStreams& stripe)
: Decimal64ColumnReader(type, stripe) {
// PASS
}
Decimal128ColumnReader::~Decimal128ColumnReader() {
// PASS
}
void Decimal128ColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues,
char* notNull) {
ColumnReader::next(rowBatch, numValues, notNull);
notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
Decimal128VectorBatch& batch = dynamic_cast<Decimal128VectorBatch&>(rowBatch);
Int128* values = batch.values.data();
// read the next group of scales
int64_t* scaleBuffer = batch.readScales.data();
scaleDecoder->next(scaleBuffer, numValues, notNull);
batch.precision = precision;
batch.scale = scale;
if (notNull) {
for (size_t i = 0; i < numValues; ++i) {
if (notNull[i]) {
readInt128(values[i], static_cast<int32_t>(scaleBuffer[i]));
}
}
} else {
for (size_t i = 0; i < numValues; ++i) {
readInt128(values[i], static_cast<int32_t>(scaleBuffer[i]));
}
}
}
class Decimal64ColumnReaderV2 : public ColumnReader {
protected:
std::unique_ptr<RleDecoder> valueDecoder;
int32_t precision;
int32_t scale;
public:
Decimal64ColumnReaderV2(const Type& type, StripeStreams& stripe);
~Decimal64ColumnReaderV2() override;
uint64_t skip(uint64_t numValues) override;
void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override;
};
Decimal64ColumnReaderV2::Decimal64ColumnReaderV2(const Type& type, StripeStreams& stripe)
: ColumnReader(type, stripe) {
scale = static_cast<int32_t>(type.getScale());
precision = static_cast<int32_t>(type.getPrecision());
std::unique_ptr<SeekableInputStream> stream =
stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
if (stream == nullptr) {
std::stringstream ss;
ss << "DATA stream not found in Decimal64V2 column. ColumnId=" << columnId;
throw ParseError(ss.str());
}
valueDecoder = createRleDecoder(std::move(stream), true, RleVersion_2, memoryPool, metrics);
}
Decimal64ColumnReaderV2::~Decimal64ColumnReaderV2() {
// PASS
}
uint64_t Decimal64ColumnReaderV2::skip(uint64_t numValues) {
numValues = ColumnReader::skip(numValues);
valueDecoder->skip(numValues);
return numValues;
}
void Decimal64ColumnReaderV2::next(ColumnVectorBatch& rowBatch, uint64_t numValues,
char* notNull) {
ColumnReader::next(rowBatch, numValues, notNull);
notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
Decimal64VectorBatch& batch = dynamic_cast<Decimal64VectorBatch&>(rowBatch);
valueDecoder->next(batch.values.data(), numValues, notNull);
batch.precision = precision;
batch.scale = scale;
}
class DecimalHive11ColumnReader : public Decimal64ColumnReader {
private:
bool throwOnOverflow_;
std::ostream* errorStream_;
/**
* Read an Int128 from the stream and correct it to the desired scale.
*/
bool readInt128(Int128& value, int32_t currentScale) {
// -/+ 99999999999999999999999999999999999999
static const Int128 MIN_VALUE(-0x4b3b4ca85a86c47b, 0xf675ddc000000001);
static const Int128 MAX_VALUE(0x4b3b4ca85a86c47a, 0x098a223fffffffff);
value = 0;
Int128 work;
uint32_t offset = 0;
bool result = true;
while (true) {
readBuffer();
unsigned char ch = static_cast<unsigned char>(*(buffer++));
work = ch & 0x7f;
// If we have read more than 128 bits, we flag the error, but keep
// reading bytes so the stream isn't thrown off.
if (offset > 128 || (offset == 126 && work > 3)) {
result = false;
}
work <<= offset;
value |= work;
offset += 7;
if (!(ch & 0x80)) {
break;
}
}
if (!result) {
return result;
}
unZigZagInt128(value);
scaleInt128(value, static_cast<uint32_t>(scale), static_cast<uint32_t>(currentScale));
return value >= MIN_VALUE && value <= MAX_VALUE;
}
public:
DecimalHive11ColumnReader(const Type& type, StripeStreams& stripe);
~DecimalHive11ColumnReader() override;
void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override;
};
DecimalHive11ColumnReader::DecimalHive11ColumnReader(const Type& type, StripeStreams& stripe)
: Decimal64ColumnReader(type, stripe) {
scale = stripe.getForcedScaleOnHive11Decimal();
throwOnOverflow_ = stripe.getThrowOnHive11DecimalOverflow();
errorStream_ = stripe.getErrorStream();
}
DecimalHive11ColumnReader::~DecimalHive11ColumnReader() {
// PASS
}
void DecimalHive11ColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues,
char* notNull) {
ColumnReader::next(rowBatch, numValues, notNull);
notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
Decimal128VectorBatch& batch = dynamic_cast<Decimal128VectorBatch&>(rowBatch);
Int128* values = batch.values.data();
// read the next group of scales
int64_t* scaleBuffer = batch.readScales.data();
scaleDecoder->next(scaleBuffer, numValues, notNull);
batch.precision = precision;
batch.scale = scale;
if (notNull) {
for (size_t i = 0; i < numValues; ++i) {
if (notNull[i]) {
if (!readInt128(values[i], static_cast<int32_t>(scaleBuffer[i]))) {
if (throwOnOverflow_) {
throw ParseError("Hive 0.11 decimal was more than 38 digits.");
} else {
*errorStream_ << "Warning: "
<< "Hive 0.11 decimal with more than 38 digits "
<< "replaced by NULL.\n";
notNull[i] = false;
}
}
}
}
} else {
for (size_t i = 0; i < numValues; ++i) {
if (!readInt128(values[i], static_cast<int32_t>(scaleBuffer[i]))) {
if (throwOnOverflow_) {
throw ParseError("Hive 0.11 decimal was more than 38 digits.");
} else {
*errorStream_ << "Warning: "
<< "Hive 0.11 decimal with more than 38 digits "
<< "replaced by NULL.\n";
batch.hasNulls = true;
batch.notNull[i] = false;
}
}
}
}
}
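// Detect the host byte order by storing the 32-bit pattern 0x01020304 and
// inspecting its first byte: a little-endian machine stores the
// least-significant byte (0x04) first.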
static bool isLittleEndian() {
static union {
uint32_t i;
char c[4];
} num = {0x01020304};
return num.c[0] == 4;
}
/**
* Create a column reader for the given type within the stripe, dispatching on
* the type kind and, for string-family columns, on the column encoding.
*/
std::unique_ptr<ColumnReader> buildReader(const Type& type, StripeStreams& stripe,
bool useTightNumericVector,
bool throwOnSchemaEvolutionOverflow,
bool convertToReadType) {
if (convertToReadType && stripe.getSchemaEvolution() &&
stripe.getSchemaEvolution()->needConvert(type)) {
return buildConvertReader(type, stripe, useTightNumericVector,
throwOnSchemaEvolutionOverflow);
}
switch (static_cast<int64_t>(type.getKind())) {
case SHORT:
if (useTightNumericVector) {
return std::make_unique<IntegerColumnReader<ShortVectorBatch>>(type, stripe);
}
return std::make_unique<IntegerColumnReader<LongVectorBatch>>(type, stripe);
case INT:
if (useTightNumericVector) {
return std::make_unique<IntegerColumnReader<IntVectorBatch>>(type, stripe);
}
return std::make_unique<IntegerColumnReader<LongVectorBatch>>(type, stripe);
case LONG:
case DATE:
return std::make_unique<IntegerColumnReader<LongVectorBatch>>(type, stripe);
case BINARY:
case CHAR:
case STRING:
case VARCHAR:
switch (static_cast<int64_t>(stripe.getEncoding(type.getColumnId()).kind())) {
case proto::ColumnEncoding_Kind_DICTIONARY:
case proto::ColumnEncoding_Kind_DICTIONARY_V2:
return std::make_unique<StringDictionaryColumnReader>(type, stripe);
case proto::ColumnEncoding_Kind_DIRECT:
case proto::ColumnEncoding_Kind_DIRECT_V2:
return std::make_unique<StringDirectColumnReader>(type, stripe);
default:
throw NotImplementedYet("buildReader unhandled string encoding");
}
case BOOLEAN: {
if (useTightNumericVector) {
return std::make_unique<BooleanColumnReader<ByteVectorBatch>>(type, stripe);
} else {
return std::make_unique<BooleanColumnReader<LongVectorBatch>>(type, stripe);
}
}
case BYTE:
if (useTightNumericVector) {
return std::make_unique<ByteColumnReader<ByteVectorBatch>>(type, stripe);
}
return std::make_unique<ByteColumnReader<LongVectorBatch>>(type, stripe);
case LIST:
return std::make_unique<ListColumnReader>(type, stripe, useTightNumericVector,
throwOnSchemaEvolutionOverflow);
case MAP:
return std::make_unique<MapColumnReader>(type, stripe, useTightNumericVector,
throwOnSchemaEvolutionOverflow);
case UNION:
return std::make_unique<UnionColumnReader>(type, stripe, useTightNumericVector,
throwOnSchemaEvolutionOverflow);
case STRUCT:
return std::make_unique<StructColumnReader>(type, stripe, useTightNumericVector,
throwOnSchemaEvolutionOverflow);
case FLOAT: {
if (useTightNumericVector) {
if (isLittleEndian()) {
return std::make_unique<DoubleColumnReader<FLOAT, true, float, FloatVectorBatch>>(
type, stripe);
}
return std::make_unique<DoubleColumnReader<FLOAT, false, float, FloatVectorBatch>>(
type, stripe);
}
if (isLittleEndian()) {
return std::make_unique<DoubleColumnReader<FLOAT, true, double, DoubleVectorBatch>>(
type, stripe);
}
return std::make_unique<DoubleColumnReader<FLOAT, false, double, DoubleVectorBatch>>(
type, stripe);
}
case DOUBLE: {
if (isLittleEndian()) {
return std::make_unique<DoubleColumnReader<DOUBLE, true, double, DoubleVectorBatch>>(
type, stripe);
}
return std::make_unique<DoubleColumnReader<DOUBLE, false, double, DoubleVectorBatch>>(
type, stripe);
}
case TIMESTAMP:
return std::make_unique<TimestampColumnReader>(type, stripe, false);
case TIMESTAMP_INSTANT:
return std::make_unique<TimestampColumnReader>(type, stripe, true);
case DECIMAL:
// is this a Hive 0.11 or 0.12 file?
if (type.getPrecision() == 0) {
return std::make_unique<DecimalHive11ColumnReader>(type, stripe);
}
// can we represent the values using int64_t?
if (type.getPrecision() <= Decimal64ColumnReader::MAX_PRECISION_64) {
if (stripe.isDecimalAsLong()) {
return std::make_unique<Decimal64ColumnReaderV2>(type, stripe);
}
return std::make_unique<Decimal64ColumnReader>(type, stripe);
}
// otherwise we use the Int128 implementation
return std::make_unique<Decimal128ColumnReader>(type, stripe);
default:
throw NotImplementedYet("buildReader unhandled type");
}
}
} // namespace orc