blob: 016aed87421a7dc4b55767f81a5151b791c3ec80 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "orc/Int128.hh"
#include "Adaptor.hh"
#include "ByteRLE.hh"
#include "ColumnReader.hh"
#include "orc/Exceptions.hh"
#include "RLE.hh"
#include <math.h>
#include <cstring>
#include <iostream>
#include <limits>
namespace orc {
StripeStreams::~StripeStreams() {
  // Intentionally empty.
}
/**
 * Map a column encoding kind to the RLE version used by its integer streams.
 * DIRECT and DICTIONARY use RLE v1; DIRECT_V2 and DICTIONARY_V2 use RLE v2.
 * @param kind the encoding recorded in the stripe footer
 * @return the matching RleVersion
 * @throws ParseError for any other encoding kind
 */
inline RleVersion convertRleVersion(proto::ColumnEncoding_Kind kind) {
  const int64_t encoding = static_cast<int64_t>(kind);
  if (encoding == proto::ColumnEncoding_Kind_DIRECT ||
      encoding == proto::ColumnEncoding_Kind_DICTIONARY) {
    return RleVersion_1;
  }
  if (encoding == proto::ColumnEncoding_Kind_DIRECT_V2 ||
      encoding == proto::ColumnEncoding_Kind_DICTIONARY_V2) {
    return RleVersion_2;
  }
  throw ParseError("Unknown encoding in convertRleVersion");
}
/**
 * Common constructor: remembers the column id and memory pool, and opens a
 * boolean RLE decoder over the PRESENT stream when the stripe has one.
 */
ColumnReader::ColumnReader(const Type& type, StripeStreams& stripe)
    : columnId(type.getColumnId()),
      memoryPool(stripe.getMemoryPool()) {
  std::unique_ptr<SeekableInputStream> presentStream =
    stripe.getStream(columnId, proto::Stream_Kind_PRESENT, true);
  if (!presentStream) {
    // No PRESENT stream: notNullDecoder stays empty.
    return;
  }
  notNullDecoder = createBooleanRleDecoder(std::move(presentStream));
}
ColumnReader::~ColumnReader() {
  // Intentionally empty; owned members release themselves.
}
/**
 * Skip numValues rows in the PRESENT stream.
 * @param numValues the number of rows to skip
 * @return how many of the skipped rows were non-null, i.e. how many values
 *   subclasses must skip in their own data streams
 */
uint64_t ColumnReader::skip(uint64_t numValues) {
  ByteRleDecoder* decoder = notNullDecoder.get();
  if (decoder == nullptr) {
    // Without a PRESENT stream every skipped row is non-null.
    return numValues;
  }
  // page through the values that we want to skip and count how many are
  // non-null
  const size_t MAX_BUFFER_SIZE = 32768;
  char buffer[MAX_BUFFER_SIZE];
  // Clamp in 64-bit arithmetic: narrowing numValues to size_t first can
  // truncate it (e.g. to 0) on 32-bit targets, which made chunkSize 0 and
  // the loop below spin forever.
  const uint64_t bufferSize =
    std::min(static_cast<uint64_t>(MAX_BUFFER_SIZE), numValues);
  uint64_t nonNullCount = numValues;
  uint64_t remaining = numValues;
  while (remaining > 0) {
    const uint64_t chunkSize = std::min(remaining, bufferSize);
    decoder->next(buffer, chunkSize, nullptr);
    remaining -= chunkSize;
    for (uint64_t i = 0; i < chunkSize; ++i) {
      if (!buffer[i]) {
        nonNullCount -= 1;
      }
    }
  }
  return nonNullCount;
}
/**
 * Prepare rowBatch for numValues rows and fill its notNull array.
 * Grows the batch if needed, sets numElements, and sets hasNulls.
 * @param rowBatch the batch being filled
 * @param numValues the number of rows to read
 * @param incomingMask the parent's notNull mask, or nullptr
 */
void ColumnReader::next(ColumnVectorBatch& rowBatch,
                        uint64_t numValues,
                        char* incomingMask) {
  if (rowBatch.capacity < numValues) {
    rowBatch.resize(numValues);
  }
  rowBatch.numElements = numValues;
  ByteRleDecoder* decoder = notNullDecoder.get();
  if (decoder == nullptr) {
    // No PRESENT stream of our own: inherit the parent's mask as-is.
    if (incomingMask == nullptr) {
      rowBatch.hasNulls = false;
    } else {
      rowBatch.hasNulls = true;
      memcpy(rowBatch.notNull.data(), incomingMask, numValues);
    }
    return;
  }
  char* notNullArray = rowBatch.notNull.data();
  decoder->next(notNullArray, numValues, incomingMask);
  // hasNulls becomes true as soon as any decoded row is null.
  bool sawNull = false;
  for (uint64_t row = 0; row < numValues && !sawNull; ++row) {
    sawNull = notNullArray[row] == 0;
  }
  rowBatch.hasNulls = sawNull;
}
/**
 * Reposition the PRESENT stream (if any) using this column's positions.
 */
void ColumnReader::seekToRowGroup(
    std::unordered_map<uint64_t, PositionProvider>& positions) {
  if (!notNullDecoder) {
    return;
  }
  notNullDecoder->seek(positions.at(columnId));
}
/**
 * Expand an array of bytes in place to the corresponding array of longs.
 * Has to work backwards so that the data isn't clobbered during the
 * expansion: element i occupies bytes [8i, 8i+8), which never overlaps the
 * still-unread bytes 0..i-1.
 * Each byte is interpreted as a signed 8-bit value. Plain `char` is
 * unsigned on some platforms (e.g. ARM Linux), which would turn negative
 * TINYINT values into 128..255.
 * @param buffer the array of chars and array of longs that need to be
 * expanded
 * @param numValues the number of bytes to convert to longs
 */
void expandBytesToLongs(int64_t* buffer, uint64_t numValues) {
  // unsigned char may alias any object; the sign is applied explicitly.
  const unsigned char* bytes = reinterpret_cast<const unsigned char*>(buffer);
  for (uint64_t i = numValues; i-- > 0;) {
    buffer[i] = static_cast<signed char>(bytes[i]);
  }
}
/**
 * Reader for BOOLEAN columns. Values come from a boolean RLE DATA stream
 * and are stored in a LongVectorBatch.
 */
class BooleanColumnReader: public ColumnReader {
public:
  BooleanColumnReader(const Type& type, StripeStreams& stripe);
  ~BooleanColumnReader() override;

  uint64_t skip(uint64_t numValues) override;

  void next(ColumnVectorBatch& rowBatch,
            uint64_t numValues,
            char* notNull) override;

  void seekToRowGroup(
    std::unordered_map<uint64_t, PositionProvider>& positions) override;

private:
  // Decoder over the DATA stream.
  std::unique_ptr<orc::ByteRleDecoder> rle;
};
/**
 * Open the DATA stream; it is mandatory for boolean columns.
 * @throws ParseError when the stripe has no DATA stream for this column
 */
BooleanColumnReader::BooleanColumnReader(const Type& type, StripeStreams& stripe)
    : ColumnReader(type, stripe) {
  std::unique_ptr<SeekableInputStream> dataStream =
    stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
  if (!dataStream) {
    throw ParseError("DATA stream not found in Boolean column");
  }
  rle = createBooleanRleDecoder(std::move(dataStream));
}
// Nothing beyond member cleanup is required.
BooleanColumnReader::~BooleanColumnReader() = default;
/**
 * Skip rows; only the non-null ones have entries in the DATA stream.
 * @return the number of non-null rows skipped
 */
uint64_t BooleanColumnReader::skip(uint64_t numValues) {
  const uint64_t presentCount = ColumnReader::skip(numValues);
  rle->skip(presentCount);
  return presentCount;
}
/**
 * Decode numValues booleans into rowBatch (a LongVectorBatch).
 * The byte RLE decoder writes one char per value, so the values are first
 * decoded into the int64 buffer as bytes and then widened in place.
 */
void BooleanColumnReader::next(ColumnVectorBatch& rowBatch,
                               uint64_t numValues,
                               char *notNull) {
  ColumnReader::next(rowBatch, numValues, notNull);
  LongVectorBatch& longBatch = dynamic_cast<LongVectorBatch&>(rowBatch);
  int64_t* values = longBatch.data.data();
  char* presentMask = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
  rle->next(reinterpret_cast<char*>(values), numValues, presentMask);
  expandBytesToLongs(values, numValues);
}
void BooleanColumnReader::seekToRowGroup(
std::unordered_map<uint64_t, PositionProvider>& positions) {
ColumnReader::seekToRowGroup(positions);
rle->seek(positions.at(columnId));
}
/**
 * Reader for BYTE (tinyint) columns. Values come from a byte RLE DATA
 * stream and are stored in a LongVectorBatch.
 */
class ByteColumnReader: public ColumnReader {
public:
  ByteColumnReader(const Type& type, StripeStreams& stripe);
  ~ByteColumnReader() override;

  uint64_t skip(uint64_t numValues) override;

  void next(ColumnVectorBatch& rowBatch,
            uint64_t numValues,
            char* notNull) override;

  void seekToRowGroup(
    std::unordered_map<uint64_t, PositionProvider>& positions) override;

private:
  // Decoder over the DATA stream.
  std::unique_ptr<orc::ByteRleDecoder> rle;
};
/**
 * Open the mandatory DATA stream.
 * @throws ParseError when the stripe has no DATA stream for this column
 */
ByteColumnReader::ByteColumnReader(const Type& type, StripeStreams& stripe)
    : ColumnReader(type, stripe) {
  std::unique_ptr<SeekableInputStream> dataStream =
    stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
  if (!dataStream) {
    throw ParseError("DATA stream not found in Byte column");
  }
  rle = createByteRleDecoder(std::move(dataStream));
}
// Nothing beyond member cleanup is required.
ByteColumnReader::~ByteColumnReader() = default;
/**
 * Skip rows; only non-null rows have entries in the DATA stream.
 * @return the number of non-null rows skipped
 */
uint64_t ByteColumnReader::skip(uint64_t numValues) {
  const uint64_t presentCount = ColumnReader::skip(numValues);
  rle->skip(presentCount);
  return presentCount;
}
/**
 * Decode numValues bytes into rowBatch (a LongVectorBatch).
 * The byte RLE decoder writes chars, so values are decoded into the int64
 * buffer as bytes and then widened in place.
 */
void ByteColumnReader::next(ColumnVectorBatch& rowBatch,
                            uint64_t numValues,
                            char *notNull) {
  ColumnReader::next(rowBatch, numValues, notNull);
  LongVectorBatch& longBatch = dynamic_cast<LongVectorBatch&>(rowBatch);
  int64_t* values = longBatch.data.data();
  char* presentMask = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
  rle->next(reinterpret_cast<char*>(values), numValues, presentMask);
  expandBytesToLongs(values, numValues);
}
void ByteColumnReader::seekToRowGroup(
std::unordered_map<uint64_t, PositionProvider>& positions) {
ColumnReader::seekToRowGroup(positions);
rle->seek(positions.at(columnId));
}
/**
 * Reader for integer columns. The DATA stream is decoded by an RleDecoder
 * straight into a LongVectorBatch.
 */
class IntegerColumnReader: public ColumnReader {
public:
  IntegerColumnReader(const Type& type, StripeStreams& stripe);
  ~IntegerColumnReader() override;

  uint64_t skip(uint64_t numValues) override;

  void next(ColumnVectorBatch& rowBatch,
            uint64_t numValues,
            char* notNull) override;

  void seekToRowGroup(
    std::unordered_map<uint64_t, PositionProvider>& positions) override;

protected:
  // Decoder over the DATA stream.
  std::unique_ptr<orc::RleDecoder> rle;
};
/**
 * Pick the RLE version from the column encoding and open the DATA stream.
 * @throws ParseError for an unknown encoding or a missing DATA stream
 */
IntegerColumnReader::IntegerColumnReader(const Type& type, StripeStreams& stripe)
    : ColumnReader(type, stripe) {
  const RleVersion version =
    convertRleVersion(stripe.getEncoding(columnId).kind());
  std::unique_ptr<SeekableInputStream> dataStream =
    stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
  if (!dataStream) {
    throw ParseError("DATA stream not found in Integer column");
  }
  rle = createRleDecoder(std::move(dataStream), true, version, memoryPool);
}
// Nothing beyond member cleanup is required.
IntegerColumnReader::~IntegerColumnReader() = default;
/**
 * Skip rows; only non-null rows have entries in the DATA stream.
 * @return the number of non-null rows skipped
 */
uint64_t IntegerColumnReader::skip(uint64_t numValues) {
  const uint64_t presentCount = ColumnReader::skip(numValues);
  rle->skip(presentCount);
  return presentCount;
}
/**
 * Decode numValues integers directly into the LongVectorBatch data array.
 */
void IntegerColumnReader::next(ColumnVectorBatch& rowBatch,
                               uint64_t numValues,
                               char *notNull) {
  ColumnReader::next(rowBatch, numValues, notNull);
  LongVectorBatch& longBatch = dynamic_cast<LongVectorBatch&>(rowBatch);
  char* presentMask = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
  rle->next(longBatch.data.data(), numValues, presentMask);
}
void IntegerColumnReader::seekToRowGroup(
std::unordered_map<uint64_t, PositionProvider>& positions) {
ColumnReader::seekToRowGroup(positions);
rle->seek(positions.at(columnId));
}
/**
 * Reader for TIMESTAMP columns. Seconds come from the DATA stream and
 * nanoseconds from the SECONDARY stream. Seconds are offset by the writer
 * timezone's epoch and converted to UTC while reading.
 */
class TimestampColumnReader: public ColumnReader {
public:
  TimestampColumnReader(const Type& type, StripeStreams& stripe);
  ~TimestampColumnReader() override;

  uint64_t skip(uint64_t numValues) override;

  void next(ColumnVectorBatch& rowBatch,
            uint64_t numValues,
            char* notNull) override;

  void seekToRowGroup(
    std::unordered_map<uint64_t, PositionProvider>& positions) override;

private:
  // Decoder over the DATA (seconds) stream.
  std::unique_ptr<orc::RleDecoder> secondsRle;
  // Decoder over the SECONDARY (nanoseconds) stream.
  std::unique_ptr<orc::RleDecoder> nanoRle;
  const Timezone& writerTimezone;
  // Cached writerTimezone.getEpoch().
  const int64_t epochOffset;
};
/**
 * Open the DATA (seconds) and SECONDARY (nanoseconds) streams.
 * @throws ParseError for an unknown encoding or a missing stream
 */
TimestampColumnReader::TimestampColumnReader(const Type& type,
                                             StripeStreams& stripe)
    : ColumnReader(type, stripe),
      writerTimezone(stripe.getWriterTimezone()),
      epochOffset(writerTimezone.getEpoch()) {
  const RleVersion version =
    convertRleVersion(stripe.getEncoding(columnId).kind());

  std::unique_ptr<SeekableInputStream> secondsStream =
    stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
  if (!secondsStream) {
    throw ParseError("DATA stream not found in Timestamp column");
  }
  secondsRle =
    createRleDecoder(std::move(secondsStream), true, version, memoryPool);

  std::unique_ptr<SeekableInputStream> nanosStream =
    stripe.getStream(columnId, proto::Stream_Kind_SECONDARY, true);
  if (!nanosStream) {
    throw ParseError("SECONDARY stream not found in Timestamp column");
  }
  nanoRle =
    createRleDecoder(std::move(nanosStream), false, version, memoryPool);
}
// Nothing beyond member cleanup is required.
TimestampColumnReader::~TimestampColumnReader() = default;
/**
 * Skip rows in both value streams; only non-null rows have entries.
 * @return the number of non-null rows skipped
 */
uint64_t TimestampColumnReader::skip(uint64_t numValues) {
  const uint64_t presentCount = ColumnReader::skip(numValues);
  secondsRle->skip(presentCount);
  nanoRle->skip(presentCount);
  return presentCount;
}
/**
 * Decode numValues timestamps into a TimestampVectorBatch.
 * The low 3 bits of each raw nanosecond value hold a scale code. A non-zero
 * code z means the remaining bits must be multiplied by 10^(z+1). Seconds
 * are shifted by epochOffset, converted to UTC, and decremented by one when
 * negative with a non-zero nanosecond part.
 */
void TimestampColumnReader::next(ColumnVectorBatch& rowBatch,
                                 uint64_t numValues,
                                 char *notNull) {
  ColumnReader::next(rowBatch, numValues, notNull);
  char* present = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
  TimestampVectorBatch& tsBatch =
    dynamic_cast<TimestampVectorBatch&>(rowBatch);
  int64_t* seconds = tsBatch.data.data();
  secondsRle->next(seconds, numValues, present);
  int64_t* nanos = tsBatch.nanoseconds.data();
  nanoRle->next(nanos, numValues, present);

  for (uint64_t row = 0; row < numValues; ++row) {
    if (present != nullptr && !present[row]) {
      continue;
    }
    const uint64_t scale = nanos[row] & 0x7;
    nanos[row] >>= 3;
    // A zero code means "no scaling"; the guard matters because the loop
    // below would otherwise still multiply once.
    if (scale != 0) {
      for (uint64_t step = 0; step <= scale; ++step) {
        nanos[row] *= 10;
      }
    }
    const int64_t writerTime = seconds[row] + epochOffset;
    seconds[row] = writerTimezone.convertToUTC(writerTime);
    if (seconds[row] < 0 && nanos[row] != 0) {
      seconds[row] -= 1;
    }
  }
}
void TimestampColumnReader::seekToRowGroup(
std::unordered_map<uint64_t, PositionProvider>& positions) {
ColumnReader::seekToRowGroup(positions);
secondsRle->seek(positions.at(columnId));
nanoRle->seek(positions.at(columnId));
}
/**
 * Reader for FLOAT and DOUBLE columns. The DATA stream holds raw
 * little-endian IEEE-754 values (4 bytes for FLOAT, 8 for DOUBLE). They are
 * decoded byte by byte from the stream's current buffer.
 */
class DoubleColumnReader: public ColumnReader {
public:
  DoubleColumnReader(const Type& type, StripeStreams& stripe);
  ~DoubleColumnReader() override;

  uint64_t skip(uint64_t numValues) override;

  void next(ColumnVectorBatch& rowBatch,
            uint64_t numValues,
            char* notNull) override;

  void seekToRowGroup(
    std::unordered_map<uint64_t, PositionProvider>& positions) override;

private:
  std::unique_ptr<SeekableInputStream> inputStream;
  TypeKind columnKind;
  const uint64_t bytesPerValue;
  // [bufferPointer, bufferEnd) is the unread part of the last chunk
  // returned by inputStream->Next().
  const char *bufferPointer;
  const char *bufferEnd;

  /**
   * Return the next byte of the DATA stream, fetching a new chunk when the
   * current one is exhausted.
   * @throws ParseError if the stream has no more data
   */
  unsigned char readByte() {
    if (bufferPointer == bufferEnd) {
      int length;
      if (!inputStream->Next
          (reinterpret_cast<const void**>(&bufferPointer), &length)) {
        throw ParseError("bad read in DoubleColumnReader::next()");
      }
      bufferEnd = bufferPointer + length;
    }
    return static_cast<unsigned char>(*(bufferPointer++));
  }

  /**
   * Read 8 little-endian bytes as an IEEE-754 double.
   * Bits are accumulated unsigned (a signed shift into the sign bit is UB)
   * and reinterpreted with memcpy instead of a pointer cast, which would
   * violate strict aliasing.
   */
  double readDouble() {
    static_assert(sizeof(double) == sizeof(uint64_t),
                  "double must be 64 bits");
    uint64_t bits = 0;
    for (uint64_t i = 0; i < 8; i++) {
      bits |= static_cast<uint64_t>(readByte()) << (i * 8);
    }
    double result;
    memcpy(&result, &bits, sizeof(result));
    return result;
  }

  /**
   * Read 4 little-endian bytes as an IEEE-754 float and widen to double.
   * Each byte is widened to uint32_t before shifting: an int-promoted byte
   * shifted left by 24 overflows a signed int when the byte is >= 0x80.
   */
  double readFloat() {
    static_assert(sizeof(float) == sizeof(uint32_t),
                  "float must be 32 bits");
    uint32_t bits = 0;
    for (uint64_t i = 0; i < 4; i++) {
      bits |= static_cast<uint32_t>(readByte()) << (i * 8);
    }
    float result;
    memcpy(&result, &bits, sizeof(result));
    return static_cast<double>(result);
  }
};
/**
 * Record the value width (4 bytes for FLOAT, otherwise 8) and open the
 * mandatory DATA stream.
 * @throws ParseError when the stripe has no DATA stream for this column
 */
DoubleColumnReader::DoubleColumnReader(const Type& type, StripeStreams& stripe)
    : ColumnReader(type, stripe),
      columnKind(type.getKind()),
      bytesPerValue(type.getKind() == FLOAT ? 4 : 8),
      bufferPointer(nullptr),
      bufferEnd(nullptr) {
  inputStream = stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
  if (!inputStream) {
    throw ParseError("DATA stream not found in Double column");
  }
}
// Nothing beyond member cleanup is required.
DoubleColumnReader::~DoubleColumnReader() = default;
uint64_t DoubleColumnReader::skip(uint64_t numValues) {
numValues = ColumnReader::skip(numValues);
if (static_cast<size_t>(bufferEnd - bufferPointer) >=
bytesPerValue * numValues) {
bufferPointer += bytesPerValue * numValues;
} else {
size_t sizeToSkip = bytesPerValue * numValues -
static_cast<size_t>(bufferEnd - bufferPointer);
const size_t cap = static_cast<size_t>(std::numeric_limits<int>::max());
while (sizeToSkip != 0) {
size_t step = sizeToSkip > cap ? cap : sizeToSkip;
inputStream->Skip(static_cast<int>(step));
sizeToSkip -= step;
}
bufferEnd = nullptr;
bufferPointer = nullptr;
}
return numValues;
}
/**
 * Decode numValues floating-point values into a DoubleVectorBatch.
 * Null rows are left untouched and consume no bytes from the stream.
 */
void DoubleColumnReader::next(ColumnVectorBatch& rowBatch,
                              uint64_t numValues,
                              char *notNull) {
  ColumnReader::next(rowBatch, numValues, notNull);
  const char* present = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
  double* outArray = dynamic_cast<DoubleVectorBatch&>(rowBatch).data.data();
  const bool isFloat = columnKind == FLOAT;
  for (size_t row = 0; row < numValues; ++row) {
    if (present != nullptr && !present[row]) {
      continue;
    }
    outArray[row] = isFloat ? readFloat() : readDouble();
  }
}
/**
 * Copy exactly bufferSize bytes from stream into buffer.
 * @param buffer destination, at least bufferSize bytes long
 * @param bufferSize number of bytes required
 * @param stream source stream (not dereferenced when bufferSize is 0)
 * @throws ParseError if the stream ends early or a chunk would overflow the
 *   buffer
 */
void readFully(char* buffer, int64_t bufferSize, SeekableInputStream* stream) {
  int64_t filled = 0;
  for (; filled < bufferSize;) {
    const void* chunk = nullptr;
    int chunkLength = 0;
    if (!stream->Next(&chunk, &chunkLength)) {
      throw ParseError("bad read in readFully");
    }
    if (filled + chunkLength > bufferSize) {
      throw ParseError("Corrupt dictionary blob in StringDictionaryColumn");
    }
    memcpy(buffer + filled, chunk, static_cast<size_t>(chunkLength));
    filled += chunkLength;
  }
}
void DoubleColumnReader::seekToRowGroup(
std::unordered_map<uint64_t, PositionProvider>& positions) {
ColumnReader::seekToRowGroup(positions);
inputStream->seek(positions.at(columnId));
}
/**
 * Reader for dictionary-encoded string columns. The DATA stream holds
 * dictionary ids. The dictionary itself (LENGTH + DICTIONARY_DATA streams)
 * is loaded once in the constructor.
 */
class StringDictionaryColumnReader: public ColumnReader {
public:
  StringDictionaryColumnReader(const Type& type, StripeStreams& stripe);
  ~StringDictionaryColumnReader() override;

  uint64_t skip(uint64_t numValues) override;

  void next(ColumnVectorBatch& rowBatch,
            uint64_t numValues,
            char *notNull) override;

  void nextEncoded(ColumnVectorBatch& rowBatch,
                   uint64_t numValues,
                   char* notNull) override;

  void seekToRowGroup(
    std::unordered_map<uint64_t, PositionProvider>& positions) override;

private:
  // Shared with EncodedStringVectorBatch instances handed out by nextEncoded.
  std::shared_ptr<StringDictionary> dictionary;
  // Decoder over the DATA stream of dictionary ids.
  std::unique_ptr<RleDecoder> rle;
};
/**
 * Open the id stream and load the whole dictionary for this stripe.
 * After construction, dictionaryOffset has dictSize + 1 entries: entry i is
 * the start of string i in dictionaryBlob, and the last entry is the blob
 * size.
 * @throws ParseError for missing streams, negative entry lengths, or total
 *   lengths that overflow int64
 */
StringDictionaryColumnReader::StringDictionaryColumnReader
                                       (const Type& type,
                                        StripeStreams& stripe
                                        ): ColumnReader(type, stripe),
    dictionary(new StringDictionary(stripe.getMemoryPool())) {
  RleVersion rleVersion = convertRleVersion(stripe.getEncoding(columnId)
                                            .kind());
  uint32_t dictSize = stripe.getEncoding(columnId).dictionarysize();
  // Computed in 64 bits: dictSize + 1 in uint32_t arithmetic wraps to 0 for
  // a (corrupt) dictSize of UINT32_MAX, which led to out-of-bounds writes.
  const uint64_t offsetCount = static_cast<uint64_t>(dictSize) + 1;

  std::unique_ptr<SeekableInputStream> stream =
    stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
  if (stream == nullptr) {
    throw ParseError("DATA stream not found in StringDictionaryColumn");
  }
  rle = createRleDecoder(std::move(stream), false, rleVersion, memoryPool);

  stream = stripe.getStream(columnId, proto::Stream_Kind_LENGTH, false);
  if (dictSize > 0 && stream == nullptr) {
    throw ParseError("LENGTH stream not found in StringDictionaryColumn");
  }
  std::unique_ptr<RleDecoder> lengthDecoder =
    createRleDecoder(std::move(stream), false, rleVersion, memoryPool);
  dictionary->dictionaryOffset.resize(offsetCount);
  int64_t* lengthArray = dictionary->dictionaryOffset.data();
  // Decode the lengths into slots 1..dictSize, then turn them in place into
  // running offsets.
  lengthDecoder->next(lengthArray + 1, dictSize, nullptr);
  lengthArray[0] = 0;
  for (uint64_t i = 1; i < offsetCount; ++i) {
    if (lengthArray[i] < 0) {
      throw ParseError("Negative dictionary entry length");
    }
    // lengthArray[i - 1] is a non-negative running total, so this
    // subtraction cannot overflow; it guards the signed addition below.
    if (lengthArray[i] >
        std::numeric_limits<int64_t>::max() - lengthArray[i - 1]) {
      throw ParseError(
        "Dictionary blob size overflows in StringDictionaryColumn");
    }
    lengthArray[i] += lengthArray[i - 1];
  }

  int64_t blobSize = lengthArray[dictSize];
  dictionary->dictionaryBlob.resize(static_cast<uint64_t>(blobSize));
  std::unique_ptr<SeekableInputStream> blobStream =
    stripe.getStream(columnId, proto::Stream_Kind_DICTIONARY_DATA, false);
  if (blobSize > 0 && blobStream == nullptr) {
    throw ParseError(
      "DICTIONARY_DATA stream not found in StringDictionaryColumn");
  }
  readFully(dictionary->dictionaryBlob.data(), blobSize, blobStream.get());
}
// Nothing beyond member cleanup is required.
StringDictionaryColumnReader::~StringDictionaryColumnReader() = default;
/**
 * Skip rows; only non-null rows have ids in the DATA stream.
 * @return the number of non-null rows skipped
 */
uint64_t StringDictionaryColumnReader::skip(uint64_t numValues) {
  const uint64_t presentCount = ColumnReader::skip(numValues);
  rle->skip(presentCount);
  return presentCount;
}
/**
 * Decode numValues strings into a StringVectorBatch. The output points into
 * the shared dictionary blob rather than copying.
 * The ids are decoded into the length array and then replaced in place by
 * each entry's length.
 * @throws ParseError if a non-null row's id is outside the dictionary
 */
void StringDictionaryColumnReader::next(ColumnVectorBatch& rowBatch,
                                        uint64_t numValues,
                                        char *notNull) {
  ColumnReader::next(rowBatch, numValues, notNull);
  char* present = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
  StringVectorBatch& byteBatch = dynamic_cast<StringVectorBatch&>(rowBatch);
  char* blob = dictionary->dictionaryBlob.data();
  const int64_t* offsets = dictionary->dictionaryOffset.data();
  char** outputStarts = byteBatch.data.data();
  int64_t* outputLengths = byteBatch.length.data();
  rle->next(outputLengths, numValues, present);
  const uint64_t dictionaryCount = dictionary->dictionaryOffset.size() - 1;
  for (uint64_t row = 0; row < numValues; ++row) {
    if (present != nullptr && !present[row]) {
      continue;
    }
    const int64_t entry = outputLengths[row];
    if (entry < 0 || static_cast<uint64_t>(entry) >= dictionaryCount) {
      throw ParseError("Entry index out of range in StringDictionaryColumn");
    }
    outputStarts[row] = blob + offsets[entry];
    outputLengths[row] = offsets[entry + 1] - offsets[entry];
  }
}
/**
 * Fill an EncodedStringVectorBatch: share the dictionary and decode the raw
 * dictionary ids into batch.index without resolving them to strings.
 * Ids of non-null rows are range-checked, the same way next() checks them.
 * Consumers index the shared dictionary with these ids, so a corrupt id
 * would otherwise become an out-of-bounds access downstream.
 * @throws ParseError if a non-null row's id is outside the dictionary
 */
void StringDictionaryColumnReader::nextEncoded(ColumnVectorBatch& rowBatch,
                                               uint64_t numValues,
                                               char* notNull) {
  ColumnReader::next(rowBatch, numValues, notNull);
  notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
  rowBatch.isEncoded = true;

  EncodedStringVectorBatch& batch =
    dynamic_cast<EncodedStringVectorBatch&>(rowBatch);
  batch.dictionary = this->dictionary;

  // Decode the dictionary entry ids into the batch's index buffer.
  int64_t* indices = batch.index.data();
  rle->next(indices, numValues, notNull);

  const uint64_t dictionaryCount = dictionary->dictionaryOffset.size() - 1;
  for (uint64_t i = 0; i < numValues; ++i) {
    if (notNull == nullptr || notNull[i]) {
      const int64_t entry = indices[i];
      if (entry < 0 || static_cast<uint64_t>(entry) >= dictionaryCount) {
        throw ParseError("Entry index out of range in StringDictionaryColumn");
      }
    }
  }
}
void StringDictionaryColumnReader::seekToRowGroup(
std::unordered_map<uint64_t, PositionProvider>& positions) {
ColumnReader::seekToRowGroup(positions);
rle->seek(positions.at(columnId));
}
/**
 * Reader for directly encoded string columns. A LENGTH stream gives each
 * value's size and the DATA stream holds the concatenated bytes.
 */
class StringDirectColumnReader: public ColumnReader {
public:
  StringDirectColumnReader(const Type& type, StripeStreams& stripe);
  ~StringDirectColumnReader() override;

  uint64_t skip(uint64_t numValues) override;

  void next(ColumnVectorBatch& rowBatch,
            uint64_t numValues,
            char *notNull) override;

  void seekToRowGroup(
    std::unordered_map<uint64_t, PositionProvider>& positions) override;

private:
  std::unique_ptr<RleDecoder> lengthRle;
  std::unique_ptr<SeekableInputStream> blobStream;
  // Unconsumed tail of the last chunk returned by blobStream->Next().
  const char *lastBuffer;
  size_t lastBufferLength;

  /**
   * Sum the lengths of the non-null values.
   * @param lengths the array of lengths
   * @param notNull the array of notNull flags, or nullptr if all are present
   * @param numValues the number of entries in both arrays
   * @return the total number of bytes for the non-null values
   */
  size_t computeSize(const int64_t *lengths, const char *notNull,
                     uint64_t numValues);
};
/**
 * Open the mandatory LENGTH and DATA streams; nothing is buffered yet.
 * @throws ParseError for an unknown encoding or a missing stream
 */
StringDirectColumnReader::StringDirectColumnReader(const Type& type,
                                                   StripeStreams& stripe)
    : ColumnReader(type, stripe),
      lastBuffer(nullptr),
      lastBufferLength(0) {
  const RleVersion version =
    convertRleVersion(stripe.getEncoding(columnId).kind());
  std::unique_ptr<SeekableInputStream> lengthStream =
    stripe.getStream(columnId, proto::Stream_Kind_LENGTH, true);
  if (!lengthStream) {
    throw ParseError("LENGTH stream not found in StringDirectColumn");
  }
  lengthRle =
    createRleDecoder(std::move(lengthStream), false, version, memoryPool);
  blobStream = stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
  if (!blobStream) {
    throw ParseError("DATA stream not found in StringDirectColumn");
  }
}
// Nothing beyond member cleanup is required.
StringDirectColumnReader::~StringDirectColumnReader() = default;
uint64_t StringDirectColumnReader::skip(uint64_t numValues) {
const size_t BUFFER_SIZE = 1024;
numValues = ColumnReader::skip(numValues);
int64_t buffer[BUFFER_SIZE];
uint64_t done = 0;
size_t totalBytes = 0;
// read the lengths, so we know haw many bytes to skip
while (done < numValues) {
uint64_t step = std::min(BUFFER_SIZE,
static_cast<size_t>(numValues - done));
lengthRle->next(buffer, step, nullptr);
totalBytes += computeSize(buffer, nullptr, step);
done += step;
}
if (totalBytes <= lastBufferLength) {
// subtract the needed bytes from the ones left over
lastBufferLength -= totalBytes;
lastBuffer += totalBytes;
} else {
// move the stream forward after accounting for the buffered bytes
totalBytes -= lastBufferLength;
const size_t cap = static_cast<size_t>(std::numeric_limits<int>::max());
while (totalBytes != 0) {
size_t step = totalBytes > cap ? cap : totalBytes;
blobStream->Skip(static_cast<int>(step));
totalBytes -= step;
}
lastBufferLength = 0;
lastBuffer = nullptr;
}
return numValues;
}
size_t StringDirectColumnReader::computeSize(const int64_t* lengths,
const char* notNull,
uint64_t numValues) {
size_t totalLength = 0;
if (notNull) {
for(size_t i=0; i < numValues; ++i) {
if (notNull[i]) {
totalLength += static_cast<size_t>(lengths[i]);
}
}
} else {
for(size_t i=0; i < numValues; ++i) {
totalLength += static_cast<size_t>(lengths[i]);
}
}
return totalLength;
}
/**
 * Decode numValues strings into a StringVectorBatch.
 * All bytes for the batch are copied into byteBatch.blob. Each non-null
 * row's data pointer then points at its slice of that blob.
 * @throws ParseError if the DATA stream ends before enough bytes are read
 */
void StringDirectColumnReader::next(ColumnVectorBatch& rowBatch,
                                    uint64_t numValues,
                                    char *notNull) {
  ColumnReader::next(rowBatch, numValues, notNull);
  // update the notNull from the parent class
  notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
  StringVectorBatch& byteBatch = dynamic_cast<StringVectorBatch&>(rowBatch);
  char **startPtr = byteBatch.data.data();
  int64_t *lengthPtr = byteBatch.length.data();

  // read the length vector
  lengthRle->next(lengthPtr, numValues, notNull);

  // figure out the total length of data we need from the blob stream
  const size_t totalLength = computeSize(lengthPtr, notNull, numValues);

  // Copy whole chunks from the blob stream into our buffer until the
  // current chunk holds the rest of what we need.
  size_t bytesBuffered = 0;
  byteBatch.blob.resize(totalLength);
  char *ptr = byteBatch.blob.data();
  while (bytesBuffered + lastBufferLength < totalLength) {
    // lastBuffer is nullptr (with length 0) before the first read and after
    // a skip. memcpy from a null pointer is undefined even for 0 bytes.
    if (lastBufferLength > 0) {
      memcpy(ptr + bytesBuffered, lastBuffer, lastBufferLength);
      bytesBuffered += lastBufferLength;
    }
    const void* readBuffer;
    int readLength;
    if (!blobStream->Next(&readBuffer, &readLength)) {
      throw ParseError("failed to read in StringDirectColumnReader.next");
    }
    lastBuffer = static_cast<const char*>(readBuffer);
    lastBufferLength = static_cast<size_t>(readLength);
  }

  // Take the remaining bytes from the current chunk, keeping its tail
  // buffered for the next call.
  if (bytesBuffered < totalLength) {
    size_t moreBytes = totalLength - bytesBuffered;
    memcpy(ptr + bytesBuffered, lastBuffer, moreBytes);
    lastBuffer += moreBytes;
    lastBufferLength -= moreBytes;
  }

  // Point every non-null row at its slice of the blob.
  ptr = byteBatch.blob.data();
  for (uint64_t i = 0; i < numValues; ++i) {
    if (notNull == nullptr || notNull[i]) {
      startPtr[i] = ptr;
      ptr += lengthPtr[i];
    }
  }
}
void StringDirectColumnReader::seekToRowGroup(
std::unordered_map<uint64_t, PositionProvider>& positions) {
ColumnReader::seekToRowGroup(positions);
blobStream->seek(positions.at(columnId));
lengthRle->seek(positions.at(columnId));
}
/**
 * Reader for STRUCT columns. It holds one child reader per selected field
 * and forwards reads, skips and seeks to them.
 */
class StructColumnReader: public ColumnReader {
public:
  StructColumnReader(const Type& type, StripeStreams& stripe);

  uint64_t skip(uint64_t numValues) override;

  void next(ColumnVectorBatch& rowBatch,
            uint64_t numValues,
            char *notNull) override;

  void nextEncoded(ColumnVectorBatch& rowBatch,
                   uint64_t numValues,
                   char *notNull) override;

  void seekToRowGroup(
    std::unordered_map<uint64_t, PositionProvider>& positions) override;

private:
  // Readers for the selected fields, in field order.
  std::vector<std::unique_ptr<ColumnReader>> children;

  // Shared body of next() (encoded == false) and nextEncoded() (true).
  template<bool encoded>
  void nextInternal(ColumnVectorBatch& rowBatch,
                    uint64_t numValues,
                    char *notNull);
};
/**
 * Build child readers for the selected fields.
 * @throws ParseError unless the struct uses DIRECT encoding
 */
StructColumnReader::StructColumnReader(const Type& type, StripeStreams& stripe)
    : ColumnReader(type, stripe) {
  const std::vector<bool> selectedColumns = stripe.getSelectedColumns();
  const int64_t encoding =
    static_cast<int64_t>(stripe.getEncoding(columnId).kind());
  if (encoding != proto::ColumnEncoding_Kind_DIRECT) {
    throw ParseError("Unknown encoding for StructColumnReader");
  }
  for (unsigned int field = 0; field < type.getSubtypeCount(); ++field) {
    const Type& child = *type.getSubtype(field);
    if (!selectedColumns[static_cast<uint64_t>(child.getColumnId())]) {
      continue;
    }
    children.push_back(buildReader(child, stripe));
  }
}
uint64_t StructColumnReader::skip(uint64_t numValues) {
numValues = ColumnReader::skip(numValues);
for(auto& ptr : children) {
ptr->skip(numValues);
}
return numValues;
}
// Decoded read path; see nextInternal.
void StructColumnReader::next(ColumnVectorBatch& rowBatch,
                              uint64_t numValues,
                              char *notNull) {
  this->nextInternal<false>(rowBatch, numValues, notNull);
}
// Encoded read path; children are asked for encoded batches.
void StructColumnReader::nextEncoded(ColumnVectorBatch& rowBatch,
                                     uint64_t numValues,
                                     char *notNull) {
  this->nextInternal<true>(rowBatch, numValues, notNull);
}
/**
 * Read the struct's own PRESENT data, then have every child fill its field
 * batch. The struct's notNull mask is passed down as the children's
 * incoming mask.
 */
template<bool encoded>
void StructColumnReader::nextInternal(ColumnVectorBatch& rowBatch,
                                      uint64_t numValues,
                                      char *notNull) {
  ColumnReader::next(rowBatch, numValues, notNull);
  char* present = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
  for (size_t field = 0; field < children.size(); ++field) {
    ColumnVectorBatch& fieldBatch =
      *(dynamic_cast<StructVectorBatch&>(rowBatch).fields[field]);
    if (encoded) {
      children[field]->nextEncoded(fieldBatch, numValues, present);
    } else {
      children[field]->next(fieldBatch, numValues, present);
    }
  }
}
void StructColumnReader::seekToRowGroup(
std::unordered_map<uint64_t, PositionProvider>& positions) {
ColumnReader::seekToRowGroup(positions);
for(auto& ptr : children) {
ptr->seekToRowGroup(positions);
}
}
/**
 * Reader for LIST columns. The LENGTH stream gives each row's element
 * count. The child reader, built only when the element column is selected,
 * decodes the elements.
 */
class ListColumnReader: public ColumnReader {
public:
  ListColumnReader(const Type& type, StripeStreams& stripe);
  ~ListColumnReader() override;

  uint64_t skip(uint64_t numValues) override;

  void next(ColumnVectorBatch& rowBatch,
            uint64_t numValues,
            char *notNull) override;

  void nextEncoded(ColumnVectorBatch& rowBatch,
                   uint64_t numValues,
                   char *notNull) override;

  void seekToRowGroup(
    std::unordered_map<uint64_t, PositionProvider>& positions) override;

private:
  // Element reader; empty when the element column is not selected.
  std::unique_ptr<ColumnReader> child;
  // Decoder over the LENGTH stream.
  std::unique_ptr<RleDecoder> rle;

  // Shared body of next() (encoded == false) and nextEncoded() (true).
  template<bool encoded>
  void nextInternal(ColumnVectorBatch& rowBatch,
                    uint64_t numValues,
                    char *notNull);
};
/**
 * Open the LENGTH stream and, if the element column is selected, build its
 * reader.
 * @throws ParseError for an unknown encoding or a missing LENGTH stream
 */
ListColumnReader::ListColumnReader(const Type& type, StripeStreams& stripe)
    : ColumnReader(type, stripe) {
  const std::vector<bool> selectedColumns = stripe.getSelectedColumns();
  const RleVersion version =
    convertRleVersion(stripe.getEncoding(columnId).kind());
  std::unique_ptr<SeekableInputStream> lengthStream =
    stripe.getStream(columnId, proto::Stream_Kind_LENGTH, true);
  if (!lengthStream) {
    throw ParseError("LENGTH stream not found in List column");
  }
  rle = createRleDecoder(std::move(lengthStream), false, version, memoryPool);
  const Type& elementType = *type.getSubtype(0);
  const bool elementSelected =
    selectedColumns[static_cast<uint64_t>(elementType.getColumnId())];
  if (elementSelected) {
    child = buildReader(elementType, stripe);
  }
}
// Nothing beyond member cleanup is required.
ListColumnReader::~ListColumnReader() = default;
/**
 * Skip rows. If an element reader exists, the skipped rows' lengths are
 * summed so the child can skip the same number of elements. Otherwise the
 * LENGTH stream is just skipped.
 * @return the number of non-null rows skipped
 */
uint64_t ListColumnReader::skip(uint64_t numValues) {
  numValues = ColumnReader::skip(numValues);
  ColumnReader* childReader = child.get();
  if (childReader == nullptr) {
    rle->skip(numValues);
    return numValues;
  }
  const uint64_t BUFFER_SIZE = 1024;
  int64_t lengths[BUFFER_SIZE];
  uint64_t elementCount = 0;
  for (uint64_t done = 0; done < numValues;) {
    const uint64_t chunk = std::min(numValues - done, BUFFER_SIZE);
    rle->next(lengths, chunk, nullptr);
    for (uint64_t k = 0; k < chunk; ++k) {
      elementCount += static_cast<size_t>(lengths[k]);
    }
    done += chunk;
  }
  childReader->skip(elementCount);
  return numValues;
}
// Decoded read path; see nextInternal.
void ListColumnReader::next(ColumnVectorBatch& rowBatch,
                            uint64_t numValues,
                            char *notNull) {
  this->nextInternal<false>(rowBatch, numValues, notNull);
}
// Encoded read path; the element reader is asked for an encoded batch.
void ListColumnReader::nextEncoded(ColumnVectorBatch& rowBatch,
                                   uint64_t numValues,
                                   char *notNull) {
  this->nextInternal<true>(rowBatch, numValues, notNull);
}
/**
 * Read list lengths into listBatch.offsets and convert them in place to
 * start offsets. Null rows contribute zero elements. offsets[numValues]
 * receives the total, and the element reader (if any) then reads that many
 * elements.
 */
template<bool encoded>
void ListColumnReader::nextInternal(ColumnVectorBatch& rowBatch,
                                    uint64_t numValues,
                                    char *notNull) {
  ColumnReader::next(rowBatch, numValues, notNull);
  ListVectorBatch& listBatch = dynamic_cast<ListVectorBatch&>(rowBatch);
  int64_t* offsets = listBatch.offsets.data();
  char* present = listBatch.hasNulls ? listBatch.notNull.data() : nullptr;
  rle->next(offsets, numValues, present);
  uint64_t runningTotal = 0;
  for (size_t row = 0; row < numValues; ++row) {
    const bool hasValue = present == nullptr || present[row];
    const uint64_t length =
      hasValue ? static_cast<uint64_t>(offsets[row]) : 0;
    offsets[row] = static_cast<int64_t>(runningTotal);
    runningTotal += length;
  }
  offsets[numValues] = static_cast<int64_t>(runningTotal);
  ColumnReader* childReader = child.get();
  if (childReader == nullptr) {
    return;
  }
  ColumnVectorBatch& elements = *(listBatch.elements.get());
  if (encoded) {
    childReader->nextEncoded(elements, runningTotal, nullptr);
  } else {
    childReader->next(elements, runningTotal, nullptr);
  }
}
void ListColumnReader::seekToRowGroup(
std::unordered_map<uint64_t, PositionProvider>& positions) {
ColumnReader::seekToRowGroup(positions);
rle->seek(positions.at(columnId));
if (child.get()) {
child->seekToRowGroup(positions);
}
}
/**
 * Reader for MAP columns. The LENGTH stream gives each row's entry count.
 * Separate readers, each built only when its column is selected, decode
 * the keys and the values.
 */
class MapColumnReader: public ColumnReader {
public:
  MapColumnReader(const Type& type, StripeStreams& stripe);
  ~MapColumnReader() override;

  uint64_t skip(uint64_t numValues) override;

  void next(ColumnVectorBatch& rowBatch,
            uint64_t numValues,
            char *notNull) override;

  void nextEncoded(ColumnVectorBatch& rowBatch,
                   uint64_t numValues,
                   char *notNull) override;

  void seekToRowGroup(
    std::unordered_map<uint64_t, PositionProvider>& positions) override;

private:
  // Key reader; empty when the key column is not selected.
  std::unique_ptr<ColumnReader> keyReader;
  // Value reader; empty when the value column is not selected.
  std::unique_ptr<ColumnReader> elementReader;
  // Decoder over the LENGTH stream.
  std::unique_ptr<RleDecoder> rle;

  // Shared body of next() (encoded == false) and nextEncoded() (true).
  template<bool encoded>
  void nextInternal(ColumnVectorBatch& rowBatch,
                    uint64_t numValues,
                    char *notNull);
};
/**
 * Open the LENGTH stream and build readers for whichever of the key and
 * value columns are selected.
 * @throws ParseError for an unknown encoding or a missing LENGTH stream
 */
MapColumnReader::MapColumnReader(const Type& type, StripeStreams& stripe)
    : ColumnReader(type, stripe) {
  const std::vector<bool> selectedColumns = stripe.getSelectedColumns();
  const RleVersion version =
    convertRleVersion(stripe.getEncoding(columnId).kind());
  std::unique_ptr<SeekableInputStream> lengthStream =
    stripe.getStream(columnId, proto::Stream_Kind_LENGTH, true);
  if (!lengthStream) {
    throw ParseError("LENGTH stream not found in Map column");
  }
  rle = createRleDecoder(std::move(lengthStream), false, version, memoryPool);

  const Type& keyType = *type.getSubtype(0);
  if (selectedColumns[static_cast<uint64_t>(keyType.getColumnId())]) {
    keyReader = buildReader(keyType, stripe);
  }
  const Type& valueType = *type.getSubtype(1);
  if (selectedColumns[static_cast<uint64_t>(valueType.getColumnId())]) {
    elementReader = buildReader(valueType, stripe);
  }
}
// Members (child readers and the length decoder) release themselves.
MapColumnReader::~MapColumnReader() = default;
uint64_t MapColumnReader::skip(uint64_t numValues) {
numValues = ColumnReader::skip(numValues);
ColumnReader *rawKeyReader = keyReader.get();
ColumnReader *rawElementReader = elementReader.get();
if (rawKeyReader || rawElementReader) {
const uint64_t BUFFER_SIZE = 1024;
int64_t buffer[BUFFER_SIZE];
uint64_t childrenElements = 0;
uint64_t lengthsRead = 0;
while (lengthsRead < numValues) {
uint64_t chunk = std::min(numValues - lengthsRead, BUFFER_SIZE);
rle->next(buffer, chunk, nullptr);
for(size_t i=0; i < chunk; ++i) {
childrenElements += static_cast<size_t>(buffer[i]);
}
lengthsRead += chunk;
}
if (rawKeyReader) {
rawKeyReader->skip(childrenElements);
}
if (rawElementReader) {
rawElementReader->skip(childrenElements);
}
} else {
rle->skip(numValues);
}
return numValues;
}
/**
 * Read `numValues` rows; key/value children are read with next().
 */
void MapColumnReader::next(ColumnVectorBatch& rowBatch,
                           uint64_t numValues,
                           char *notNull) {
  const bool useEncodedChildren = false;
  nextInternal<useEncodedChildren>(rowBatch, numValues, notNull);
}
/**
 * Read `numValues` rows; key/value children are read with nextEncoded().
 */
void MapColumnReader::nextEncoded(ColumnVectorBatch& rowBatch,
                                  uint64_t numValues,
                                  char *notNull) {
  const bool useEncodedChildren = true;
  nextInternal<useEncodedChildren>(rowBatch, numValues, notNull);
}
/**
 * Shared body of next()/nextEncoded().
 *
 * The map lengths are decoded into `offsets` in place. They are then
 * rewritten as running start offsets: each row gets the number of child
 * entries that precede it, and null rows contribute no entries.
 * `offsets[numValues]` receives the total, which is then read from both the
 * key and the value readers (when present).
 */
template<bool encoded>
void MapColumnReader::nextInternal(ColumnVectorBatch& rowBatch,
                                   uint64_t numValues,
                                   char *notNull) {
  ColumnReader::next(rowBatch, numValues, notNull);
  MapVectorBatch& mapBatch = dynamic_cast<MapVectorBatch&>(rowBatch);
  int64_t* offsets = mapBatch.offsets.data();
  char* present = mapBatch.hasNulls ? mapBatch.notNull.data() : nullptr;
  rle->next(offsets, numValues, present);

  uint64_t runningTotal = 0;
  for (size_t row = 0; row < numValues; ++row) {
    const bool rowHasValue = present == nullptr || present[row];
    const uint64_t length =
      rowHasValue ? static_cast<uint64_t>(offsets[row]) : 0;
    offsets[row] = static_cast<int64_t>(runningTotal);
    runningTotal += length;
  }
  offsets[numValues] = static_cast<int64_t>(runningTotal);

  ColumnReader* keyColumn = keyReader.get();
  if (keyColumn != nullptr) {
    if (encoded) {
      keyColumn->nextEncoded(*mapBatch.keys, runningTotal, nullptr);
    } else {
      keyColumn->next(*mapBatch.keys, runningTotal, nullptr);
    }
  }
  ColumnReader* valueColumn = elementReader.get();
  if (valueColumn != nullptr) {
    if (encoded) {
      valueColumn->nextEncoded(*mapBatch.elements, runningTotal, nullptr);
    } else {
      valueColumn->next(*mapBatch.elements, runningTotal, nullptr);
    }
  }
}
void MapColumnReader::seekToRowGroup(
std::unordered_map<uint64_t, PositionProvider>& positions) {
ColumnReader::seekToRowGroup(positions);
rle->seek(positions.at(columnId));
if (keyReader.get()) {
keyReader->seekToRowGroup(positions);
}
if (elementReader.get()) {
elementReader->seekToRowGroup(positions);
}
}
/**
 * Reader for UNION columns. A byte tag per non-null row (decoded by `rle`
 * from the DATA stream) names the child that holds the row's value; each
 * selected child has its own reader.
 */
class UnionColumnReader: public ColumnReader {
private:
// Decoder for the per-row child tags.
std::unique_ptr<ByteRleDecoder> rle;
// One reader per child type; null for children that are not selected.
std::vector<std::unique_ptr<ColumnReader>> childrenReader;
// Scratch counters: number of rows seen per child in the current call.
std::vector<int64_t> childrenCounts;
// Number of child types (type.getSubtypeCount()).
uint64_t numChildren;
public:
UnionColumnReader(const Type& type, StripeStreams& stipe);
uint64_t skip(uint64_t numValues) override;
void next(ColumnVectorBatch& rowBatch,
uint64_t numValues,
char *notNull) override;
void nextEncoded(ColumnVectorBatch& rowBatch,
uint64_t numValues,
char *notNull) override;
void seekToRowGroup(
std::unordered_map<uint64_t, PositionProvider>& positions) override;
private:
// Shared body of next()/nextEncoded(); `encoded` selects which child
// read method is used.
template<bool encoded>
void nextInternal(ColumnVectorBatch& rowBatch,
uint64_t numValues,
char *notNull);
};
/**
 * Build a UNION reader: open the DATA stream holding the per-row child
 * tags and create a reader for every selected child type.
 *
 * @throws ParseError when the stripe has no DATA stream for this column.
 */
UnionColumnReader::UnionColumnReader(const Type& type,
                                     StripeStreams& stripe
                                     ): ColumnReader(type, stripe) {
  numChildren = type.getSubtypeCount();
  childrenReader.resize(numChildren);
  childrenCounts.resize(numChildren);
  std::unique_ptr<SeekableInputStream> stream =
    stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
  if (stream == nullptr) {
    // The tags come from the DATA stream; the message used to name the
    // LENGTH stream, which pointed readers at the wrong stream.
    throw ParseError("DATA stream not found in Union column");
  }
  rle = createByteRleDecoder(std::move(stream));
  // figure out which types are selected
  const std::vector<bool> selectedColumns = stripe.getSelectedColumns();
  for(unsigned int i=0; i < numChildren; ++i) {
    const Type &child = *type.getSubtype(i);
    if (selectedColumns[static_cast<size_t>(child.getColumnId())]) {
      childrenReader[i] = buildReader(child, stripe);
    }
  }
}
uint64_t UnionColumnReader::skip(uint64_t numValues) {
numValues = ColumnReader::skip(numValues);
const uint64_t BUFFER_SIZE = 1024;
char buffer[BUFFER_SIZE];
uint64_t lengthsRead = 0;
int64_t *counts = childrenCounts.data();
memset(counts, 0, sizeof(int64_t) * numChildren);
while (lengthsRead < numValues) {
uint64_t chunk = std::min(numValues - lengthsRead, BUFFER_SIZE);
rle->next(buffer, chunk, nullptr);
for(size_t i=0; i < chunk; ++i) {
counts[static_cast<size_t>(buffer[i])] += 1;
}
lengthsRead += chunk;
}
for(size_t i=0; i < numChildren; ++i) {
if (counts[i] != 0 && childrenReader[i] != nullptr) {
childrenReader[i]->skip(static_cast<uint64_t>(counts[i]));
}
}
return numValues;
}
/**
 * Read `numValues` rows; selected children are read with next().
 */
void UnionColumnReader::next(ColumnVectorBatch& rowBatch,
                             uint64_t numValues,
                             char *notNull) {
  const bool useEncodedChildren = false;
  nextInternal<useEncodedChildren>(rowBatch, numValues, notNull);
}
/**
 * Read `numValues` rows; selected children are read with nextEncoded().
 */
void UnionColumnReader::nextEncoded(ColumnVectorBatch& rowBatch,
                                    uint64_t numValues,
                                    char *notNull) {
  const bool useEncodedChildren = true;
  nextInternal<useEncodedChildren>(rowBatch, numValues, notNull);
}
/**
 * Shared body of next()/nextEncoded().
 *
 * Decodes one tag per non-null row into `unionBatch.tags`. Each non-null
 * row's offset is set to its index within its child's batch. Then every
 * selected child reads as many values as rows were tagged for it. Offsets
 * of null rows are left untouched.
 *
 * @throws ParseError if a tag does not name one of the union's children.
 */
template<bool encoded>
void UnionColumnReader::nextInternal(ColumnVectorBatch& rowBatch,
                                     uint64_t numValues,
                                     char *notNull) {
  ColumnReader::next(rowBatch, numValues, notNull);
  UnionVectorBatch &unionBatch = dynamic_cast<UnionVectorBatch&>(rowBatch);
  uint64_t* offsets = unionBatch.offsets.data();
  int64_t* counts = childrenCounts.data();
  memset(counts, 0, sizeof(int64_t) * numChildren);
  unsigned char* tags = unionBatch.tags.data();
  notNull = unionBatch.hasNulls ? unionBatch.notNull.data() : nullptr;
  rle->next(reinterpret_cast<char *>(tags), numValues, notNull);
  // set the offsets for each non-null row
  for (size_t i = 0; i < numValues; ++i) {
    if (notNull == nullptr || notNull[i]) {
      // Reject tags past the last child so corrupt data cannot write
      // outside childrenCounts.
      const size_t tag = static_cast<size_t>(tags[i]);
      if (tag >= numChildren) {
        throw ParseError("Union tag out of range in UnionColumnReader");
      }
      offsets[i] = static_cast<uint64_t>(counts[tag]++);
    }
  }
  // read the right number of each child column
  for(size_t i=0; i < numChildren; ++i) {
    if (childrenReader[i] != nullptr) {
      if (encoded) {
        childrenReader[i]->nextEncoded(*(unionBatch.children[i]),
                                       static_cast<uint64_t>(counts[i]), nullptr);
      } else {
        childrenReader[i]->next(*(unionBatch.children[i]),
                                static_cast<uint64_t>(counts[i]), nullptr);
      }
    }
  }
}
void UnionColumnReader::seekToRowGroup(
std::unordered_map<uint64_t, PositionProvider>& positions) {
ColumnReader::seekToRowGroup(positions);
rle->seek(positions.at(columnId));
for(size_t i = 0; i < numChildren; ++i) {
if (childrenReader[i] != nullptr) {
childrenReader[i]->seekToRowGroup(positions);
}
}
}
/**
 * Decode a zigzag-encoded Int128 in place into its signed value.
 * An even input n becomes n/2; an odd input n becomes -(n >> 1) - 1.
 */
void unZigZagInt128(Int128& value) {
  const bool isOdd = (value.getLowBits() & 1) != 0;
  value >>= 1;
  if (!isOdd) {
    return;
  }
  value.negate();
  value -= 1;
}
/**
 * Reader for DECIMAL columns whose values are held in int64_t (chosen by
 * buildReader for precision <= MAX_PRECISION_64). Unscaled values come from
 * the DATA stream as zigzag-encoded base-128 varints. Per-value scales come
 * from the SECONDARY stream and are used to rescale each value to the
 * column's scale. Decimal128ColumnReader and DecimalHive11ColumnReader
 * derive from it to reuse its streams and buffering.
 */
class Decimal64ColumnReader: public ColumnReader {
public:
  static const uint32_t MAX_PRECISION_64 = 18;
  static const uint32_t MAX_PRECISION_128 = 38;
  static const int64_t POWERS_OF_TEN[MAX_PRECISION_64 + 1];
protected:
  std::unique_ptr<SeekableInputStream> valueStream;
  int32_t precision;
  int32_t scale;
  // Unconsumed bytes of the most recent chunk returned by valueStream.
  const char* buffer;
  const char* bufferEnd;
  std::unique_ptr<RleDecoder> scaleDecoder;

  /**
   * Read the valueStream for more bytes.
   * Does nothing while unconsumed bytes remain in [buffer, bufferEnd).
   *
   * @throws ParseError when the stream is exhausted.
   */
  void readBuffer() {
    while (buffer == bufferEnd) {
      int length;
      if (!valueStream->Next(reinterpret_cast<const void**>(&buffer),
                             &length)) {
        throw ParseError("Read past end of stream in Decimal64ColumnReader "+
                         valueStream->getName());
      }
      bufferEnd = buffer + length;
    }
  }

  /**
   * Decode one zigzag varint from the value stream into `value`, then
   * rescale it from `currentScale` to the column scale.
   *
   * @throws ParseError when the scale difference exceeds MAX_PRECISION_64.
   */
  void readInt64(int64_t& value, int32_t currentScale) {
    uint64_t encoded = 0;
    size_t offset = 0;
    while (true) {
      readBuffer();
      unsigned char ch = static_cast<unsigned char>(*(buffer++));
      // Shifting a 64-bit integer by 64 or more bits is undefined behavior.
      // A well-formed value never needs more than 10 bytes; groups from a
      // malformed, longer varint are still consumed (so the stream stays in
      // step) but cannot contribute bits.
      if (offset < 64) {
        encoded |= static_cast<uint64_t>(ch & 0x7f) << offset;
      }
      offset += 7;
      if (!(ch & 0x80)) {
        break;
      }
    }
    value = unZigZag(encoded);
    if (scale > currentScale &&
        static_cast<uint64_t>(scale - currentScale) <= MAX_PRECISION_64) {
      value *= POWERS_OF_TEN[scale - currentScale];
    } else if (scale < currentScale &&
               static_cast<uint64_t>(currentScale - scale) <= MAX_PRECISION_64) {
      value /= POWERS_OF_TEN[currentScale - scale];
    } else if (scale != currentScale) {
      throw ParseError("Decimal scale out of range");
    }
  }
public:
  Decimal64ColumnReader(const Type& type, StripeStreams& stipe);
  ~Decimal64ColumnReader() override;
  uint64_t skip(uint64_t numValues) override;
  void next(ColumnVectorBatch& rowBatch,
            uint64_t numValues,
            char *notNull) override;
  void seekToRowGroup(
    std::unordered_map<uint64_t, PositionProvider>& positions) override;
};
// Out-of-class definitions of the in-class-initialized static constants, so
// they may be odr-used (e.g. bound to std::min's const reference parameters
// in scaleInt128).
const uint32_t Decimal64ColumnReader::MAX_PRECISION_64;
const uint32_t Decimal64ColumnReader::MAX_PRECISION_128;
// POWERS_OF_TEN[i] == 10^i for i in [0, MAX_PRECISION_64].
const int64_t Decimal64ColumnReader::POWERS_OF_TEN[MAX_PRECISION_64 + 1]=
{1,
10,
100,
1000,
10000,
100000,
1000000,
10000000,
100000000,
1000000000,
10000000000,
100000000000,
1000000000000,
10000000000000,
100000000000000,
1000000000000000,
10000000000000000,
100000000000000000,
1000000000000000000};
/**
 * Capture the column's precision and scale, open the DATA stream holding
 * the unscaled values, and build the scale decoder over the SECONDARY
 * stream.
 *
 * @throws ParseError when either stream is missing.
 */
Decimal64ColumnReader::Decimal64ColumnReader(const Type& type,
                                             StripeStreams& stripe)
  : ColumnReader(type, stripe),
    precision(static_cast<int32_t>(type.getPrecision())),
    scale(static_cast<int32_t>(type.getScale())),
    buffer(nullptr),
    bufferEnd(nullptr) {
  valueStream = stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
  if (!valueStream) {
    throw ParseError("DATA stream not found in Decimal64Column");
  }
  const RleVersion scaleVersion =
    convertRleVersion(stripe.getEncoding(columnId).kind());
  std::unique_ptr<SeekableInputStream> scaleStream =
    stripe.getStream(columnId, proto::Stream_Kind_SECONDARY, true);
  if (!scaleStream) {
    throw ParseError("SECONDARY stream not found in Decimal64Column");
  }
  scaleDecoder = createRleDecoder(std::move(scaleStream), true, scaleVersion,
                                  memoryPool);
}
// Streams and the scale decoder are owned by unique_ptr members.
Decimal64ColumnReader::~Decimal64ColumnReader() = default;
/**
 * Skip `numValues` decimals. Each value is a base-128 varint whose final
 * byte has its high bit clear, so values are counted by their terminating
 * bytes. The matching scales are skipped afterwards.
 *
 * @return the count returned by ColumnReader::skip.
 */
uint64_t Decimal64ColumnReader::skip(uint64_t numValues) {
  numValues = ColumnReader::skip(numValues);
  uint64_t remaining = numValues;
  while (remaining != 0) {
    readBuffer();
    const char byte = *buffer++;
    if ((byte & 0x80) == 0) {
      --remaining;
    }
  }
  scaleDecoder->skip(numValues);
  return numValues;
}
/**
 * Read `numValues` decimals into a Decimal64VectorBatch. The scales for
 * the batch are decoded first (using the batch's null mask). Then each
 * non-null row's value is decoded and rescaled to the column scale. Null
 * rows read nothing from the value stream.
 */
void Decimal64ColumnReader::next(ColumnVectorBatch& rowBatch,
                                 uint64_t numValues,
                                 char *notNull) {
  ColumnReader::next(rowBatch, numValues, notNull);
  char* present = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
  Decimal64VectorBatch& decimals =
    dynamic_cast<Decimal64VectorBatch&>(rowBatch);
  int64_t* unscaled = decimals.values.data();
  int64_t* scales = decimals.readScales.data();
  scaleDecoder->next(scales, numValues, present);
  decimals.precision = precision;
  decimals.scale = scale;
  for (size_t row = 0; row < numValues; ++row) {
    if (present != nullptr && !present[row]) {
      continue;
    }
    readInt64(unscaled[row], static_cast<int32_t>(scales[row]));
  }
}
/**
 * Rescale `value` from `currentScale` to `scale` decimal places in place.
 * It multiplies or divides by powers of ten, at most 10^MAX_PRECISION_64 per
 * step so every factor is an entry of POWERS_OF_TEN. The remainder produced
 * by Int128::divide is discarded.
 */
void scaleInt128(Int128& value, uint32_t scale, uint32_t currentScale) {
  // Too few fractional digits: multiply up.
  while (currentScale < scale) {
    const uint32_t step =
      std::min(Decimal64ColumnReader::MAX_PRECISION_64, scale - currentScale);
    value *= Decimal64ColumnReader::POWERS_OF_TEN[step];
    currentScale += step;
  }
  // Too many fractional digits: divide down.
  if (currentScale > scale) {
    Int128 remainder;
    do {
      const uint32_t step =
        std::min(Decimal64ColumnReader::MAX_PRECISION_64, currentScale - scale);
      value = value.divide(Decimal64ColumnReader::POWERS_OF_TEN[step],
                           remainder);
      currentScale -= step;
    } while (currentScale > scale);
  }
}
void Decimal64ColumnReader::seekToRowGroup(
std::unordered_map<uint64_t, PositionProvider>& positions) {
ColumnReader::seekToRowGroup(positions);
valueStream->seek(positions.at(columnId));
scaleDecoder->seek(positions.at(columnId));
}
class Decimal128ColumnReader: public Decimal64ColumnReader {
public:
Decimal128ColumnReader(const Type& type, StripeStreams& stipe);
~Decimal128ColumnReader() override;
void next(ColumnVectorBatch& rowBatch,
uint64_t numValues,
char *notNull) override;
private:
void readInt128(Int128& value, int32_t currentScale) {
value = 0;
Int128 work;
uint32_t offset = 0;
while (true) {
readBuffer();
unsigned char ch = static_cast<unsigned char>(*(buffer++));
work = ch & 0x7f;
work <<= offset;
value |= work;
offset += 7;
if (!(ch & 0x80)) {
break;
}
}
unZigZagInt128(value);
scaleInt128(value, static_cast<uint32_t>(scale),
static_cast<uint32_t>(currentScale));
}
};
// All stream setup is performed by the Decimal64ColumnReader base.
Decimal128ColumnReader::Decimal128ColumnReader(const Type& type,
                                               StripeStreams& stripe)
  : Decimal64ColumnReader(type, stripe) {
}
// No state of its own; the base destructor releases the streams.
Decimal128ColumnReader::~Decimal128ColumnReader() = default;
/**
 * Read `numValues` decimals into a Decimal128VectorBatch. Scales for the
 * whole batch are decoded first (using the batch's null mask). Then each
 * non-null row's Int128 value is decoded and rescaled. Null rows read
 * nothing from the value stream.
 */
void Decimal128ColumnReader::next(ColumnVectorBatch& rowBatch,
                                  uint64_t numValues,
                                  char *notNull) {
  ColumnReader::next(rowBatch, numValues, notNull);
  char* present = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
  Decimal128VectorBatch& decimals =
    dynamic_cast<Decimal128VectorBatch&>(rowBatch);
  Int128* unscaled = decimals.values.data();
  int64_t* scales = decimals.readScales.data();
  scaleDecoder->next(scales, numValues, present);
  decimals.precision = precision;
  decimals.scale = scale;
  for (size_t row = 0; row < numValues; ++row) {
    if (present == nullptr || present[row]) {
      readInt128(unscaled[row], static_cast<int32_t>(scales[row]));
    }
  }
}
/**
 * Reader for Hive 0.11 DECIMAL columns (type precision 0 in buildReader).
 * Values are decoded as Int128 and rescaled to a scale forced by the
 * stripe. Values that do not fit in 38 digits either throw or are turned
 * into NULLs, depending on the stripe's configuration.
 */
class DecimalHive11ColumnReader: public Decimal64ColumnReader {
private:
// When true, an out-of-range value throws ParseError; otherwise it is
// replaced by NULL and a warning is written to errorStream.
bool throwOnOverflow;
// Destination for overflow warnings (from StripeStreams::getErrorStream).
std::ostream* errorStream;
/**
* Read an Int128 from the stream and correct it to the desired scale.
*
* Returns false when the encoded value is wider than 128 bits; in that
* case `value` is left un-decoded and is not rescaled. Otherwise returns
* whether the rescaled value lies within MIN_VALUE..MAX_VALUE.
*/
bool readInt128(Int128& value, int32_t currentScale) {
// -/+ 99999999999999999999999999999999999999
static const Int128 MIN_VALUE(-0x4b3b4ca85a86c47b, 0xf675ddc000000001);
static const Int128 MAX_VALUE( 0x4b3b4ca85a86c47a, 0x098a223fffffffff);
value = 0;
Int128 work;
uint32_t offset = 0;
bool result = true;
while (true) {
readBuffer();
unsigned char ch = static_cast<unsigned char>(*(buffer++));
work = ch & 0x7f;
// If we have read more than 128 bits, we flag the error, but keep
// reading bytes so the stream isn't thrown off.
// A group starting at bit 126 may only use its low 2 bits (<= 3); any
// group starting past bit 128 cannot fit at all.
if (offset > 128 || (offset == 126 && work > 3)) {
result = false;
}
work <<= offset;
value |= work;
offset += 7;
if (!(ch & 0x80)) {
break;
}
}
if (!result) {
return result;
}
unZigZagInt128(value);
scaleInt128(value, static_cast<uint32_t>(scale),
static_cast<uint32_t>(currentScale));
return value >= MIN_VALUE && value <= MAX_VALUE;
}
public:
DecimalHive11ColumnReader(const Type& type, StripeStreams& stipe);
~DecimalHive11ColumnReader() override;
void next(ColumnVectorBatch& rowBatch,
uint64_t numValues,
char *notNull) override;
};
/**
 * Build a Hive 0.11 decimal reader. The base sets up the streams. The
 * column scale is then overridden with the stripe's forced Hive 0.11
 * scale, and the overflow policy and warning stream are taken from the
 * stripe.
 */
DecimalHive11ColumnReader::DecimalHive11ColumnReader(const Type& type,
                                                     StripeStreams& stripe)
  : Decimal64ColumnReader(type, stripe),
    throwOnOverflow(stripe.getThrowOnHive11DecimalOverflow()),
    errorStream(stripe.getErrorStream()) {
  scale = stripe.getForcedScaleOnHive11Decimal();
}
// errorStream is borrowed, not owned; nothing to release here.
DecimalHive11ColumnReader::~DecimalHive11ColumnReader() = default;
/**
 * Read `numValues` Hive 0.11 decimals into a Decimal128VectorBatch.
 *
 * A value that readInt128 reports as out of range either throws (when
 * throwOnOverflow is set) or is replaced by NULL with a warning written to
 * errorStream.
 *
 * @throws ParseError on overflow when throwOnOverflow is set.
 */
void DecimalHive11ColumnReader::next(ColumnVectorBatch& rowBatch,
                                     uint64_t numValues,
                                     char *notNull) {
  ColumnReader::next(rowBatch, numValues, notNull);
  notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
  Decimal128VectorBatch &batch =
    dynamic_cast<Decimal128VectorBatch&>(rowBatch);
  Int128* values = batch.values.data();
  // read the next group of scales
  int64_t* scaleBuffer = batch.readScales.data();
  scaleDecoder->next(scaleBuffer, numValues, notNull);
  batch.precision = precision;
  batch.scale = scale;
  if (notNull) {
    for(size_t i=0; i < numValues; ++i) {
      if (notNull[i]) {
        if (!readInt128(values[i],
                        static_cast<int32_t>(scaleBuffer[i]))) {
          if (throwOnOverflow) {
            throw ParseError("Hive 0.11 decimal was more than 38 digits.");
          } else {
            *errorStream << "Warning: "
                         << "Hive 0.11 decimal with more than 38 digits "
                         << "replaced by NULL.\n";
            notNull[i] = false;
          }
        }
      }
    }
  } else {
    for(size_t i=0; i < numValues; ++i) {
      if (!readInt128(values[i],
                      static_cast<int32_t>(scaleBuffer[i]))) {
        if (throwOnOverflow) {
          throw ParseError("Hive 0.11 decimal was more than 38 digits.");
        } else {
          *errorStream << "Warning: "
                       << "Hive 0.11 decimal with more than 38 digits "
                       << "replaced by NULL.\n";
          if (!batch.hasNulls) {
            // The batch was reported null-free, and this method never
            // wrote its notNull flags. They may still hold values from a
            // previously read batch (presumably ColumnReader::next does
            // not fill them when there are no nulls). Every row read so far
            // or still to come is present, so mark them all before
            // recording the first NULL.
            memset(batch.notNull.data(), 1, static_cast<size_t>(numValues));
            batch.hasNulls = true;
          }
          batch.notNull[i] = false;
        }
      }
    }
  }
}
/**
* Create a reader for the given stripe.
*/
std::unique_ptr<ColumnReader> buildReader(const Type& type,
StripeStreams& stripe) {
switch (static_cast<int64_t>(type.getKind())) {
case DATE:
case INT:
case LONG:
case SHORT:
return std::unique_ptr<ColumnReader>(
new IntegerColumnReader(type, stripe));
case BINARY:
case CHAR:
case STRING:
case VARCHAR:
switch (static_cast<int64_t>(stripe.getEncoding(type.getColumnId()).kind())){
case proto::ColumnEncoding_Kind_DICTIONARY:
case proto::ColumnEncoding_Kind_DICTIONARY_V2:
return std::unique_ptr<ColumnReader>(
new StringDictionaryColumnReader(type, stripe));
case proto::ColumnEncoding_Kind_DIRECT:
case proto::ColumnEncoding_Kind_DIRECT_V2:
return std::unique_ptr<ColumnReader>(
new StringDirectColumnReader(type, stripe));
default:
throw NotImplementedYet("buildReader unhandled string encoding");
}
case BOOLEAN:
return std::unique_ptr<ColumnReader>(
new BooleanColumnReader(type, stripe));
case BYTE:
return std::unique_ptr<ColumnReader>(
new ByteColumnReader(type, stripe));
case LIST:
return std::unique_ptr<ColumnReader>(
new ListColumnReader(type, stripe));
case MAP:
return std::unique_ptr<ColumnReader>(
new MapColumnReader(type, stripe));
case UNION:
return std::unique_ptr<ColumnReader>(
new UnionColumnReader(type, stripe));
case STRUCT:
return std::unique_ptr<ColumnReader>(
new StructColumnReader(type, stripe));
case FLOAT:
case DOUBLE:
return std::unique_ptr<ColumnReader>(
new DoubleColumnReader(type, stripe));
case TIMESTAMP:
return std::unique_ptr<ColumnReader>
(new TimestampColumnReader(type, stripe));
case DECIMAL:
// is this a Hive 0.11 or 0.12 file?
if (type.getPrecision() == 0) {
return std::unique_ptr<ColumnReader>
(new DecimalHive11ColumnReader(type, stripe));
// can we represent the values using int64_t?
} else if (type.getPrecision() <=
Decimal64ColumnReader::MAX_PRECISION_64) {
return std::unique_ptr<ColumnReader>
(new Decimal64ColumnReader(type, stripe));
// otherwise we use the Int128 implementation
} else {
return std::unique_ptr<ColumnReader>
(new Decimal128ColumnReader(type, stripe));
}
default:
throw NotImplementedYet("buildReader unhandled type");
}
}
}