blob: 9f8d8b8f8881027ad518101c5b578d215547e9cd [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "Adaptor.hh"
#include "ColumnReader.hh"
#include "orc/Exceptions.hh"
#include "OrcTest.hh"
#include "wrap/orc-proto-wrapper.hh"
#include "wrap/gtest-wrapper.h"
#include "wrap/gmock.h"
#include <cmath>
#include <iostream>
#include <vector>
#ifdef __clang__
DIAGNOSTIC_IGNORE("-Winconsistent-missing-override")
DIAGNOSTIC_IGNORE("-Wmissing-variable-declarations")
#endif
#ifdef __GNUC__
DIAGNOSTIC_IGNORE("-Wparentheses")
#endif
namespace orc {
using ::testing::TestWithParam;
using ::testing::Values;
/**
 * gmock-based StripeStreams implementation used by the tests in this file.
 *
 * Tests program getStreamProxy() with raw stream pointers; getStream()
 * forwards to it and wraps the result in a std::unique_ptr.
 */
class MockStripeStreams : public StripeStreams {
public:
  ~MockStripeStreams() override;

  // Forwards to getStreamProxy() (defined out of line).
  std::unique_ptr<SeekableInputStream> getStream(uint64_t columnId,
                                                 proto::Stream_Kind kind,
                                                 bool stream) const override;

  // Mocked accessors; each test installs the expectations it needs.
  MOCK_CONST_METHOD0(getSelectedColumns, const std::vector<bool>());
  MOCK_CONST_METHOD1(getEncoding, proto::ColumnEncoding(uint64_t));
  MOCK_CONST_METHOD3(getStreamProxy,
                     SeekableInputStream *(uint64_t, proto::Stream_Kind, bool));
  MOCK_CONST_METHOD0(getErrorStream, std::ostream *());
  MOCK_CONST_METHOD0(getThrowOnHive11DecimalOverflow, bool());
  MOCK_CONST_METHOD0(getForcedScaleOnHive11Decimal, int32_t());

  // Returns the pool obtained from getDefaultPool().
  // NOTE(review): unlike its siblings this is not marked `override`;
  // confirm against StripeStreams whether it should be.
  MemoryPool &getMemoryPool() const {
    return *getDefaultPool();
  }

  // The writer timezone is fixed to America/Los_Angeles for every test.
  const Timezone &getWriterTimezone() const override {
    return getTimezoneByName("America/Los_Angeles");
  }
};
// Out-of-line destructor; there is nothing to release beyond the members.
MockStripeStreams::~MockStripeStreams() = default;
std::unique_ptr<SeekableInputStream>
MockStripeStreams::getStream(uint64_t columnId,
proto::Stream_Kind kind,
bool shouldStream) const {
return std::unique_ptr<SeekableInputStream>
(getStreamProxy(columnId, kind, shouldStream));
}
// Returns true when timeptr points at a tm structure, false for nullptr.
bool isNotNull(tm *timeptr) {
  if (timeptr == nullptr) {
    return false;
  }
  return true;
}
/**
 * Fixture for value-parameterized tests that take a bool parameter.
 *
 * SetUp() copies the test parameter into `encoded`; the tests in this file
 * use it to choose between reading with nextEncoded() into encoded string
 * batches (true) and reading with next() into plain string batches (false).
 */
class TestColumnReaderEncoded : public TestWithParam<bool> {
  // Overrides the gtest hook that runs before each test body.
  void SetUp() override;

protected:
  // Value of the test parameter; defaults to false until SetUp() runs.
  bool encoded = false;
};
// Captures the bool test parameter so the test body can read it as `encoded`.
void TestColumnReaderEncoded::SetUp() {
  const bool parameter = GetParam();
  encoded = parameter;
}
// Reads 512 rows of a boolean column in a single call and checks that every
// second group of four rows is null and that the non-null values flip every
// four values.
TEST(TestColumnReader, testBooleanWithNulls) {
  using testing::Return;
  MockStripeStreams streams;

  // both the root struct and its single child are selected
  std::vector<bool> selected(2, true);
  EXPECT_CALL(streams, getSelectedColumns())
      .WillRepeatedly(Return(selected));

  // every column reports DIRECT encoding
  proto::ColumnEncoding direct;
  direct.set_kind(proto::ColumnEncoding_Kind_DIRECT);
  EXPECT_CALL(streams, getEncoding(testing::_))
      .WillRepeatedly(Return(direct));

  // column 0 has no PRESENT stream
  EXPECT_CALL(streams, getStreamProxy(0, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(Return(nullptr));

  // alternate 4 non-null and 4 null via [0xf0 for x in range(512 / 8)]
  const unsigned char presentBytes[] = {0x3d, 0xf0};
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(Return(
          new SeekableArrayInputStream(presentBytes,
                                       ARRAY_SIZE(presentBytes))));

  // [0x0f for x in range(256 / 8)]
  const unsigned char dataBytes[] = {0x1d, 0x0f};
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_DATA, true))
      .WillRepeatedly(Return(
          new SeekableArrayInputStream(dataBytes, ARRAY_SIZE(dataBytes))));

  // schema: a struct with one BOOLEAN field
  std::unique_ptr<Type> schema = createStructType();
  schema->addStructField("col0", createPrimitiveType(BOOLEAN));
  std::unique_ptr<ColumnReader> reader = buildReader(*schema, streams);

  LongVectorBatch *column = new LongVectorBatch(1024, *getDefaultPool());
  StructVectorBatch root(1024, *getDefaultPool());
  root.fields.push_back(column);

  reader->next(root, 512, 0);
  ASSERT_EQ(512, root.numElements);
  ASSERT_FALSE(root.hasNulls);
  ASSERT_EQ(512, column->numElements);
  ASSERT_EQ(true, column->hasNulls);

  // counts the non-null rows seen so far; it drives the expected value
  unsigned int seen = 0;
  for (size_t row = 0; row < root.numElements; ++row) {
    if (row & 4) {
      EXPECT_EQ(0, column->notNull[row]) << "Wrong value at " << row;
      continue;
    }
    EXPECT_EQ(1, column->notNull[row]) << "Wrong value at " << row;
    EXPECT_EQ((seen++ & 4) != 0, column->data[row])
        << "Wrong value at " << row;
  }
}
// Reads one boolean row, skips 506 rows, then reads the last five rows and
// checks that only the first of those five is non-null.
TEST(TestColumnReader, testBooleanSkipsWithNulls) {
  using testing::Return;
  MockStripeStreams streams;

  // both the root struct and its single child are selected
  std::vector<bool> selected(2, true);
  EXPECT_CALL(streams, getSelectedColumns())
      .WillRepeatedly(Return(selected));

  // every column reports DIRECT encoding
  proto::ColumnEncoding direct;
  direct.set_kind(proto::ColumnEncoding_Kind_DIRECT);
  EXPECT_CALL(streams, getEncoding(testing::_))
      .WillRepeatedly(Return(direct));

  // column 0 has no PRESENT stream
  EXPECT_CALL(streams, getStreamProxy(0, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(Return(nullptr));

  // alternate 4 non-null and 4 null via [0xf0 for x in range(512 / 8)]
  const unsigned char presentBytes[] = {0x3d, 0xf0};
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(Return(
          new SeekableArrayInputStream(presentBytes,
                                       ARRAY_SIZE(presentBytes))));

  // [0x0f for x in range(256 / 8)]
  const unsigned char dataBytes[] = {0x1d, 0x0f};
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_DATA, true))
      .WillRepeatedly(Return(
          new SeekableArrayInputStream(dataBytes, ARRAY_SIZE(dataBytes))));

  // schema: a struct with one BOOLEAN field
  std::unique_ptr<Type> schema = createStructType();
  schema->addStructField("col0", createPrimitiveType(BOOLEAN));
  std::unique_ptr<ColumnReader> reader = buildReader(*schema, streams);

  LongVectorBatch *column = new LongVectorBatch(1024, *getDefaultPool());
  StructVectorBatch root(1024, *getDefaultPool());
  root.fields.push_back(column);

  // first row on its own
  reader->next(root, 1, 0);
  ASSERT_EQ(1, root.numElements);
  ASSERT_FALSE(root.hasNulls);
  ASSERT_EQ(1, column->numElements);
  ASSERT_FALSE(column->hasNulls);
  EXPECT_EQ(0, column->data[0]);

  // jump over 506 rows and read the remaining five
  reader->skip(506);
  reader->next(root, 5, 0);
  ASSERT_EQ(5, root.numElements);
  ASSERT_FALSE(root.hasNulls);
  ASSERT_EQ(5, column->numElements);
  ASSERT_EQ(true, column->hasNulls);
  EXPECT_EQ(1, column->data[0]);
  for (size_t row = 1; row < 5; ++row) {
    EXPECT_FALSE(column->notNull[row]);
  }
}
// Reads 512 rows of a BYTE column in a single call; every second group of
// four rows is null and the non-null rows carry the values of range(256)
// as chars.
TEST(TestColumnReader, testByteWithNulls) {
  using testing::Return;
  MockStripeStreams streams;

  // both the root struct and its single child are selected
  std::vector<bool> selected(2, true);
  EXPECT_CALL(streams, getSelectedColumns())
      .WillRepeatedly(Return(selected));

  // every column reports DIRECT encoding
  proto::ColumnEncoding direct;
  direct.set_kind(proto::ColumnEncoding_Kind_DIRECT);
  EXPECT_CALL(streams, getEncoding(testing::_))
      .WillRepeatedly(Return(direct));

  // column 0 has no PRESENT stream
  EXPECT_CALL(streams, getStreamProxy(0, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(Return(nullptr));

  // alternate 4 non-null and 4 null via [0xf0 for x in range(512 / 8)]
  const unsigned char presentBytes[] = {0x3d, 0xf0};
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(Return(
          new SeekableArrayInputStream(presentBytes,
                                       ARRAY_SIZE(presentBytes))));

  // range(256), laid out as two 128-value chunks that each follow a 0x80
  // header byte (at offsets 0 and 129)
  char dataBytes[258];
  dataBytes[0] = '\x80';
  dataBytes[129] = '\x80';
  for (unsigned int value = 0; value < 256; ++value) {
    const unsigned int headerBytes = value < 128 ? 1u : 2u;
    dataBytes[value + headerBytes] = static_cast<char>(value);
  }
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_DATA, true))
      .WillRepeatedly(Return(
          new SeekableArrayInputStream(dataBytes, ARRAY_SIZE(dataBytes))));

  // schema: a struct with one BYTE field
  std::unique_ptr<Type> schema = createStructType();
  schema->addStructField("col0", createPrimitiveType(BYTE));
  std::unique_ptr<ColumnReader> reader = buildReader(*schema, streams);

  LongVectorBatch *column = new LongVectorBatch(1024, *getDefaultPool());
  StructVectorBatch root(1024, *getDefaultPool());
  root.fields.push_back(column);

  reader->next(root, 512, 0);
  ASSERT_EQ(512, root.numElements);
  ASSERT_FALSE(root.hasNulls);
  ASSERT_EQ(512, column->numElements);
  ASSERT_EQ(true, column->hasNulls);

  // counts the non-null rows seen so far; it is also the expected value
  unsigned int seen = 0;
  for (size_t row = 0; row < root.numElements; ++row) {
    if (row & 4) {
      EXPECT_EQ(0, column->notNull[row]) << "Wrong value at " << row;
      continue;
    }
    EXPECT_EQ(1, column->notNull[row]) << "Wrong value at " << row;
    EXPECT_EQ(static_cast<char>(seen++), column->data[row])
        << "Wrong value at " << row;
  }
}
// Reads one BYTE row, skips 506 rows, then reads the last five rows and
// checks that only the first of those five is non-null and equals
// static_cast<char>(-1).
TEST(TestColumnReader, testByteSkipsWithNulls) {
  using testing::Return;
  MockStripeStreams streams;

  // both the root struct and its single child are selected
  std::vector<bool> selected(2, true);
  EXPECT_CALL(streams, getSelectedColumns())
      .WillRepeatedly(Return(selected));

  // every column reports DIRECT encoding
  proto::ColumnEncoding direct;
  direct.set_kind(proto::ColumnEncoding_Kind_DIRECT);
  EXPECT_CALL(streams, getEncoding(testing::_))
      .WillRepeatedly(Return(direct));

  // column 0 has no PRESENT stream
  EXPECT_CALL(streams, getStreamProxy(0, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(Return(nullptr));

  // alternate 4 non-null and 4 null via [0xf0 for x in range(512 / 8)]
  const unsigned char presentBytes[] = {0x3d, 0xf0};
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(Return(
          new SeekableArrayInputStream(presentBytes,
                                       ARRAY_SIZE(presentBytes))));

  // range(256), laid out as two 128-value chunks that each follow a 0x80
  // header byte (at offsets 0 and 129)
  char dataBytes[258];
  dataBytes[0] = '\x80';
  dataBytes[129] = '\x80';
  for (unsigned int value = 0; value < 256; ++value) {
    const unsigned int headerBytes = value < 128 ? 1u : 2u;
    dataBytes[value + headerBytes] = static_cast<char>(value);
  }
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_DATA, true))
      .WillRepeatedly(Return(
          new SeekableArrayInputStream(dataBytes, ARRAY_SIZE(dataBytes))));

  // schema: a struct with one BYTE field
  std::unique_ptr<Type> schema = createStructType();
  schema->addStructField("col0", createPrimitiveType(BYTE));
  std::unique_ptr<ColumnReader> reader = buildReader(*schema, streams);

  LongVectorBatch *column = new LongVectorBatch(1024, *getDefaultPool());
  StructVectorBatch root(1024, *getDefaultPool());
  root.fields.push_back(column);

  // first row on its own
  reader->next(root, 1, 0);
  ASSERT_EQ(1, root.numElements);
  ASSERT_FALSE(root.hasNulls);
  ASSERT_EQ(1, column->numElements);
  ASSERT_FALSE(column->hasNulls);
  EXPECT_EQ(0, column->data[0]);

  // jump over 506 rows and read the remaining five
  reader->skip(506);
  reader->next(root, 5, 0);
  ASSERT_EQ(5, root.numElements);
  ASSERT_FALSE(root.hasNulls);
  ASSERT_EQ(5, column->numElements);
  ASSERT_EQ(true, column->hasNulls);
  EXPECT_EQ(static_cast<char>(-1), column->data[0]);
  for (size_t row = 1; row < 5; ++row) {
    EXPECT_FALSE(column->notNull[row]);
  }
}
// Reads 200 rows of an INT column; every second group of four rows is null
// and the non-null rows are expected to count up from 0.
TEST(TestColumnReader, testIntegerWithNulls) {
  using testing::Return;
  MockStripeStreams streams;

  // both the root struct and its single child are selected
  std::vector<bool> selected(2, true);
  EXPECT_CALL(streams, getSelectedColumns())
      .WillRepeatedly(Return(selected));

  // every column reports DIRECT encoding
  proto::ColumnEncoding direct;
  direct.set_kind(proto::ColumnEncoding_Kind_DIRECT);
  EXPECT_CALL(streams, getEncoding(testing::_))
      .WillRepeatedly(Return(direct));

  // column 0 has no PRESENT stream
  EXPECT_CALL(streams, getStreamProxy(0, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(Return(nullptr));

  // PRESENT stream of column 1
  const unsigned char presentBytes[] = {0x16, 0xf0};
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(Return(
          new SeekableArrayInputStream(presentBytes,
                                       ARRAY_SIZE(presentBytes))));

  // DATA stream of column 1
  const unsigned char dataBytes[] = {0x64, 0x01, 0x00};
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_DATA, true))
      .WillRepeatedly(Return(
          new SeekableArrayInputStream(dataBytes, ARRAY_SIZE(dataBytes))));

  // schema: a struct with one INT field
  std::unique_ptr<Type> schema = createStructType();
  schema->addStructField("myInt", createPrimitiveType(INT));
  std::unique_ptr<ColumnReader> reader = buildReader(*schema, streams);

  LongVectorBatch *column = new LongVectorBatch(1024, *getDefaultPool());
  StructVectorBatch root(1024, *getDefaultPool());
  root.fields.push_back(column);

  reader->next(root, 200, 0);
  ASSERT_EQ(200, root.numElements);
  ASSERT_FALSE(root.hasNulls);
  ASSERT_EQ(200, column->numElements);
  ASSERT_EQ(true, column->hasNulls);

  // expected value of the next non-null row
  long expected = 0;
  for (size_t row = 0; row < root.numElements; ++row) {
    if (row & 4) {
      EXPECT_EQ(0, column->notNull[row]);
      continue;
    }
    EXPECT_EQ(1, column->notNull[row]);
    EXPECT_EQ(expected++, column->data[row]);
  }
}
// Reads 200 rows of a dictionary-encoded STRING column whose dictionary holds
// "ORC" and "Owen". Every second group of four rows is null. Depending on the
// fixture parameter the rows are read as plain strings or as dictionary
// indexes that are resolved through the batch's dictionary.
TEST_P(TestColumnReaderEncoded, testDictionaryWithNulls) {
  using testing::Return;
  MockStripeStreams streams;

  // both the root struct and its single child are selected
  std::vector<bool> selected(2, true);
  EXPECT_CALL(streams, getSelectedColumns())
      .WillRepeatedly(Return(selected));

  // column 0 is DIRECT, column 1 is a DICTIONARY with two entries
  proto::ColumnEncoding rootEncoding;
  rootEncoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
  EXPECT_CALL(streams, getEncoding(0))
      .WillRepeatedly(Return(rootEncoding));
  proto::ColumnEncoding dictEncoding;
  dictEncoding.set_kind(proto::ColumnEncoding_Kind_DICTIONARY);
  dictEncoding.set_dictionarysize(2);
  EXPECT_CALL(streams, getEncoding(1))
      .WillRepeatedly(Return(dictEncoding));

  // column 0 has no PRESENT stream
  EXPECT_CALL(streams, getStreamProxy(0, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(Return(nullptr));

  // PRESENT stream of column 1
  const unsigned char presentBytes[] = {0x19, 0xf0};
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(Return(
          new SeekableArrayInputStream(presentBytes,
                                       ARRAY_SIZE(presentBytes))));

  // DATA stream of column 1
  const unsigned char indexBytes[] = {0x2f, 0x00, 0x00, 0x2f, 0x00, 0x01};
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_DATA, true))
      .WillRepeatedly(Return(
          new SeekableArrayInputStream(indexBytes, ARRAY_SIZE(indexBytes))));

  // dictionary blob: the ASCII bytes of "ORC" followed by "Owen"
  const unsigned char dictBytes[] = {0x4f, 0x52, 0x43, 0x4f,
                                     0x77, 0x65, 0x6e};
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_DICTIONARY_DATA,
                                      false))
      .WillRepeatedly(Return(
          new SeekableArrayInputStream(dictBytes, ARRAY_SIZE(dictBytes))));

  // LENGTH stream of column 1
  const unsigned char lengthBytes[] = {0x02, 0x01, 0x03};
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_LENGTH, false))
      .WillRepeatedly(Return(
          new SeekableArrayInputStream(lengthBytes,
                                       ARRAY_SIZE(lengthBytes))));

  // schema: a struct with one STRING field
  std::unique_ptr<Type> schema = createStructType();
  schema->addStructField("myString", createPrimitiveType(STRING));
  std::unique_ptr<ColumnReader> reader = buildReader(*schema, streams);

  if (!encoded) {
    // plain path: the reader materializes the strings itself
    StringVectorBatch *strings =
        new StringVectorBatch(1024, *getDefaultPool());
    StructVectorBatch root(1024, *getDefaultPool());
    root.fields.push_back(strings);

    reader->next(root, 200, 0);
    ASSERT_EQ(200, root.numElements);
    ASSERT_FALSE(root.hasNulls);
    ASSERT_EQ(200, strings->numElements);
    ASSERT_EQ(true, strings->hasNulls);

    for (size_t row = 0; row < root.numElements; ++row) {
      if (row & 4) {
        EXPECT_EQ(0, strings->notNull[row]);
        continue;
      }
      EXPECT_EQ(1, strings->notNull[row]);
      const char *expected = row < 98 ? "ORC" : "Owen";
      const size_t expectedLength = strlen(expected);
      ASSERT_EQ(expectedLength, strings->length[row])
          << "Wrong length at " << row;
      for (size_t letter = 0; letter < expectedLength; ++letter) {
        EXPECT_EQ(expected[letter], strings->data[row][letter])
            << "Wrong contents at " << row << ", " << letter;
      }
    }
  } else {
    // encoded path: rows are dictionary indexes, looked up by hand
    EncodedStringVectorBatch *indexed =
        new EncodedStringVectorBatch(1024, *getDefaultPool());
    StructVectorBatch root(1024, *getDefaultPool());
    root.fields.push_back(indexed);

    reader->nextEncoded(root, 200, 0);
    ASSERT_EQ(200, root.numElements);
    ASSERT_FALSE(root.hasNulls);
    ASSERT_EQ(200, indexed->numElements);
    ASSERT_EQ(true, indexed->hasNulls);

    for (size_t row = 0; row < root.numElements; ++row) {
      if (row & 4) {
        EXPECT_EQ(0, indexed->notNull[row]);
        continue;
      }
      EXPECT_EQ(1, indexed->notNull[row]);
      const char *expected = row < 98 ? "ORC" : "Owen";
      const size_t expectedLength = strlen(expected);
      int64_t index = indexed->index.data()[row];
      char *actualString;
      int64_t actualLength;
      indexed->dictionary->getValueByIndex(index, actualString, actualLength);
      ASSERT_EQ(expectedLength, actualLength)
          << "Wrong length at " << row;
      for (size_t letter = 0; letter < expectedLength; ++letter) {
        EXPECT_EQ(expected[letter], actualString[letter])
            << "Wrong contents at " << row << ", " << letter;
      }
    }
  }
}
// Reads 200 rows from a struct with VARCHAR, CHAR and STRING fields where the
// third field is not selected. The first field is fully populated from a
// two-entry dictionary ("ORC", "Owen"); the second field is entirely null.
// Depending on the fixture parameter the rows are read as plain strings or as
// dictionary indexes.
TEST_P(TestColumnReaderEncoded, testVarcharDictionaryWithNulls) {
  using testing::Return;
  MockStripeStreams streams;

  // columns 0..2 are selected, column 3 is not
  std::vector<bool> selected(3, true);
  selected.push_back(false);
  EXPECT_CALL(streams, getSelectedColumns())
      .WillRepeatedly(Return(selected));

  // column 0 is DIRECT
  proto::ColumnEncoding rootEncoding;
  rootEncoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
  EXPECT_CALL(streams, getEncoding(0))
      .WillRepeatedly(Return(rootEncoding));

  // column 1 is a DICTIONARY with two entries
  proto::ColumnEncoding twoEntryDictionary;
  twoEntryDictionary.set_kind(proto::ColumnEncoding_Kind_DICTIONARY);
  twoEntryDictionary.set_dictionarysize(2);
  EXPECT_CALL(streams, getEncoding(1))
      .WillRepeatedly(Return(twoEntryDictionary));

  // columns 2 and up are DICTIONARY with no entries
  proto::ColumnEncoding emptyDictionary;
  emptyDictionary.set_kind(proto::ColumnEncoding_Kind_DICTIONARY);
  emptyDictionary.set_dictionarysize(0);
  EXPECT_CALL(streams, getEncoding(testing::Ge(2)))
      .WillRepeatedly(Return(emptyDictionary));

  // column 0 has no PRESENT stream
  EXPECT_CALL(streams, getStreamProxy(0, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(Return(nullptr));

  // PRESENT stream of column 1
  const unsigned char presentBytes[] = {0x16, 0xff};
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(Return(
          new SeekableArrayInputStream(presentBytes,
                                       ARRAY_SIZE(presentBytes))));

  // DATA stream of column 1
  const unsigned char indexBytes[] = {0x61, 0x00, 0x01, 0x61, 0x00, 0x00};
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_DATA, true))
      .WillRepeatedly(Return(
          new SeekableArrayInputStream(indexBytes, ARRAY_SIZE(indexBytes))));

  // dictionary blob: the ASCII bytes of "ORC" followed by "Owen"
  const unsigned char dictBytes[] = {0x4f, 0x52, 0x43, 0x4f,
                                     0x77, 0x65, 0x6e};
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_DICTIONARY_DATA,
                                      false))
      .WillRepeatedly(Return(
          new SeekableArrayInputStream(dictBytes, ARRAY_SIZE(dictBytes))));

  // LENGTH stream of column 1
  const unsigned char lengthBytes[] = {0x02, 0x01, 0x03};
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_LENGTH, false))
      .WillRepeatedly(Return(
          new SeekableArrayInputStream(lengthBytes,
                                       ARRAY_SIZE(lengthBytes))));

  // PRESENT stream of column 2
  const unsigned char nullColumnBytes[] = {0x16, 0x00};
  EXPECT_CALL(streams, getStreamProxy(2, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(Return(
          new SeekableArrayInputStream(nullColumnBytes,
                                       ARRAY_SIZE(nullColumnBytes))));

  // all three return an empty stream
  EXPECT_CALL(streams, getStreamProxy(2, proto::Stream_Kind_DATA, true))
      .WillRepeatedly(Return(
          new SeekableArrayInputStream(nullColumnBytes, 0)));
  EXPECT_CALL(streams, getStreamProxy(2, proto::Stream_Kind_DICTIONARY_DATA,
                                      false))
      .WillRepeatedly(Return(
          new SeekableArrayInputStream(nullColumnBytes, 0)));
  EXPECT_CALL(streams, getStreamProxy(2, proto::Stream_Kind_LENGTH, false))
      .WillRepeatedly(Return(
          new SeekableArrayInputStream(nullColumnBytes, 0)));

  // schema: a struct with VARCHAR, CHAR and STRING fields
  std::unique_ptr<Type> schema = createStructType();
  schema->addStructField("col0", createPrimitiveType(VARCHAR))
      ->addStructField("col1", createPrimitiveType(CHAR))
      ->addStructField("col2", createPrimitiveType(STRING));
  std::unique_ptr<ColumnReader> reader = buildReader(*schema, streams);

  if (!encoded) {
    // plain path: the reader materializes the strings itself
    StructVectorBatch root(1024, *getDefaultPool());
    StringVectorBatch *strings =
        new StringVectorBatch(1024, *getDefaultPool());
    StringVectorBatch *allNull =
        new StringVectorBatch(1024, *getDefaultPool());
    root.fields.push_back(strings);
    root.fields.push_back(allNull);

    reader->next(root, 200, 0);
    ASSERT_EQ(200, root.numElements);
    ASSERT_FALSE(root.hasNulls);
    ASSERT_EQ(200, strings->numElements);
    ASSERT_FALSE(strings->hasNulls);
    ASSERT_EQ(200, allNull->numElements);
    ASSERT_EQ(true, allNull->hasNulls);

    for (size_t row = 0; row < root.numElements; ++row) {
      EXPECT_EQ(true, strings->notNull[row]);
      EXPECT_FALSE(allNull->notNull[row]);
      const char *expected = row < 100 ? "Owen" : "ORC";
      const size_t expectedLength = strlen(expected);
      ASSERT_EQ(expectedLength, strings->length[row])
          << "Wrong length at " << row;
      for (size_t letter = 0; letter < expectedLength; ++letter) {
        EXPECT_EQ(expected[letter], strings->data[row][letter])
            << "Wrong contents at " << row << ", " << letter;
      }
    }
  } else {
    // encoded path: rows are dictionary indexes, looked up by hand
    StructVectorBatch root(1024, *getDefaultPool());
    EncodedStringVectorBatch *indexed =
        new EncodedStringVectorBatch(1024, *getDefaultPool());
    EncodedStringVectorBatch *allNull =
        new EncodedStringVectorBatch(1024, *getDefaultPool());
    root.fields.push_back(indexed);
    root.fields.push_back(allNull);

    reader->nextEncoded(root, 200, 0);
    ASSERT_EQ(200, root.numElements);
    ASSERT_FALSE(root.hasNulls);
    ASSERT_EQ(200, indexed->numElements);
    ASSERT_FALSE(indexed->hasNulls);
    ASSERT_EQ(200, allNull->numElements);
    ASSERT_EQ(true, allNull->hasNulls);

    for (size_t row = 0; row < root.numElements; ++row) {
      EXPECT_EQ(true, indexed->notNull[row]);
      EXPECT_FALSE(allNull->notNull[row]);
      const char *expected = row < 100 ? "Owen" : "ORC";
      const size_t expectedLength = strlen(expected);
      int64_t index = indexed->index.data()[row];
      char *actualString;
      int64_t actualLength;
      indexed->dictionary->getValueByIndex(index, actualString, actualLength);
      ASSERT_EQ(expectedLength, actualLength)
          << "Wrong length at " << row;
      for (size_t letter = 0; letter < expectedLength; ++letter) {
        EXPECT_EQ(expected[letter], actualString[letter])
            << "Wrong contents at " << row << ", " << letter;
      }
    }
  }
}
// Reads 200 rows through three nested structs down to a LONG leaf, with a
// PRESENT stream at each nested level, and checks that a null at an outer
// level shows up as null at every level below it.
TEST(TestColumnReader, testSubstructsWithNulls) {
  using testing::Return;
  MockStripeStreams streams;

  // all four columns are selected
  std::vector<bool> selected(4, true);
  EXPECT_CALL(streams, getSelectedColumns())
      .WillRepeatedly(Return(selected));

  // every column reports DIRECT encoding
  proto::ColumnEncoding direct;
  direct.set_kind(proto::ColumnEncoding_Kind_DIRECT);
  EXPECT_CALL(streams, getEncoding(testing::_))
      .WillRepeatedly(Return(direct));

  // column 0 has no PRESENT stream
  EXPECT_CALL(streams, getStreamProxy(0, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(Return(nullptr));

  // PRESENT stream of column 1 (middle struct)
  const unsigned char middlePresent[] = {0x16, 0x0f};
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(Return(
          new SeekableArrayInputStream(middlePresent,
                                       ARRAY_SIZE(middlePresent))));

  // PRESENT stream of column 2 (inner struct)
  const unsigned char innerPresent[] = {0x0a, 0x55};
  EXPECT_CALL(streams, getStreamProxy(2, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(Return(
          new SeekableArrayInputStream(innerPresent,
                                       ARRAY_SIZE(innerPresent))));

  // PRESENT stream of column 3 (long leaf)
  const unsigned char leafPresent[] = {0x04, 0xf0};
  EXPECT_CALL(streams, getStreamProxy(3, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(Return(
          new SeekableArrayInputStream(leafPresent,
                                       ARRAY_SIZE(leafPresent))));

  // DATA stream of column 3
  const unsigned char leafData[] = {0x17, 0x01, 0x00};
  EXPECT_CALL(streams, getStreamProxy(3, proto::Stream_Kind_DATA, true))
      .WillRepeatedly(Return(
          new SeekableArrayInputStream(leafData, ARRAY_SIZE(leafData))));

  // schema, built inside out: struct<col0:struct<col1:struct<col2:long>>>
  std::unique_ptr<Type> innerType = createStructType();
  innerType->addStructField("col2", createPrimitiveType(LONG));
  std::unique_ptr<Type> middleType = createStructType();
  middleType->addStructField("col1", std::move(innerType));
  std::unique_ptr<Type> schema = createStructType();
  schema->addStructField("col0", std::move(middleType));
  std::unique_ptr<ColumnReader> reader = buildReader(*schema, streams);

  // batches mirror the schema nesting
  StructVectorBatch root(1024, *getDefaultPool());
  StructVectorBatch *middle = new StructVectorBatch(1024, *getDefaultPool());
  root.fields.push_back(middle);
  StructVectorBatch *inner = new StructVectorBatch(1024, *getDefaultPool());
  middle->fields.push_back(inner);
  LongVectorBatch *longs = new LongVectorBatch(1024, *getDefaultPool());
  inner->fields.push_back(longs);

  reader->next(root, 200, 0);
  ASSERT_EQ(200, root.numElements);
  ASSERT_FALSE(root.hasNulls);
  ASSERT_EQ(200, middle->numElements);
  ASSERT_EQ(true, middle->hasNulls);
  ASSERT_EQ(200, inner->numElements);
  ASSERT_EQ(true, inner->hasNulls);
  ASSERT_EQ(200, longs->numElements);
  ASSERT_EQ(true, longs->hasNulls);

  // each counter advances only for rows that are non-null one level up
  long middleSeen = 0;
  long innerSeen = 0;
  long longsSeen = 0;
  for (size_t row = 0; row < root.numElements; ++row) {
    if (!(row & 4)) {
      // middle struct is null, so everything beneath it is null too
      EXPECT_FALSE(middle->notNull[row]) << "Wrong at " << row;
      EXPECT_FALSE(inner->notNull[row]) << "Wrong at " << row;
      EXPECT_FALSE(longs->notNull[row]) << "Wrong at " << row;
      continue;
    }
    EXPECT_EQ(true, middle->notNull[row]) << "Wrong at " << row;

    if (!(middleSeen++ & 1)) {
      // inner struct is null, and so is the leaf
      EXPECT_FALSE(inner->notNull[row]) << "Wrong at " << row;
      EXPECT_FALSE(longs->notNull[row]) << "Wrong at " << row;
      continue;
    }
    EXPECT_EQ(true, inner->notNull[row]) << "Wrong at " << row;

    if (innerSeen++ & 4) {
      EXPECT_FALSE(longs->notNull[row]) << "Wrong at " << row;
    } else {
      EXPECT_EQ(true, longs->notNull[row]) << "Wrong at " << row;
      EXPECT_EQ(longsSeen++, longs->data[row]) << "Wrong at " << row;
    }
  }
}
// Exercises skip() on a struct with an INT column and a dictionary-encoded
// STRING column that share the same PRESENT pattern: 20 all-null rows are
// read, 30 rows are skipped, 100 non-null rows are read and checked against
// the dictionary entries "00".."99", and finally 50 more rows are skipped.
TEST(TestColumnReader, testSkipWithNulls) {
  using testing::Return;
  MockStripeStreams streams;

  // all three columns are selected
  std::vector<bool> selected(3, true);
  EXPECT_CALL(streams, getSelectedColumns())
      .WillRepeatedly(Return(selected));

  // DIRECT for any column, then a more specific expectation for column 2
  proto::ColumnEncoding direct;
  direct.set_kind(proto::ColumnEncoding_Kind_DIRECT);
  EXPECT_CALL(streams, getEncoding(testing::_))
      .WillRepeatedly(Return(direct));
  proto::ColumnEncoding dictionary;
  dictionary.set_kind(proto::ColumnEncoding_Kind_DICTIONARY);
  dictionary.set_dictionarysize(100);
  EXPECT_CALL(streams, getEncoding(2))
      .WillRepeatedly(Return(dictionary));

  // column 0 has no PRESENT stream
  EXPECT_CALL(streams, getStreamProxy(0, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(Return(nullptr));

  // columns 1 and 2 are served the same PRESENT bytes
  const unsigned char presentBytes[] = {0x03, 0x00, 0xff, 0x3f, 0x08,
                                        0xff, 0xff, 0xfc, 0x03, 0x00};
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(Return(
          new SeekableArrayInputStream(presentBytes,
                                       ARRAY_SIZE(presentBytes))));
  EXPECT_CALL(streams, getStreamProxy(2, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(Return(
          new SeekableArrayInputStream(presentBytes,
                                       ARRAY_SIZE(presentBytes))));

  // columns 1 and 2 are served the same DATA bytes
  const unsigned char dataBytes[] = {0x61, 0x01, 0x00};
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_DATA, true))
      .WillRepeatedly(Return(
          new SeekableArrayInputStream(dataBytes, ARRAY_SIZE(dataBytes))));
  EXPECT_CALL(streams, getStreamProxy(2, proto::Stream_Kind_DATA, true))
      .WillRepeatedly(Return(
          new SeekableArrayInputStream(dataBytes, ARRAY_SIZE(dataBytes))));

  // fill the dictionary with '00' to '99'
  char digits[200];
  for (int entry = 0; entry < 100; ++entry) {
    digits[2 * entry] = static_cast<char>('0' + entry / 10);
    digits[2 * entry + 1] = static_cast<char>('0' + entry % 10);
  }
  EXPECT_CALL(streams, getStreamProxy(2, proto::Stream_Kind_DICTIONARY_DATA,
                                      false))
      .WillRepeatedly(Return(
          new SeekableArrayInputStream(digits, ARRAY_SIZE(digits))));

  // LENGTH stream of column 2
  const unsigned char lengthBytes[] = {0x61, 0x00, 0x02};
  EXPECT_CALL(streams, getStreamProxy(2, proto::Stream_Kind_LENGTH, false))
      .WillRepeatedly(Return(
          new SeekableArrayInputStream(lengthBytes,
                                       ARRAY_SIZE(lengthBytes))));

  // schema: a struct with an INT field and a STRING field
  std::unique_ptr<Type> schema = createStructType();
  schema->addStructField("myInt", createPrimitiveType(INT));
  schema->addStructField("myString", createPrimitiveType(STRING));
  std::unique_ptr<ColumnReader> reader = buildReader(*schema, streams);

  StructVectorBatch root(100, *getDefaultPool());
  LongVectorBatch *numbers = new LongVectorBatch(100, *getDefaultPool());
  StringVectorBatch *strings = new StringVectorBatch(100, *getDefaultPool());
  root.fields.push_back(numbers);
  root.fields.push_back(strings);

  // the first 20 rows are null in both columns
  reader->next(root, 20, 0);
  ASSERT_EQ(20, root.numElements);
  ASSERT_EQ(20, numbers->numElements);
  ASSERT_EQ(20, strings->numElements);
  ASSERT_FALSE(root.hasNulls);
  ASSERT_EQ(true, numbers->hasNulls);
  ASSERT_EQ(true, strings->hasNulls);
  for (size_t row = 0; row < 20; ++row) {
    EXPECT_FALSE(numbers->notNull[row]) << "Wrong at " << row;
    EXPECT_FALSE(strings->notNull[row]) << "Wrong at " << row;
  }

  // skip 30 rows, then read 100 rows that contain no nulls
  reader->skip(30);
  reader->next(root, 100, 0);
  ASSERT_EQ(100, root.numElements);
  ASSERT_FALSE(root.hasNulls);
  ASSERT_FALSE(numbers->hasNulls);
  ASSERT_FALSE(strings->hasNulls);
  for (size_t row = 0; row < 100; ++row) {
    const size_t tens = row / 10;
    const size_t ones = row % 10;
    EXPECT_EQ(1, numbers->notNull[row]) << "Wrong at " << row;
    ASSERT_EQ(2, strings->length[row]) << "Wrong at " << row;
    EXPECT_EQ('0' + static_cast<char>(tens), strings->data[row][0])
        << "Wrong at " << row;
    EXPECT_EQ('0' + static_cast<char>(ones), strings->data[row][1])
        << "Wrong at " << row;
  }

  // a trailing skip must also be accepted
  reader->skip(50);
}
// Reads a BINARY column with no nulls in two batches of 50 rows. Row r is
// expected to hold the two bytes (r / 10, r % 10).
TEST(TestColumnReader, testBinaryDirect) {
  using testing::Return;
  MockStripeStreams streams;

  // both the root struct and its single child are selected
  std::vector<bool> selected(2, true);
  EXPECT_CALL(streams, getSelectedColumns())
      .WillRepeatedly(Return(selected));

  // every column reports DIRECT encoding
  proto::ColumnEncoding direct;
  direct.set_kind(proto::ColumnEncoding_Kind_DIRECT);
  EXPECT_CALL(streams, getEncoding(testing::_))
      .WillRepeatedly(Return(direct));

  // neither column has a PRESENT stream
  EXPECT_CALL(streams, getStreamProxy(0, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(Return(nullptr));
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(Return(nullptr));

  // 100 two-byte values: (0,0), (0,1), ... (9,9)
  char blob[200];
  for (size_t value = 0; value < 100; ++value) {
    blob[2 * value] = static_cast<char>(value / 10);
    blob[2 * value + 1] = static_cast<char>(value % 10);
  }
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_DATA, true))
      .WillRepeatedly(Return(
          new SeekableArrayInputStream(blob, ARRAY_SIZE(blob))));

  // LENGTH stream of column 1
  const unsigned char lengthBytes[] = {0x61, 0x00, 0x02};
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_LENGTH, true))
      .WillRepeatedly(Return(
          new SeekableArrayInputStream(lengthBytes,
                                       ARRAY_SIZE(lengthBytes))));

  // schema: a struct with one BINARY field
  std::unique_ptr<Type> schema = createStructType();
  schema->addStructField("col0", createPrimitiveType(BINARY));
  std::unique_ptr<ColumnReader> reader = buildReader(*schema, streams);

  StructVectorBatch root(1024, *getDefaultPool());
  StringVectorBatch *strings = new StringVectorBatch(1024, *getDefaultPool());
  root.fields.push_back(strings);

  for (size_t pass = 0; pass < 2; ++pass) {
    reader->next(root, 50, 0);
    ASSERT_EQ(50, root.numElements);
    ASSERT_FALSE(root.hasNulls);
    ASSERT_EQ(50, strings->numElements);
    ASSERT_FALSE(strings->hasNulls);
    for (size_t j = 0; j < root.numElements; ++j) {
      // position of this row counted from the first batch
      const size_t row = 50 * pass + j;
      ASSERT_EQ(2, strings->length[j]);
      EXPECT_EQ(row / 10, strings->data[j][0]);
      EXPECT_EQ(row % 10, strings->data[j][1]);
    }
  }
}
// Reads a BINARY column in two batches of 128 rows where every second group
// of four rows is null. The n-th non-null row is expected to hold the two
// characters ('A' + n / 16, 'A' + n % 16).
TEST(TestColumnReader, testBinaryDirectWithNulls) {
  using testing::Return;
  MockStripeStreams streams;

  // both the root struct and its single child are selected
  std::vector<bool> selected(2, true);
  EXPECT_CALL(streams, getSelectedColumns())
      .WillRepeatedly(Return(selected));

  // every column reports DIRECT encoding
  proto::ColumnEncoding direct;
  direct.set_kind(proto::ColumnEncoding_Kind_DIRECT);
  EXPECT_CALL(streams, getEncoding(testing::_))
      .WillRepeatedly(Return(direct));

  // column 0 has no PRESENT stream
  EXPECT_CALL(streams, getStreamProxy(0, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(Return(nullptr));

  // PRESENT stream of column 1
  const unsigned char presentBytes[] = {0x1d, 0xf0};
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(Return(
          new SeekableArrayInputStream(presentBytes,
                                       ARRAY_SIZE(presentBytes))));

  // 128 two-character values: "AA", "AB", ... up to ('A' + 7, 'A' + 15)
  char blob[256];
  for (size_t value = 0; value < 128; ++value) {
    blob[2 * value] = static_cast<char>('A' + value / 16);
    blob[2 * value + 1] = static_cast<char>('A' + value % 16);
  }
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_DATA, true))
      .WillRepeatedly(Return(
          new SeekableArrayInputStream(blob, ARRAY_SIZE(blob))));

  // LENGTH stream of column 1
  const unsigned char lengthBytes[] = {0x7d, 0x00, 0x02};
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_LENGTH, true))
      .WillRepeatedly(Return(
          new SeekableArrayInputStream(lengthBytes,
                                       ARRAY_SIZE(lengthBytes))));

  // schema: a struct with one BINARY field
  std::unique_ptr<Type> schema = createStructType();
  schema->addStructField("col0", createPrimitiveType(BINARY));
  std::unique_ptr<ColumnReader> reader = buildReader(*schema, streams);

  StructVectorBatch root(1024, *getDefaultPool());
  StringVectorBatch *strings = new StringVectorBatch(1024, *getDefaultPool());
  root.fields.push_back(strings);

  // counts the non-null rows seen across both batches
  size_t seen = 0;
  for (size_t pass = 0; pass < 2; ++pass) {
    reader->next(root, 128, 0);
    ASSERT_EQ(128, root.numElements);
    ASSERT_FALSE(root.hasNulls);
    ASSERT_EQ(128, strings->numElements);
    ASSERT_EQ(true, strings->hasNulls);
    for (size_t j = 0; j < root.numElements; ++j) {
      ASSERT_EQ(((128 * pass + j) & 4) == 0, strings->notNull[j]);
      if (!strings->notNull[j]) {
        continue;
      }
      ASSERT_EQ(2, strings->length[j]);
      EXPECT_EQ('A' + static_cast<char>(seen / 16), strings->data[j][0]);
      EXPECT_EQ('A' + static_cast<char>(seen % 16), strings->data[j][1]);
      seen += 1;
    }
  }
}
// Verifies that reading a DIRECT-encoded STRING column throws ParseError when
// the DATA stream is shorter than the LENGTH stream promises: 100 rows are
// requested, while the DATA stream only holds 100 bytes.
TEST(TestColumnReader, testShortBlobError) {
  MockStripeStreams streams;
  // set getSelectedColumns()
  std::vector<bool> selectedColumns(2, true);
  EXPECT_CALL(streams, getSelectedColumns())
      .WillRepeatedly(testing::Return(selectedColumns));
  // set getEncoding
  proto::ColumnEncoding directEncoding;
  directEncoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
  EXPECT_CALL(streams, getEncoding(testing::_))
      .WillRepeatedly(testing::Return(directEncoding));
  // neither the root struct nor col0 has a PRESENT stream (no nulls)
  EXPECT_CALL(streams, getStreamProxy(0, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(testing::Return(nullptr));
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(testing::Return(nullptr));
  // The contents of the DATA stream are irrelevant to this test, but the
  // reader may copy them before it notices the stream is too short, so the
  // buffer is zero-initialised instead of being left indeterminate.
  char blob[100] = {};
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_DATA, true))
      .WillRepeatedly(testing::Return(new SeekableArrayInputStream
                                      (blob, ARRAY_SIZE(blob))));
  // LENGTH stream (same bytes the sibling direct-string tests use for
  // 100 values of length 2)
  const unsigned char buffer1[] = {0x61, 0x00, 0x02};
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_LENGTH, true))
      .WillRepeatedly(testing::Return(new SeekableArrayInputStream
                                      (buffer1, ARRAY_SIZE(buffer1))));
  // create the row type
  std::unique_ptr<Type> rowType = createStructType();
  rowType->addStructField("col0", createPrimitiveType(STRING));
  std::unique_ptr<ColumnReader> reader = buildReader(*rowType, streams);
  StructVectorBatch batch(1024, *getDefaultPool());
  StringVectorBatch *strings = new StringVectorBatch(1024, *getDefaultPool());
  batch.fields.push_back(strings);
  // asking for all 100 rows must fail because the data runs out
  EXPECT_THROW(reader->next(batch, 100, 0), ParseError);
}
// Reads a DIRECT-encoded STRING column without nulls while the DATA stream is
// created with a third constructor argument of 3 (named for the "short
// buffer" case), in four batches of 25 rows. Runs for both next() and
// nextEncoded().
TEST_P(TestColumnReaderEncoded, testStringDirectShortBuffer) {
  MockStripeStreams streams;

  // both columns (root struct and col0) are selected
  std::vector<bool> selectedColumns(2, true);
  EXPECT_CALL(streams, getSelectedColumns())
      .WillRepeatedly(testing::Return(selectedColumns));

  // every column reports DIRECT encoding
  proto::ColumnEncoding directEncoding;
  directEncoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
  EXPECT_CALL(streams, getEncoding(testing::_))
      .WillRepeatedly(testing::Return(directEncoding));

  // no PRESENT streams: neither column contains nulls
  EXPECT_CALL(streams, getStreamProxy(0, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(testing::Return(nullptr));
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(testing::Return(nullptr));

  // DATA stream: value k consists of the two bytes (k / 10, k % 10)
  char blob[200];
  for (size_t item = 0; item < 100; ++item) {
    blob[2 * item] = static_cast<char>(item / 10);
    blob[2 * item + 1] = static_cast<char>(item % 10);
  }
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_DATA, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(blob, ARRAY_SIZE(blob), 3)));

  // LENGTH stream; every value is expected to be two bytes long
  const unsigned char lengthBytes[] = {0x61, 0x00, 0x02};
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_LENGTH, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(lengthBytes,
                                       ARRAY_SIZE(lengthBytes))));

  // row type: struct<col0:string>
  std::unique_ptr<Type> rowType = createStructType();
  rowType->addStructField("col0", createPrimitiveType(STRING));

  std::unique_ptr<ColumnReader> reader = buildReader(*rowType, streams);

  StructVectorBatch batch(25, *getDefaultPool());
  StringVectorBatch *col0 = new StringVectorBatch(25, *getDefaultPool());
  batch.fields.push_back(col0);

  for (size_t round = 0; round < 4; ++round) {
    if (!encoded) {
      reader->next(batch, 25, 0);
    } else {
      reader->nextEncoded(batch, 25, 0);
    }
    ASSERT_EQ(25, batch.numElements);
    ASSERT_FALSE(batch.hasNulls);
    ASSERT_EQ(25, col0->numElements);
    ASSERT_FALSE(col0->hasNulls);
    for (size_t row = 0; row < batch.numElements; ++row) {
      // position of this row within the whole column
      const size_t ordinal = 25 * round + row;
      ASSERT_EQ(2, col0->length[row]);
      EXPECT_EQ(ordinal / 10, col0->data[row][0]);
      EXPECT_EQ(ordinal % 10, col0->data[row][1]);
    }
  }
}
// Same idea as testStringDirectShortBuffer, but with a PRESENT stream: rows
// alternate between four non-null and four null entries, and the DATA stream
// is created with a third constructor argument of 30. Eight batches of 64
// rows are read. Runs for both next() and nextEncoded().
TEST_P(TestColumnReaderEncoded, testStringDirectShortBufferWithNulls) {
  MockStripeStreams streams;

  // both columns (root struct and col0) are selected
  std::vector<bool> selectedColumns(2, true);
  EXPECT_CALL(streams, getSelectedColumns())
      .WillRepeatedly(testing::Return(selectedColumns));

  // every column reports DIRECT encoding
  proto::ColumnEncoding directEncoding;
  directEncoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
  EXPECT_CALL(streams, getEncoding(testing::_))
      .WillRepeatedly(testing::Return(directEncoding));

  // the root struct has no PRESENT stream
  EXPECT_CALL(streams, getStreamProxy(0, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(testing::Return(nullptr));

  // PRESENT stream of col0; the checks below expect a row to be non-null
  // exactly when bit 2 of its index within the batch is clear
  const unsigned char presentBytes[] = {0x3d, 0xf0};
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(presentBytes,
                                       ARRAY_SIZE(presentBytes))));

  // DATA stream: 256 two-byte values "AA", "AB", ..., "PP"
  char blob[512];
  for (size_t item = 0; item < 256; ++item) {
    blob[2 * item] = static_cast<char>('A' + item / 16);
    blob[2 * item + 1] = static_cast<char>('A' + item % 16);
  }
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_DATA, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(blob, ARRAY_SIZE(blob), 30)));

  // LENGTH stream; every value is expected to be two bytes long
  const unsigned char lengthBytes[] = { 0x7d, 0x00, 0x02, 0x7d, 0x00, 0x02 };
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_LENGTH, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(lengthBytes,
                                       ARRAY_SIZE(lengthBytes))));

  // row type: struct<col0:string>
  std::unique_ptr<Type> rowType = createStructType();
  rowType->addStructField("col0", createPrimitiveType(STRING));

  std::unique_ptr<ColumnReader> reader = buildReader(*rowType, streams);

  StructVectorBatch batch(64, *getDefaultPool());
  StringVectorBatch *col0 = new StringVectorBatch(64, *getDefaultPool());
  batch.fields.push_back(col0);

  // index of the next value expected from the DATA stream
  size_t valueIndex = 0;
  for (size_t round = 0; round < 8; ++round) {
    if (!encoded) {
      reader->next(batch, 64, 0);
    } else {
      reader->nextEncoded(batch, 64, 0);
    }
    ASSERT_EQ(64, batch.numElements);
    ASSERT_FALSE(batch.hasNulls);
    ASSERT_EQ(64, col0->numElements);
    ASSERT_TRUE(col0->hasNulls);
    for (size_t row = 0; row < batch.numElements; ++row) {
      const bool expectValue = (row & 4) == 0;
      ASSERT_EQ(expectValue, col0->notNull[row]);
      if (!col0->notNull[row]) {
        continue;
      }
      ASSERT_EQ(2, col0->length[row]);
      EXPECT_EQ('A' + valueIndex / 16, col0->data[row][0]);
      EXPECT_EQ('A' + valueIndex % 16, col0->data[row][1]);
      valueIndex += 1;
    }
  }
}
/**
 * Regression test for ORC-24.
 *
 * The scenario needs:
 *  * a string column with direct encoding
 *  * a null row whose stale (unused) length entry would reach past the
 *    current window of the data stream, although the real values do not
 */
TEST(TestColumnReader, testStringDirectNullAcrossWindow) {
  MockStripeStreams streams;

  // both columns (root struct and col0) are selected
  std::vector<bool> selectedColumns(2, true);
  EXPECT_CALL(streams, getSelectedColumns())
      .WillRepeatedly(testing::Return(selectedColumns));

  // every column reports DIRECT encoding
  proto::ColumnEncoding directEncoding;
  directEncoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
  EXPECT_CALL(streams, getEncoding(testing::_))
      .WillRepeatedly(testing::Return(directEncoding));

  // the root struct has no PRESENT stream
  EXPECT_CALL(streams, getStreamProxy(0, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(testing::Return(nullptr));

  // PRESENT stream of col0; the checks below expect row 0 to be null and
  // rows 1-7 to be non-null
  const unsigned char presentBytes[2] = {0xff, 0x7f};
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(presentBytes,
                                       ARRAY_SIZE(presentBytes))));

  // DATA stream: one character per non-null row; ARRAY_SIZE also counts the
  // terminating NUL of the literal
  const char blob[] = "abcdefg";
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_DATA, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(blob, ARRAY_SIZE(blob), 4)));

  // LENGTH stream: [1] * 7
  const unsigned char lengthBytes[] = {0x04, 0x00, 0x01};
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_LENGTH, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(lengthBytes,
                                       ARRAY_SIZE(lengthBytes))));

  // row type: struct<col0:string>
  std::unique_ptr<Type> rowType = createStructType();
  rowType->addStructField("col0", createPrimitiveType(STRING));

  std::unique_ptr<ColumnReader> reader = buildReader(*rowType, streams);

  StructVectorBatch batch(25, *getDefaultPool());
  StringVectorBatch *col0 = new StringVectorBatch(25, *getDefaultPool());
  batch.fields.push_back(col0);

  // Row 0 is null, so the reader never overwrites this length; leaving a
  // stale value behind is what provokes the problem.
  col0->length[0] = 5;

  reader->next(batch, 8, 0);
  ASSERT_EQ(8, batch.numElements);
  ASSERT_FALSE(batch.hasNulls);
  ASSERT_EQ(8, col0->numElements);
  ASSERT_TRUE(col0->hasNulls);
  ASSERT_EQ(true, !col0->notNull[0]);
  // rows 1..7 carry 'a'..'g'
  for (size_t row = 1; row < batch.numElements; ++row) {
    ASSERT_EQ(true, col0->notNull[row]);
    ASSERT_EQ(1, col0->length[row]);
    ASSERT_EQ('a' + row - 1, col0->data[row][0])
        << "difference at " << row;
  }
}
// Exercises skip() on a DIRECT-encoded STRING column without nulls.
// Row r holds a string of length r whose bytes are 0, 1, ..., r-1 (truncated
// to char). The test reads rows 0-1, skips 14 rows, reads rows 16-17, skips
// 1180 rows and finally reads rows 1198-1199.
TEST(TestColumnReader, testStringDirectSkip) {
  MockStripeStreams streams;
  // set getSelectedColumns()
  std::vector<bool> selectedColumns(2, true);
  EXPECT_CALL(streams, getSelectedColumns())
      .WillRepeatedly(testing::Return(selectedColumns));
  // set getEncoding
  proto::ColumnEncoding directEncoding;
  directEncoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
  EXPECT_CALL(streams, getEncoding(testing::_))
      .WillRepeatedly(testing::Return(directEncoding));
  // neither the root struct nor col0 has a PRESENT stream (no nulls)
  EXPECT_CALL(streams, getStreamProxy(0, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(testing::Return(nullptr));
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(testing::Return(nullptr));
  // sum(0 to 1199)
  const size_t BLOB_SIZE = 719400;
  // Roughly 700 KB of data: keep it on the heap, since an automatic array of
  // this size can exhaust the stack on platforms with small default stacks.
  std::vector<char> blob(BLOB_SIZE);
  size_t posn = 0;
  for (size_t item = 0; item < 1200; ++item) {
    for (size_t ch = 0; ch < item; ++ch) {
      blob[posn++] = static_cast<char>(ch);
    }
  }
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_DATA, true))
      .WillRepeatedly(testing::Return(new SeekableArrayInputStream
                                      (blob.data(), BLOB_SIZE, 200)));
  // the stream of 0 to 1199
  const unsigned char buffer1[] =
    { 0x7f, 0x01, 0x00,
      0x7f, 0x01, 0x82, 0x01,
      0x7f, 0x01, 0x84, 0x02,
      0x7f, 0x01, 0x86, 0x03,
      0x7f, 0x01, 0x88, 0x04,
      0x7f, 0x01, 0x8a, 0x05,
      0x7f, 0x01, 0x8c, 0x06,
      0x7f, 0x01, 0x8e, 0x07,
      0x7f, 0x01, 0x90, 0x08,
      0x1b, 0x01, 0x92, 0x09 };
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_LENGTH, true))
      .WillRepeatedly(testing::Return(new SeekableArrayInputStream
                                      (buffer1, ARRAY_SIZE(buffer1))));
  // create the row type
  std::unique_ptr<Type> rowType = createStructType();
  rowType->addStructField("col0", createPrimitiveType(STRING));
  std::unique_ptr<ColumnReader> reader = buildReader(*rowType, streams);
  StructVectorBatch batch(2, *getDefaultPool());
  StringVectorBatch *strings = new StringVectorBatch(2, *getDefaultPool());
  batch.fields.push_back(strings);

  // rows 0 and 1
  reader->next(batch, 2, 0);
  ASSERT_EQ(2, batch.numElements);
  ASSERT_EQ(true, !batch.hasNulls);
  ASSERT_EQ(2, strings->numElements);
  ASSERT_EQ(true, !strings->hasNulls);
  for (size_t i = 0; i < batch.numElements; ++i) {
    ASSERT_EQ(i, strings->length[i]);
    for (size_t j = 0; j < i; ++j) {
      EXPECT_EQ(static_cast<char>(j), strings->data[i][j]);
    }
  }

  // skip rows 2-15, then read rows 16 and 17
  reader->skip(14);
  reader->next(batch, 2, 0);
  ASSERT_EQ(2, batch.numElements);
  ASSERT_EQ(true, !batch.hasNulls);
  ASSERT_EQ(2, strings->numElements);
  ASSERT_EQ(true, !strings->hasNulls);
  for (size_t i = 0; i < batch.numElements; ++i) {
    ASSERT_EQ(16 + i, strings->length[i]);
    for (size_t j = 0; j < 16 + i; ++j) {
      EXPECT_EQ(static_cast<char>(j), strings->data[i][j]);
    }
  }

  // skip rows 18-1197, then read the last two rows (1198 and 1199)
  reader->skip(1180);
  reader->next(batch, 2, 0);
  ASSERT_EQ(2, batch.numElements);
  ASSERT_EQ(true, !batch.hasNulls);
  ASSERT_EQ(2, strings->numElements);
  ASSERT_EQ(true, !strings->hasNulls);
  for (size_t i = 0; i < batch.numElements; ++i) {
    ASSERT_EQ(1198 + i, strings->length[i]);
    for (size_t j = 0; j < 1198 + i; ++j) {
      EXPECT_EQ(static_cast<char>(j), strings->data[i][j]);
    }
  }
}
// Exercises skip() on a DIRECT-encoded STRING column with nulls.
// The 2400 rows alternate between four non-null and four null entries; the
// k-th non-null row holds a string of length k whose bytes are 0, 1, ..., k-1
// (truncated to char). The test reads rows 0-1, skips 30 rows, reads rows
// 32-33 (non-null values 16 and 17), skips 2364 rows and finally reads rows
// 2398-2399, which are both null.
TEST(TestColumnReader, testStringDirectSkipWithNulls) {
  MockStripeStreams streams;
  // set getSelectedColumns()
  std::vector<bool> selectedColumns(2, true);
  EXPECT_CALL(streams, getSelectedColumns())
      .WillRepeatedly(testing::Return(selectedColumns));
  // set getEncoding
  proto::ColumnEncoding directEncoding;
  directEncoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
  EXPECT_CALL(streams, getEncoding(testing::_))
      .WillRepeatedly(testing::Return(directEncoding));
  // the root struct has no PRESENT stream
  EXPECT_CALL(streams, getStreamProxy(0, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(testing::Return(nullptr));
  // alternate 4 non-null and 4 null via [0xf0 for x in range(2400 / 8)]
  const unsigned char buffer1[] = { 0x7f, 0xf0, 0x7f, 0xf0, 0x25, 0xf0 };
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(testing::Return(new SeekableArrayInputStream
                                      (buffer1, ARRAY_SIZE(buffer1))));
  // sum(range(1200))
  const size_t BLOB_SIZE = 719400;
  // each string is [x % 256 for x in range(r)]
  // Roughly 700 KB of data: keep it on the heap, since an automatic array of
  // this size can exhaust the stack on platforms with small default stacks.
  std::vector<char> blob(BLOB_SIZE);
  size_t posn = 0;
  for (size_t item = 0; item < 1200; ++item) {
    for (size_t ch = 0; ch < item; ++ch) {
      blob[posn++] = static_cast<char>(ch);
    }
  }
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_DATA, true))
      .WillRepeatedly(testing::Return(new SeekableArrayInputStream
                                      (blob.data(), BLOB_SIZE, 200)));
  // range(1200)
  const unsigned char buffer2[] = { 0x7f, 0x01, 0x00,
                                    0x7f, 0x01, 0x82, 0x01,
                                    0x7f, 0x01, 0x84, 0x02,
                                    0x7f, 0x01, 0x86, 0x03,
                                    0x7f, 0x01, 0x88, 0x04,
                                    0x7f, 0x01, 0x8a, 0x05,
                                    0x7f, 0x01, 0x8c, 0x06,
                                    0x7f, 0x01, 0x8e, 0x07,
                                    0x7f, 0x01, 0x90, 0x08,
                                    0x1b, 0x01, 0x92, 0x09 };
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_LENGTH, true))
      .WillRepeatedly(testing::Return(new SeekableArrayInputStream
                                      (buffer2, ARRAY_SIZE(buffer2))));
  // create the row type
  std::unique_ptr<Type> rowType = createStructType();
  rowType->addStructField("col0", createPrimitiveType(STRING));
  std::unique_ptr<ColumnReader> reader = buildReader(*rowType, streams);
  StructVectorBatch batch(2, *getDefaultPool());
  StringVectorBatch *strings = new StringVectorBatch(2, *getDefaultPool());
  batch.fields.push_back(strings);

  // rows 0 and 1: both non-null, lengths 0 and 1
  reader->next(batch, 2, 0);
  ASSERT_EQ(2, batch.numElements);
  ASSERT_EQ(true, !batch.hasNulls);
  ASSERT_EQ(2, strings->numElements);
  ASSERT_EQ(true, !strings->hasNulls);
  for (size_t i = 0; i < batch.numElements; ++i) {
    ASSERT_EQ(i, strings->length[i]);
    for (size_t j = 0; j < i; ++j) {
      EXPECT_EQ(static_cast<char>(j), strings->data[i][j]);
    }
  }

  // skip rows 2-31 (14 of them non-null), then read rows 32 and 33
  reader->skip(30);
  reader->next(batch, 2, 0);
  ASSERT_EQ(2, batch.numElements);
  ASSERT_EQ(true, !batch.hasNulls);
  ASSERT_EQ(2, strings->numElements);
  ASSERT_EQ(true, !strings->hasNulls);
  for (size_t i = 0; i < batch.numElements; ++i) {
    ASSERT_EQ(16 + i, strings->length[i]);
    for (size_t j = 0; j < 16 + i; ++j) {
      EXPECT_EQ(static_cast<char>(j), strings->data[i][j]);
    }
  }

  // skip rows 34-2397, then read the last two rows, which are null
  reader->skip(2364);
  reader->next(batch, 2, 0);
  ASSERT_EQ(2, batch.numElements);
  ASSERT_EQ(true, !batch.hasNulls);
  ASSERT_EQ(2, strings->numElements);
  ASSERT_EQ(true, strings->hasNulls);
  for (size_t i = 0; i < batch.numElements; ++i) {
    EXPECT_EQ(true, !strings->notNull[i]);
  }
}
// Reads 512 rows of a list<long> column without nulls in which every list
// has two elements, and checks the offsets as well as the 1024 child values.
// Runs for both next() and nextEncoded().
TEST_P(TestColumnReaderEncoded, testList) {
  MockStripeStreams streams;

  // root struct, the list and its element column are all selected
  std::vector<bool> selectedColumns(3, true);
  EXPECT_CALL(streams, getSelectedColumns())
      .WillRepeatedly(testing::Return(selectedColumns));

  // every column reports DIRECT encoding
  proto::ColumnEncoding directEncoding;
  directEncoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
  EXPECT_CALL(streams, getEncoding(testing::_))
      .WillRepeatedly(testing::Return(directEncoding));

  // no column has a PRESENT stream
  EXPECT_CALL(streams, getStreamProxy(testing::_,
                                      proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(testing::Return(nullptr));

  // list lengths: [2 for x in range(600)]
  const unsigned char lengthBytes[] = { 0x7f, 0x00, 0x02,
                                        0x7f, 0x00, 0x02,
                                        0x7f, 0x00, 0x02,
                                        0x7f, 0x00, 0x02,
                                        0x4d, 0x00, 0x02 };
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_LENGTH, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(lengthBytes,
                                       ARRAY_SIZE(lengthBytes))));

  // element values: range(1200)
  const unsigned char elementBytes[] = { 0x7f, 0x01, 0x00,
                                         0x7f, 0x01, 0x84, 0x02,
                                         0x7f, 0x01, 0x88, 0x04,
                                         0x7f, 0x01, 0x8c, 0x06,
                                         0x7f, 0x01, 0x90, 0x08,
                                         0x7f, 0x01, 0x94, 0x0a,
                                         0x7f, 0x01, 0x98, 0x0c,
                                         0x7f, 0x01, 0x9c, 0x0e,
                                         0x7f, 0x01, 0xa0, 0x10,
                                         0x1b, 0x01, 0xa4, 0x12 };
  EXPECT_CALL(streams, getStreamProxy(2, proto::Stream_Kind_DATA, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(elementBytes,
                                       ARRAY_SIZE(elementBytes))));

  // row type: struct<col0:array<bigint>>
  std::unique_ptr<Type> rowType = createStructType();
  rowType->addStructField("col0", createListType(createPrimitiveType(LONG)));

  std::unique_ptr<ColumnReader> reader = buildReader(*rowType, streams);

  // batch hierarchy: struct -> list -> long
  StructVectorBatch batch(512, *getDefaultPool());
  ListVectorBatch *listColumn = new ListVectorBatch(512, *getDefaultPool());
  LongVectorBatch *elementColumn =
      new LongVectorBatch(512, *getDefaultPool());
  batch.fields.push_back(listColumn);
  listColumn->elements.reset(elementColumn);

  if (!encoded) {
    reader->next(batch, 512, 0);
  } else {
    reader->nextEncoded(batch, 512, 0);
  }

  ASSERT_EQ(512, batch.numElements);
  ASSERT_FALSE(batch.hasNulls);
  ASSERT_EQ(512, listColumn->numElements);
  ASSERT_FALSE(listColumn->hasNulls);
  ASSERT_EQ(1024, elementColumn->numElements);
  ASSERT_FALSE(elementColumn->hasNulls);

  // offsets has one more entry than there are rows (note the <=)
  for (size_t row = 0; row <= batch.numElements; ++row) {
    EXPECT_EQ(2 * row, listColumn->offsets[row]);
  }
  for (size_t item = 0; item < elementColumn->numElements; ++item) {
    EXPECT_EQ(item, elementColumn->data[item]);
  }
}
// Reads eight rows of struct<col0:struct<col0_0:array<bigint>>> where every
// inner struct is null, and checks that the nulls are visible on the nested
// list batch too while no element values are produced.
TEST(TestColumnReader, testListPropagateNulls) {
  MockStripeStreams streams;

  // root struct, inner struct, list and element column are all selected
  std::vector<bool> selectedColumns(4, true);
  EXPECT_CALL(streams, getSelectedColumns())
      .WillRepeatedly(testing::Return(selectedColumns));

  // row type: struct<col0:struct<col0_0:array<bigint>>>
  std::unique_ptr<Type> nestedStruct = createStructType();
  nestedStruct->addStructField("col0_0",
                               createListType(createPrimitiveType(LONG)));
  std::unique_ptr<Type> rowType = createStructType();
  rowType->addStructField("col0", std::move(nestedStruct));

  // every column reports DIRECT encoding
  proto::ColumnEncoding directEncoding;
  directEncoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
  EXPECT_CALL(streams, getEncoding(testing::_))
      .WillRepeatedly(testing::Return(directEncoding));

  // default: columns have no PRESENT stream ...
  EXPECT_CALL(streams, getStreamProxy(testing::_,
                                      proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(testing::Return(nullptr));

  // ... except column 1 (the inner struct). This expectation has to be set
  // after the wildcard one above so that it takes precedence for column 1.
  const unsigned char presentBytes[] = {0xff, 0x00};
  EXPECT_CALL(streams, getStreamProxy(1,proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(presentBytes,
                                       ARRAY_SIZE(presentBytes))));

  // the list LENGTH stream and the element DATA stream are both empty
  // (length 0 over the same array)
  EXPECT_CALL(streams, getStreamProxy(2, proto::Stream_Kind_LENGTH, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(presentBytes, 0)));
  EXPECT_CALL(streams, getStreamProxy(3, proto::Stream_Kind_DATA, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(presentBytes, 0)));

  std::unique_ptr<ColumnReader> reader = buildReader(*rowType, streams);

  // batch hierarchy: struct -> struct -> list -> long
  StructVectorBatch batch(512, *getDefaultPool());
  StructVectorBatch *innerStructs =
      new StructVectorBatch(512, *getDefaultPool());
  ListVectorBatch *listColumn = new ListVectorBatch(512, *getDefaultPool());
  LongVectorBatch *elementColumn =
      new LongVectorBatch(512, *getDefaultPool());
  batch.fields.push_back(innerStructs);
  innerStructs->fields.push_back(listColumn);
  listColumn->elements.reset(elementColumn);

  reader->next(batch, 8, 0);
  ASSERT_EQ(8, batch.numElements);
  ASSERT_FALSE(batch.hasNulls);
  ASSERT_EQ(8, innerStructs->numElements);
  ASSERT_TRUE(innerStructs->hasNulls);
  ASSERT_EQ(8, listColumn->numElements);
  ASSERT_TRUE(listColumn->hasNulls);
  ASSERT_EQ(0, elementColumn->numElements);
  ASSERT_FALSE(elementColumn->hasNulls);
  for (size_t row = 0; row < 8; ++row) {
    EXPECT_EQ(true, !innerStructs->notNull[row]);
  }
}
// Reads a list<long> column of 2048 rows in four batches of 512 rows.
// Every odd row is null, so each batch holds 256 non-null lists. The list
// lengths change over the course of the column (1, 4, 0, 3 and a final 19),
// which is why every batch has its own offset formula below.
TEST(TestColumnReader, testListWithNulls) {
MockStripeStreams streams;
// set getSelectedColumns()
std::vector<bool> selectedColumns(3, true);
EXPECT_CALL(streams, getSelectedColumns())
.WillRepeatedly(testing::Return(selectedColumns));
// set getEncoding
proto::ColumnEncoding directEncoding;
directEncoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
EXPECT_CALL(streams, getEncoding(testing::_))
.WillRepeatedly(testing::Return(directEncoding));
// set getStream: the root struct has no PRESENT stream
EXPECT_CALL(streams, getStreamProxy(0, proto::Stream_Kind_PRESENT, true))
.WillRepeatedly(testing::Return(nullptr));
// PRESENT stream of the list column:
// [0xaa for x in range(2048/8)]
const unsigned char buffer1[] = { 0x7f, 0xaa, 0x7b, 0xaa };
EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_PRESENT, true))
.WillRepeatedly(testing::Return(new SeekableArrayInputStream
(buffer1, ARRAY_SIZE(buffer1))));
// the element column has no PRESENT stream
EXPECT_CALL(streams, getStreamProxy(2, proto::Stream_Kind_PRESENT, true))
.WillRepeatedly(testing::Return(nullptr));
// LENGTH stream of the list column (one entry per non-null list, 1024
// entries in total):
// [1 for x in range(260)] +
// [4 for x in range(260)] +
// [0 for x in range(260)] +
// [3 for x in range(243)] +
// [19]
const unsigned char buffer2[] = { 0x7f, 0x00, 0x01,
0x7f, 0x00, 0x01,
0x7f, 0x00, 0x04,
0x7f, 0x00, 0x04,
0x7f, 0x00, 0x00,
0x7f, 0x00, 0x00,
0x7f, 0x00, 0x03,
0x6e, 0x00, 0x03,
0xff, 0x13 };
EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_LENGTH, true))
.WillRepeatedly(testing::Return(new SeekableArrayInputStream
(buffer2, ARRAY_SIZE(buffer2))));
// DATA stream of the element column (260 + 1040 + 0 + 729 + 19 = 2048
// values):
// range(2048)
const unsigned char buffer3[] = { 0x7f, 0x01, 0x00,
0x7f, 0x01, 0x84, 0x02,
0x7f, 0x01, 0x88, 0x04,
0x7f, 0x01, 0x8c, 0x06,
0x7f, 0x01, 0x90, 0x08,
0x7f, 0x01, 0x94, 0x0a,
0x7f, 0x01, 0x98, 0x0c,
0x7f, 0x01, 0x9c, 0x0e,
0x7f, 0x01, 0xa0, 0x10,
0x7f, 0x01, 0xa4, 0x12,
0x7f, 0x01, 0xa8, 0x14,
0x7f, 0x01, 0xac, 0x16,
0x7f, 0x01, 0xb0, 0x18,
0x7f, 0x01, 0xb4, 0x1a,
0x7f, 0x01, 0xb8, 0x1c,
0x5f, 0x01, 0xbc, 0x1e };
EXPECT_CALL(streams, getStreamProxy(2, proto::Stream_Kind_DATA, true))
.WillRepeatedly(testing::Return(new SeekableArrayInputStream
(buffer3, ARRAY_SIZE(buffer3))));
// create the row type
std::unique_ptr<Type> rowType = createStructType();
rowType->addStructField("col0", createListType(createPrimitiveType(LONG)));
std::unique_ptr<ColumnReader> reader = buildReader(*rowType, streams);
// batch hierarchy: struct -> list -> long
StructVectorBatch batch(512, *getDefaultPool());
ListVectorBatch *lists = new ListVectorBatch(512, *getDefaultPool());
LongVectorBatch *longs = new LongVectorBatch(512, *getDefaultPool());
batch.fields.push_back(lists);
lists->elements = std::unique_ptr < ColumnVectorBatch > (longs);
// Batch 1 (rows 0-511): 256 non-null lists, all of length 1, holding the
// element values 0-255.
reader->next(batch, 512, 0);
ASSERT_EQ(512, batch.numElements);
ASSERT_EQ(true, !batch.hasNulls);
ASSERT_EQ(512, lists->numElements);
ASSERT_EQ(true, lists->hasNulls);
ASSERT_EQ(256, longs->numElements);
ASSERT_EQ(true, !longs->hasNulls);
for (size_t i = 0; i < batch.numElements; ++i) {
EXPECT_EQ(i % 2 == 0, lists->notNull[i]) << "Wrong value at " << i;
EXPECT_EQ((i + 1) / 2, lists->offsets[i]) << "Wrong value at " << i;
}
EXPECT_EQ(256, lists->offsets[512]);
for (size_t i = 0; i < longs->numElements; ++i) {
EXPECT_EQ(i, longs->data[i]);
}
// Batch 2 (rows 512-1023): the remaining 4 lists of length 1 (batch rows
// 0-7) followed by 252 lists of length 4: 4 + 1008 = 1012 elements,
// starting at value 256.
reader->next(batch, 512, 0);
ASSERT_EQ(512, batch.numElements);
ASSERT_EQ(true, !batch.hasNulls);
ASSERT_EQ(512, lists->numElements);
ASSERT_EQ(true, lists->hasNulls);
ASSERT_EQ(1012, longs->numElements);
ASSERT_EQ(true, !longs->hasNulls);
for (size_t i = 0; i < batch.numElements; ++i) {
EXPECT_EQ(i % 2 == 0, lists->notNull[i]) << "Wrong value at " << i;
if (i < 8) {
EXPECT_EQ((i + 1) / 2, lists->offsets[i])
<< "Wrong value at " << i;
} else {
EXPECT_EQ(4 * ((i + 1) / 2) - 12, lists->offsets[i])
<< "Wrong value at " << i;
}
}
EXPECT_EQ(1012, lists->offsets[512]);
for (size_t i = 0; i < longs->numElements; ++i) {
EXPECT_EQ(256 + i, longs->data[i]);
}
// Batch 3 (rows 1024-1535): the remaining 8 lists of length 4 (batch rows
// 0-15) followed by 248 empty lists: 32 elements, starting at value
// 256 + 1012 = 1268.
reader->next(batch, 512, 0);
ASSERT_EQ(512, batch.numElements);
ASSERT_EQ(true, !batch.hasNulls);
ASSERT_EQ(512, lists->numElements);
ASSERT_EQ(true, lists->hasNulls);
ASSERT_EQ(32, longs->numElements);
ASSERT_EQ(true, !longs->hasNulls);
for (size_t i = 0; i < batch.numElements; ++i) {
EXPECT_EQ(i % 2 == 0, lists->notNull[i]) << "Wrong value at " << i;
if (i < 16) {
EXPECT_EQ(4 * ((i + 1) / 2), lists->offsets[i])
<< "Wrong value at " << i;
} else {
EXPECT_EQ(32, lists->offsets[i]) << "Wrong value at " << i;
}
}
EXPECT_EQ(32, lists->offsets[512]);
for (size_t i = 0; i < longs->numElements; ++i) {
EXPECT_EQ(1268 + i, longs->data[i]);
}
// Batch 4 (rows 1536-2047): the remaining 12 empty lists (batch rows 0-23),
// 243 lists of length 3 and the final list of length 19 at batch row 510:
// 729 + 19 = 748 elements, starting at value 1268 + 32 = 1300.
reader->next(batch, 512, 0);
ASSERT_EQ(512, batch.numElements);
ASSERT_EQ(true, !batch.hasNulls);
ASSERT_EQ(512, lists->numElements);
ASSERT_EQ(true, lists->hasNulls);
ASSERT_EQ(748, longs->numElements);
ASSERT_EQ(true, !longs->hasNulls);
for (size_t i = 0; i < batch.numElements; ++i) {
EXPECT_EQ(i % 2 == 0, lists->notNull[i]) << "Wrong value at " << i;
if (i < 24) {
EXPECT_EQ(0, lists->offsets[i]) << "Wrong value at " << i;
} else if (i < 510) {
EXPECT_EQ(3 * ((i - 23) / 2), lists->offsets[i])
<< "Wrong value at " << i;
} else if (i < 511) {
EXPECT_EQ(729, lists->offsets[i]) << "Wrong value at " << i;
} else {
EXPECT_EQ(748, lists->offsets[i]) << "Wrong value at " << i;
}
}
EXPECT_EQ(748, lists->offsets[512]);
for (size_t i = 0; i < longs->numElements; ++i) {
EXPECT_EQ(1300 + i, longs->data[i]);
}
}
// Exercises skip() on a list<long> column of 2048 rows in which every odd
// row is null. The streams are the same as in testListWithNulls; the test
// reads row 0, skips 13 rows, reads row 14, skips 2031 rows and finally reads
// the last two rows (2046 and 2047).
TEST(TestColumnReader, testListSkipWithNulls) {
MockStripeStreams streams;
// set getSelectedColumns()
std::vector<bool> selectedColumns(3, true);
EXPECT_CALL(streams, getSelectedColumns())
.WillRepeatedly(testing::Return(selectedColumns));
// set getEncoding
proto::ColumnEncoding directEncoding;
directEncoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
EXPECT_CALL(streams, getEncoding(testing::_))
.WillRepeatedly(testing::Return(directEncoding));
// set getStream: the root struct has no PRESENT stream
EXPECT_CALL(streams, getStreamProxy(0, proto::Stream_Kind_PRESENT, true))
.WillRepeatedly(testing::Return(nullptr));
// PRESENT stream of the list column:
// [0xaa for x in range(2048/8)]
const unsigned char buffer1[] = { 0x7f, 0xaa, 0x7b, 0xaa };
EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_PRESENT, true))
.WillRepeatedly(testing::Return(new SeekableArrayInputStream
(buffer1, ARRAY_SIZE(buffer1))));
// the element column has no PRESENT stream
EXPECT_CALL(streams, getStreamProxy(2, proto::Stream_Kind_PRESENT, true))
.WillRepeatedly(testing::Return(nullptr));
// LENGTH stream of the list column (one entry per non-null list):
// [1 for x in range(260)] +
// [4 for x in range(260)] +
// [0 for x in range(260)] +
// [3 for x in range(243)] +
// [19]
const unsigned char buffer2[] = { 0x7f, 0x00, 0x01,
0x7f, 0x00, 0x01,
0x7f, 0x00, 0x04,
0x7f, 0x00, 0x04,
0x7f, 0x00, 0x00,
0x7f, 0x00, 0x00,
0x7f, 0x00, 0x03,
0x6e, 0x00, 0x03,
0xff, 0x13 };
EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_LENGTH, true))
.WillRepeatedly(testing::Return(new SeekableArrayInputStream
(buffer2, ARRAY_SIZE(buffer2))));
// DATA stream of the element column:
// range(2048)
const unsigned char buffer3[] = { 0x7f, 0x01, 0x00,
0x7f, 0x01, 0x84, 0x02,
0x7f, 0x01, 0x88, 0x04,
0x7f, 0x01, 0x8c, 0x06,
0x7f, 0x01, 0x90, 0x08,
0x7f, 0x01, 0x94, 0x0a,
0x7f, 0x01, 0x98, 0x0c,
0x7f, 0x01, 0x9c, 0x0e,
0x7f, 0x01, 0xa0, 0x10,
0x7f, 0x01, 0xa4, 0x12,
0x7f, 0x01, 0xa8, 0x14,
0x7f, 0x01, 0xac, 0x16,
0x7f, 0x01, 0xb0, 0x18,
0x7f, 0x01, 0xb4, 0x1a,
0x7f, 0x01, 0xb8, 0x1c,
0x5f, 0x01, 0xbc, 0x1e };
EXPECT_CALL(streams, getStreamProxy(2, proto::Stream_Kind_DATA, true))
.WillRepeatedly(testing::Return(new SeekableArrayInputStream
(buffer3, ARRAY_SIZE(buffer3))));
// create the row type
std::unique_ptr<Type> rowType = createStructType();
rowType->addStructField("col0", createListType(createPrimitiveType(LONG)));
std::unique_ptr<ColumnReader> reader = buildReader(*rowType, streams);
// The batches are created with a capacity of 1 although the last read
// below asks for 2 rows and yields 19 elements.
// NOTE(review): this presumably relies on the reader growing the batches
// as needed -- confirm against ColumnReader::next.
StructVectorBatch batch(1, *getDefaultPool());
ListVectorBatch *lists = new ListVectorBatch(1, *getDefaultPool());
LongVectorBatch *longs = new LongVectorBatch(1, *getDefaultPool());
batch.fields.push_back(lists);
lists->elements = std::unique_ptr < ColumnVectorBatch > (longs);
// row 0: the first non-null list, length 1, element value 0
reader->next(batch, 1, 0);
ASSERT_EQ(1, batch.numElements);
ASSERT_EQ(true, !batch.hasNulls);
ASSERT_EQ(1, lists->numElements);
ASSERT_EQ(true, !lists->hasNulls);
ASSERT_EQ(1, longs->numElements);
ASSERT_EQ(true, !longs->hasNulls);
EXPECT_EQ(0, lists->offsets[0]);
EXPECT_EQ(1, lists->offsets[1]);
EXPECT_EQ(0, longs->data[0]);
// Skip rows 1-13. Six of them (2, 4, ..., 12) are non-null lists of length
// 1, so element values 1-6 are skipped and row 14 holds the value 7.
reader->skip(13);
reader->next(batch, 1, 0);
ASSERT_EQ(1, batch.numElements);
ASSERT_EQ(true, !batch.hasNulls);
ASSERT_EQ(1, lists->numElements);
ASSERT_EQ(true, !lists->hasNulls);
ASSERT_EQ(1, longs->numElements);
ASSERT_EQ(true, !longs->hasNulls);
EXPECT_EQ(0, lists->offsets[0]);
EXPECT_EQ(1, lists->offsets[1]);
EXPECT_EQ(7, longs->data[0]);
// Skip rows 15-2045 and read the last two rows: row 2046 is the final
// list with 19 elements (values 2029-2047), row 2047 is null.
reader->skip(2031);
reader->next(batch, 2, 0);
ASSERT_EQ(2, batch.numElements);
ASSERT_EQ(true, !batch.hasNulls);
ASSERT_EQ(2, lists->numElements);
ASSERT_EQ(true, lists->hasNulls);
ASSERT_EQ(19, longs->numElements);
ASSERT_EQ(true, !longs->hasNulls);
EXPECT_EQ(0, lists->offsets[0]);
EXPECT_EQ(19, lists->offsets[1]);
EXPECT_EQ(19, lists->offsets[2]);
for (size_t i = 0; i < longs->numElements; ++i) {
EXPECT_EQ(2029 + i, longs->data[i]);
}
}
// Variant of testListSkipWithNulls in which the element column is not
// selected: the list batch has no element batch attached and the element
// DATA stream is absent, so only the list offsets and null flags are checked.
TEST(TestColumnReader, testListSkipWithNullsNoData) {
  MockStripeStreams streams;

  // root struct and list are selected, the element column (2) is not
  std::vector<bool> selectedColumns(3, true);
  selectedColumns[2] = false;
  EXPECT_CALL(streams, getSelectedColumns())
      .WillRepeatedly(testing::Return(selectedColumns));

  // every column reports DIRECT encoding
  proto::ColumnEncoding directEncoding;
  directEncoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
  EXPECT_CALL(streams, getEncoding(testing::_))
      .WillRepeatedly(testing::Return(directEncoding));

  // the root struct has no PRESENT stream
  EXPECT_CALL(streams, getStreamProxy(0, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(testing::Return(nullptr));

  // PRESENT stream of the list column: [0xaa for x in range(2048/8)]
  const unsigned char presentBytes[] = { 0x7f, 0xaa, 0x7b, 0xaa };
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(presentBytes,
                                       ARRAY_SIZE(presentBytes))));

  // the element column has no PRESENT stream
  EXPECT_CALL(streams, getStreamProxy(2, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(testing::Return(nullptr));

  // LENGTH stream of the list column:
  // [1 for x in range(260)] +
  // [4 for x in range(260)] +
  // [0 for x in range(260)] +
  // [3 for x in range(243)] +
  // [19]
  const unsigned char lengthBytes[] = { 0x7f, 0x00, 0x01,
                                        0x7f, 0x00, 0x01,
                                        0x7f, 0x00, 0x04,
                                        0x7f, 0x00, 0x04,
                                        0x7f, 0x00, 0x00,
                                        0x7f, 0x00, 0x00,
                                        0x7f, 0x00, 0x03,
                                        0x6e, 0x00, 0x03,
                                        0xff, 0x13 };
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_LENGTH, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(lengthBytes,
                                       ARRAY_SIZE(lengthBytes))));

  // no DATA stream is provided for the element column
  EXPECT_CALL(streams, getStreamProxy(2, proto::Stream_Kind_DATA, true))
      .WillRepeatedly(testing::Return(nullptr));

  // row type: struct<col0:array<bigint>>
  std::unique_ptr<Type> rowType = createStructType();
  rowType->addStructField("col0", createListType(createPrimitiveType(LONG)));

  std::unique_ptr<ColumnReader> reader = buildReader(*rowType, streams);

  // only the list batch is attached; its element batch is left unset
  StructVectorBatch batch(1, *getDefaultPool());
  ListVectorBatch *listColumn = new ListVectorBatch(1, *getDefaultPool());
  batch.fields.push_back(listColumn);

  // row 0: a non-null list of length 1
  reader->next(batch, 1, 0);
  ASSERT_EQ(1, batch.numElements);
  ASSERT_FALSE(batch.hasNulls);
  ASSERT_EQ(1, listColumn->numElements);
  ASSERT_FALSE(listColumn->hasNulls);
  EXPECT_EQ(0, listColumn->offsets[0]);
  EXPECT_EQ(1, listColumn->offsets[1]);

  // skip rows 1-13, then read row 14: again a non-null list of length 1
  reader->skip(13);
  reader->next(batch, 1, 0);
  ASSERT_EQ(1, batch.numElements);
  ASSERT_FALSE(batch.hasNulls);
  ASSERT_EQ(1, listColumn->numElements);
  ASSERT_FALSE(listColumn->hasNulls);
  EXPECT_EQ(0, listColumn->offsets[0]);
  EXPECT_EQ(1, listColumn->offsets[1]);

  // skip rows 15-2045, then read the last two rows: a list of length 19
  // followed by a null
  reader->skip(2031);
  reader->next(batch, 2, 0);
  ASSERT_EQ(2, batch.numElements);
  ASSERT_FALSE(batch.hasNulls);
  ASSERT_EQ(2, listColumn->numElements);
  ASSERT_TRUE(listColumn->hasNulls);
  EXPECT_EQ(0, listColumn->offsets[0]);
  EXPECT_EQ(19, listColumn->offsets[1]);
  EXPECT_EQ(19, listColumn->offsets[2]);
}
// Reads a single batch of 512 map<LONG, LONG> rows in which nothing is null
// and every map holds exactly two entries. The fixture's `encoded` flag
// decides whether the batch is fetched through nextEncoded() or next().
TEST_P(TestColumnReaderEncoded, testMap) {
  MockStripeStreams stripe;

  // all four column ids are selected
  const std::vector<bool> selection(4, true);
  EXPECT_CALL(stripe, getSelectedColumns())
      .WillRepeatedly(testing::Return(selection));

  // every column reports DIRECT encoding
  proto::ColumnEncoding direct;
  direct.set_kind(proto::ColumnEncoding_Kind_DIRECT);
  EXPECT_CALL(stripe, getEncoding(testing::_))
      .WillRepeatedly(testing::Return(direct));

  // no column exposes a PRESENT stream
  EXPECT_CALL(stripe, getStreamProxy(testing::_,
                                     proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(testing::Return(nullptr));

  // LENGTH stream of column 1: 600 lengths, each equal to 2
  const unsigned char lengthBytes[] = { 0x7f, 0x00, 0x02,
                                        0x7f, 0x00, 0x02,
                                        0x7f, 0x00, 0x02,
                                        0x7f, 0x00, 0x02,
                                        0x4d, 0x00, 0x02 };
  EXPECT_CALL(stripe, getStreamProxy(1, proto::Stream_Kind_LENGTH, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(lengthBytes,
                                       ARRAY_SIZE(lengthBytes))));

  // DATA stream of column 2 (keys): the values 0 .. 1199
  const unsigned char keyBytes[] = { 0x7f, 0x01, 0x00,
                                     0x7f, 0x01, 0x84, 0x02,
                                     0x7f, 0x01, 0x88, 0x04,
                                     0x7f, 0x01, 0x8c, 0x06,
                                     0x7f, 0x01, 0x90, 0x08,
                                     0x7f, 0x01, 0x94, 0x0a,
                                     0x7f, 0x01, 0x98, 0x0c,
                                     0x7f, 0x01, 0x9c, 0x0e,
                                     0x7f, 0x01, 0xa0, 0x10,
                                     0x1b, 0x01, 0xa4, 0x12 };
  EXPECT_CALL(stripe, getStreamProxy(2, proto::Stream_Kind_DATA, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(keyBytes, ARRAY_SIZE(keyBytes))));

  // DATA stream of column 3 (values): the values 8 .. 1207
  const unsigned char valueBytes[] = { 0x7f, 0x01, 0x10,
                                       0x7f, 0x01, 0x94, 0x02,
                                       0x7f, 0x01, 0x98, 0x04,
                                       0x7f, 0x01, 0x9c, 0x06,
                                       0x7f, 0x01, 0xa0, 0x08,
                                       0x7f, 0x01, 0xa4, 0x0a,
                                       0x7f, 0x01, 0xa8, 0x0c,
                                       0x7f, 0x01, 0xac, 0x0e,
                                       0x7f, 0x01, 0xb0, 0x10,
                                       0x1b, 0x01, 0xb4, 0x12 };
  EXPECT_CALL(stripe, getStreamProxy(3, proto::Stream_Kind_DATA, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(valueBytes, ARRAY_SIZE(valueBytes))));

  // schema: struct<col0:map<bigint,bigint>>
  std::unique_ptr<Type> schema = createStructType();
  schema->addStructField("col0", createMapType(createPrimitiveType(LONG),
                                               createPrimitiveType(LONG)));

  std::unique_ptr<ColumnReader> columnReader = buildReader(*schema, stripe);

  // destination batches; the struct batch takes the map batch as its only
  // field and the map batch takes the key and value batches
  StructVectorBatch rows(512, *getDefaultPool());
  MapVectorBatch *mapColumn = new MapVectorBatch(512, *getDefaultPool());
  LongVectorBatch *keyColumn = new LongVectorBatch(512, *getDefaultPool());
  LongVectorBatch *valueColumn = new LongVectorBatch(512, *getDefaultPool());
  rows.fields.push_back(mapColumn);
  mapColumn->keys.reset(keyColumn);
  mapColumn->elements.reset(valueColumn);

  if (!encoded) {
    columnReader->next(rows, 512, 0);
  } else {
    columnReader->nextEncoded(rows, 512, 0);
  }

  ASSERT_EQ(512, rows.numElements);
  ASSERT_FALSE(rows.hasNulls);
  ASSERT_EQ(512, mapColumn->numElements);
  ASSERT_FALSE(mapColumn->hasNulls);
  ASSERT_EQ(1024, keyColumn->numElements);
  ASSERT_FALSE(keyColumn->hasNulls);
  ASSERT_EQ(1024, valueColumn->numElements);
  ASSERT_FALSE(valueColumn->hasNulls);

  // offsets has one more entry than there are rows, hence the <= bound
  for (size_t row = 0; row <= rows.numElements; ++row) {
    EXPECT_EQ(2 * row, mapColumn->offsets[row]);
  }
  for (size_t entry = 0; entry < keyColumn->numElements; ++entry) {
    EXPECT_EQ(entry, keyColumn->data[entry]);
    EXPECT_EQ(entry + 8, valueColumn->data[entry]);
  }
}
// Reads four consecutive batches of 512 map<LONG, LONG> rows where every odd
// row of the map column is null and every even-indexed value is null, and
// checks the offsets, keys and values produced for each batch.
TEST(TestColumnReader, testMapWithNulls) {
  MockStripeStreams stripe;

  // all four column ids are selected
  const std::vector<bool> selection(4, true);
  EXPECT_CALL(stripe, getSelectedColumns())
      .WillRepeatedly(testing::Return(selection));

  // every column reports DIRECT encoding
  proto::ColumnEncoding direct;
  direct.set_kind(proto::ColumnEncoding_Kind_DIRECT);
  EXPECT_CALL(stripe, getEncoding(testing::_))
      .WillRepeatedly(testing::Return(direct));

  // column 0 has no PRESENT stream
  EXPECT_CALL(stripe, getStreamProxy(0, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(testing::Return(nullptr));

  // PRESENT stream of column 1: 2048/8 bytes of 0xaa
  const unsigned char mapPresentBytes[] = { 0x7f, 0xaa, 0x7b, 0xaa };
  EXPECT_CALL(stripe, getStreamProxy(1, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(mapPresentBytes,
                                       ARRAY_SIZE(mapPresentBytes))));

  // column 2 (keys) has no PRESENT stream
  EXPECT_CALL(stripe, getStreamProxy(2, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(testing::Return(nullptr));

  // PRESENT stream of column 3: 2048/8 bytes of 0x55
  const unsigned char valuePresentBytes[] = { 0x7f, 0x55, 0x7b, 0x55 };
  EXPECT_CALL(stripe, getStreamProxy(3, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(valuePresentBytes,
                                       ARRAY_SIZE(valuePresentBytes))));

  // LENGTH stream of column 1:
  //   260 x 1, then 260 x 4, then 260 x 0, then 243 x 3, then a single 19
  const unsigned char lengthBytes[] = { 0x7f, 0x00, 0x01,
                                        0x7f, 0x00, 0x01,
                                        0x7f, 0x00, 0x04,
                                        0x7f, 0x00, 0x04,
                                        0x7f, 0x00, 0x00,
                                        0x7f, 0x00, 0x00,
                                        0x7f, 0x00, 0x03,
                                        0x6e, 0x00, 0x03,
                                        0xff, 0x13 };
  EXPECT_CALL(stripe, getStreamProxy(1, proto::Stream_Kind_LENGTH, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(lengthBytes,
                                       ARRAY_SIZE(lengthBytes))));

  // DATA stream of column 2 (keys): the values 0 .. 2047
  const unsigned char keyBytes[] = { 0x7f, 0x01, 0x00,
                                     0x7f, 0x01, 0x84, 0x02,
                                     0x7f, 0x01, 0x88, 0x04,
                                     0x7f, 0x01, 0x8c, 0x06,
                                     0x7f, 0x01, 0x90, 0x08,
                                     0x7f, 0x01, 0x94, 0x0a,
                                     0x7f, 0x01, 0x98, 0x0c,
                                     0x7f, 0x01, 0x9c, 0x0e,
                                     0x7f, 0x01, 0xa0, 0x10,
                                     0x7f, 0x01, 0xa4, 0x12,
                                     0x7f, 0x01, 0xa8, 0x14,
                                     0x7f, 0x01, 0xac, 0x16,
                                     0x7f, 0x01, 0xb0, 0x18,
                                     0x7f, 0x01, 0xb4, 0x1a,
                                     0x7f, 0x01, 0xb8, 0x1c,
                                     0x5f, 0x01, 0xbc, 0x1e };
  EXPECT_CALL(stripe, getStreamProxy(2, proto::Stream_Kind_DATA, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(keyBytes, ARRAY_SIZE(keyBytes))));

  // DATA stream of column 3 (values): the values 8 .. 1031
  const unsigned char valueBytes[] = { 0x7f, 0x01, 0x10,
                                       0x7f, 0x01, 0x94, 0x02,
                                       0x7f, 0x01, 0x98, 0x04,
                                       0x7f, 0x01, 0x9c, 0x06,
                                       0x7f, 0x01, 0xa0, 0x08,
                                       0x7f, 0x01, 0xa4, 0x0a,
                                       0x7f, 0x01, 0xa8, 0x0c,
                                       0x6f, 0x01, 0xac, 0x0e };
  EXPECT_CALL(stripe, getStreamProxy(3, proto::Stream_Kind_DATA, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(valueBytes, ARRAY_SIZE(valueBytes))));

  // schema: struct<col0:map<bigint,bigint>>
  std::unique_ptr<Type> schema = createStructType();
  schema->addStructField("col0", createMapType(createPrimitiveType(LONG),
                                               createPrimitiveType(LONG)));

  std::unique_ptr<ColumnReader> columnReader = buildReader(*schema, stripe);

  // destination batches, wired struct -> map -> (keys, values)
  StructVectorBatch rows(512, *getDefaultPool());
  MapVectorBatch *mapColumn = new MapVectorBatch(512, *getDefaultPool());
  LongVectorBatch *keyColumn = new LongVectorBatch(512, *getDefaultPool());
  LongVectorBatch *valueColumn = new LongVectorBatch(512, *getDefaultPool());
  rows.fields.push_back(mapColumn);
  mapColumn->keys.reset(keyColumn);
  mapColumn->elements.reset(valueColumn);

  // ---- batch 1: 256 non-null maps, each of length 1 ----
  columnReader->next(rows, 512, 0);
  ASSERT_EQ(512, rows.numElements);
  ASSERT_FALSE(rows.hasNulls);
  ASSERT_EQ(512, mapColumn->numElements);
  ASSERT_TRUE(mapColumn->hasNulls);
  ASSERT_EQ(256, keyColumn->numElements);
  ASSERT_FALSE(keyColumn->hasNulls);
  ASSERT_EQ(256, valueColumn->numElements);
  ASSERT_TRUE(valueColumn->hasNulls);
  for (size_t row = 0; row < rows.numElements; ++row) {
    EXPECT_EQ(row % 2 == 0, mapColumn->notNull[row])
        << "Wrong value at " << row;
    EXPECT_EQ((row + 1) / 2, mapColumn->offsets[row])
        << "Wrong value at " << row;
  }
  EXPECT_EQ(256, mapColumn->offsets[512]);
  for (size_t entry = 0; entry < keyColumn->numElements; ++entry) {
    EXPECT_EQ(entry, keyColumn->data[entry]);
    EXPECT_EQ(entry & 1, valueColumn->notNull[entry]);
    if (valueColumn->notNull[entry]) {
      EXPECT_EQ(entry / 2 + 8, valueColumn->data[entry]);
    }
  }

  // ---- batch 2: the last 4 maps of length 1, then 252 maps of length 4 ----
  columnReader->next(rows, 512, 0);
  ASSERT_EQ(512, rows.numElements);
  ASSERT_FALSE(rows.hasNulls);
  ASSERT_EQ(512, mapColumn->numElements);
  ASSERT_TRUE(mapColumn->hasNulls);
  ASSERT_EQ(1012, keyColumn->numElements);
  ASSERT_FALSE(keyColumn->hasNulls);
  ASSERT_EQ(1012, valueColumn->numElements);
  ASSERT_TRUE(valueColumn->hasNulls);
  for (size_t row = 0; row < rows.numElements; ++row) {
    EXPECT_EQ(row % 2 == 0, mapColumn->notNull[row])
        << "Wrong value at " << row;
    if (row >= 8) {
      EXPECT_EQ(4 * ((row + 1) / 2) - 12, mapColumn->offsets[row])
          << "Wrong value at " << row;
    } else {
      EXPECT_EQ((row + 1) / 2, mapColumn->offsets[row])
          << "Wrong value at " << row;
    }
  }
  EXPECT_EQ(1012, mapColumn->offsets[512]);
  for (size_t entry = 0; entry < keyColumn->numElements; ++entry) {
    EXPECT_EQ(256 + entry, keyColumn->data[entry]);
    EXPECT_EQ(entry & 1, valueColumn->notNull[entry]);
    if (valueColumn->notNull[entry]) {
      EXPECT_EQ(128 + 8 + entry / 2, valueColumn->data[entry]);
    }
  }

  // ---- batch 3: the last 8 maps of length 4, then empty maps ----
  columnReader->next(rows, 512, 0);
  ASSERT_EQ(512, rows.numElements);
  ASSERT_FALSE(rows.hasNulls);
  ASSERT_EQ(512, mapColumn->numElements);
  ASSERT_TRUE(mapColumn->hasNulls);
  ASSERT_EQ(32, keyColumn->numElements);
  ASSERT_FALSE(keyColumn->hasNulls);
  ASSERT_EQ(32, valueColumn->numElements);
  ASSERT_TRUE(valueColumn->hasNulls);
  for (size_t row = 0; row < rows.numElements; ++row) {
    EXPECT_EQ(row % 2 == 0, mapColumn->notNull[row])
        << "Wrong value at " << row;
    if (row >= 16) {
      EXPECT_EQ(32, mapColumn->offsets[row]) << "Wrong value at " << row;
    } else {
      EXPECT_EQ(4 * ((row + 1) / 2), mapColumn->offsets[row])
          << "Wrong value at " << row;
    }
  }
  EXPECT_EQ(32, mapColumn->offsets[512]);
  for (size_t entry = 0; entry < keyColumn->numElements; ++entry) {
    EXPECT_EQ(1268 + entry, keyColumn->data[entry]);
    EXPECT_EQ(entry & 1, valueColumn->notNull[entry]);
    if (valueColumn->notNull[entry]) {
      EXPECT_EQ(634 + 8 + entry / 2, valueColumn->data[entry]);
    }
  }

  // ---- batch 4: remaining empty maps, the maps of length 3, then the 19 ----
  columnReader->next(rows, 512, 0);
  ASSERT_EQ(512, rows.numElements);
  ASSERT_FALSE(rows.hasNulls);
  ASSERT_EQ(512, mapColumn->numElements);
  ASSERT_TRUE(mapColumn->hasNulls);
  ASSERT_EQ(748, keyColumn->numElements);
  ASSERT_FALSE(keyColumn->hasNulls);
  ASSERT_EQ(748, valueColumn->numElements);
  ASSERT_TRUE(valueColumn->hasNulls);
  for (size_t row = 0; row < rows.numElements; ++row) {
    EXPECT_EQ(row % 2 == 0, mapColumn->notNull[row])
        << "Wrong value at " << row;
    if (row < 24) {
      EXPECT_EQ(0, mapColumn->offsets[row]) << "Wrong value at " << row;
    } else if (row < 510) {
      EXPECT_EQ(3 * ((row - 23) / 2), mapColumn->offsets[row])
          << "Wrong value at " << row;
    } else if (row == 510) {
      EXPECT_EQ(729, mapColumn->offsets[row]) << "Wrong value at " << row;
    } else {
      EXPECT_EQ(748, mapColumn->offsets[row]) << "Wrong value at " << row;
    }
  }
  EXPECT_EQ(748, mapColumn->offsets[512]);
  for (size_t entry = 0; entry < keyColumn->numElements; ++entry) {
    EXPECT_EQ(1300 + entry, keyColumn->data[entry]);
    EXPECT_EQ(entry & 1, valueColumn->notNull[entry]);
    if (valueColumn->notNull[entry]) {
      EXPECT_EQ(650 + 8 + entry / 2, valueColumn->data[entry]);
    }
  }
}
// Interleaves next() and skip() on a map<LONG, LONG> column whose odd rows
// are null, and checks that offsets, keys and values stay aligned after each
// skip.
TEST(TestColumnReader, testMapSkipWithNulls) {
  MockStripeStreams stripe;

  // all four column ids are selected
  const std::vector<bool> selection(4, true);
  EXPECT_CALL(stripe, getSelectedColumns())
      .WillRepeatedly(testing::Return(selection));

  // every column reports DIRECT encoding
  proto::ColumnEncoding direct;
  direct.set_kind(proto::ColumnEncoding_Kind_DIRECT);
  EXPECT_CALL(stripe, getEncoding(testing::_))
      .WillRepeatedly(testing::Return(direct));

  // default: no PRESENT stream; column 1 gets its own expectation below
  EXPECT_CALL(stripe, getStreamProxy(testing::_, proto::Stream_Kind_PRESENT,
                                     true))
      .WillRepeatedly(testing::Return(nullptr));

  // PRESENT stream of column 1: 2048/8 bytes of 0xaa
  const unsigned char mapPresentBytes[] = { 0x7f, 0xaa, 0x7b, 0xaa };
  EXPECT_CALL(stripe, getStreamProxy(1, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(mapPresentBytes,
                                       ARRAY_SIZE(mapPresentBytes))));

  // LENGTH stream of column 1:
  //   260 x 1, then 260 x 4, then 260 x 0, then 243 x 3, then a single 19
  const unsigned char lengthBytes[] = { 0x7f, 0x00, 0x01,
                                        0x7f, 0x00, 0x01,
                                        0x7f, 0x00, 0x04,
                                        0x7f, 0x00, 0x04,
                                        0x7f, 0x00, 0x00,
                                        0x7f, 0x00, 0x00,
                                        0x7f, 0x00, 0x03,
                                        0x6e, 0x00, 0x03,
                                        0xff, 0x13 };
  EXPECT_CALL(stripe, getStreamProxy(1, proto::Stream_Kind_LENGTH, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(lengthBytes,
                                       ARRAY_SIZE(lengthBytes))));

  // DATA stream of column 2 (keys): the values 0 .. 2047
  const unsigned char keyBytes[] = { 0x7f, 0x01, 0x00,
                                     0x7f, 0x01, 0x84, 0x02,
                                     0x7f, 0x01, 0x88, 0x04,
                                     0x7f, 0x01, 0x8c, 0x06,
                                     0x7f, 0x01, 0x90, 0x08,
                                     0x7f, 0x01, 0x94, 0x0a,
                                     0x7f, 0x01, 0x98, 0x0c,
                                     0x7f, 0x01, 0x9c, 0x0e,
                                     0x7f, 0x01, 0xa0, 0x10,
                                     0x7f, 0x01, 0xa4, 0x12,
                                     0x7f, 0x01, 0xa8, 0x14,
                                     0x7f, 0x01, 0xac, 0x16,
                                     0x7f, 0x01, 0xb0, 0x18,
                                     0x7f, 0x01, 0xb4, 0x1a,
                                     0x7f, 0x01, 0xb8, 0x1c,
                                     0x5f, 0x01, 0xbc, 0x1e };
  EXPECT_CALL(stripe, getStreamProxy(2, proto::Stream_Kind_DATA, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(keyBytes, ARRAY_SIZE(keyBytes))));

  // DATA stream of column 3 (values): the values 8 .. 2055
  const unsigned char valueBytes[] = { 0x7f, 0x01, 0x10,
                                       0x7f, 0x01, 0x94, 0x02,
                                       0x7f, 0x01, 0x98, 0x04,
                                       0x7f, 0x01, 0x9c, 0x06,
                                       0x7f, 0x01, 0xa0, 0x08,
                                       0x7f, 0x01, 0xa4, 0x0a,
                                       0x7f, 0x01, 0xa8, 0x0c,
                                       0x7f, 0x01, 0xac, 0x0e,
                                       0x7f, 0x01, 0xb0, 0x10,
                                       0x7f, 0x01, 0xb4, 0x12,
                                       0x7f, 0x01, 0xb8, 0x14,
                                       0x7f, 0x01, 0xbc, 0x16,
                                       0x7f, 0x01, 0xc0, 0x18,
                                       0x7f, 0x01, 0xc4, 0x1a,
                                       0x7f, 0x01, 0xc8, 0x1c,
                                       0x5f, 0x01, 0xcc, 0x1e };
  EXPECT_CALL(stripe, getStreamProxy(3, proto::Stream_Kind_DATA, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(valueBytes, ARRAY_SIZE(valueBytes))));

  // schema: struct<col0:map<bigint,bigint>>
  std::unique_ptr<Type> schema = createStructType();
  schema->addStructField("col0", createMapType(createPrimitiveType(LONG),
                                               createPrimitiveType(LONG)));

  std::unique_ptr<ColumnReader> columnReader = buildReader(*schema, stripe);

  // destination batches, wired struct -> map -> (keys, values)
  StructVectorBatch rows(1, *getDefaultPool());
  MapVectorBatch *mapColumn = new MapVectorBatch(1, *getDefaultPool());
  LongVectorBatch *keyColumn = new LongVectorBatch(1, *getDefaultPool());
  LongVectorBatch *valueColumn = new LongVectorBatch(1, *getDefaultPool());
  rows.fields.push_back(mapColumn);
  mapColumn->keys.reset(keyColumn);
  mapColumn->elements.reset(valueColumn);

  // first row: a single-entry map
  columnReader->next(rows, 1, 0);
  ASSERT_EQ(1, rows.numElements);
  ASSERT_FALSE(rows.hasNulls);
  ASSERT_EQ(1, mapColumn->numElements);
  ASSERT_FALSE(mapColumn->hasNulls);
  ASSERT_EQ(1, keyColumn->numElements);
  ASSERT_FALSE(keyColumn->hasNulls);
  ASSERT_EQ(1, valueColumn->numElements);
  ASSERT_FALSE(valueColumn->hasNulls);
  EXPECT_EQ(0, mapColumn->offsets[0]);
  EXPECT_EQ(1, mapColumn->offsets[1]);
  EXPECT_EQ(0, keyColumn->data[0]);
  EXPECT_EQ(8, valueColumn->data[0]);

  // jump over 13 rows, then read one more single-entry map
  columnReader->skip(13);
  columnReader->next(rows, 1, 0);
  ASSERT_EQ(1, rows.numElements);
  ASSERT_FALSE(rows.hasNulls);
  ASSERT_EQ(1, mapColumn->numElements);
  ASSERT_FALSE(mapColumn->hasNulls);
  ASSERT_EQ(1, keyColumn->numElements);
  ASSERT_FALSE(keyColumn->hasNulls);
  ASSERT_EQ(1, valueColumn->numElements);
  ASSERT_FALSE(valueColumn->hasNulls);
  EXPECT_EQ(0, mapColumn->offsets[0]);
  EXPECT_EQ(1, mapColumn->offsets[1]);
  EXPECT_EQ(7, keyColumn->data[0]);
  EXPECT_EQ(7 + 8, valueColumn->data[0]);

  // jump over 2031 rows, then read the final two rows: a 19-entry map
  // followed by a null
  columnReader->skip(2031);
  columnReader->next(rows, 2, 0);
  ASSERT_EQ(2, rows.numElements);
  ASSERT_FALSE(rows.hasNulls);
  ASSERT_EQ(2, mapColumn->numElements);
  ASSERT_TRUE(mapColumn->hasNulls);
  ASSERT_EQ(19, keyColumn->numElements);
  ASSERT_FALSE(keyColumn->hasNulls);
  ASSERT_EQ(19, valueColumn->numElements);
  ASSERT_FALSE(valueColumn->hasNulls);
  EXPECT_EQ(0, mapColumn->offsets[0]);
  EXPECT_EQ(19, mapColumn->offsets[1]);
  EXPECT_EQ(19, mapColumn->offsets[2]);
  for (size_t entry = 0; entry < keyColumn->numElements; ++entry) {
    EXPECT_EQ(2029 + entry, keyColumn->data[entry]);
    EXPECT_EQ(2029 + 8 + entry, valueColumn->data[entry]);
  }
}
// Same next()/skip() sequence as the map skip test, but with the key and
// value columns deselected: only the map's offsets and null flags are checked
// and no DATA stream expectations are registered.
TEST(TestColumnReader, testMapSkipWithNullsNoData) {
  MockStripeStreams stripe;

  // columns 0 and 1 are selected, columns 2 and 3 are not
  std::vector<bool> selection(4, false);
  selection[0] = true;
  selection[1] = true;
  EXPECT_CALL(stripe, getSelectedColumns())
      .WillRepeatedly(testing::Return(selection));

  // every column reports DIRECT encoding
  proto::ColumnEncoding direct;
  direct.set_kind(proto::ColumnEncoding_Kind_DIRECT);
  EXPECT_CALL(stripe, getEncoding(testing::_))
      .WillRepeatedly(testing::Return(direct));

  // default: no PRESENT stream; column 1 gets its own expectation below
  EXPECT_CALL(stripe, getStreamProxy(testing::_, proto::Stream_Kind_PRESENT,
                                     true))
      .WillRepeatedly(testing::Return(nullptr));

  // PRESENT stream of column 1: 2048/8 bytes of 0xaa
  const unsigned char mapPresentBytes[] = { 0x7f, 0xaa, 0x7b, 0xaa };
  EXPECT_CALL(stripe, getStreamProxy(1, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(mapPresentBytes,
                                       ARRAY_SIZE(mapPresentBytes))));

  // LENGTH stream of column 1:
  //   260 x 1, then 260 x 4, then 260 x 0, then 243 x 3, then a single 19
  const unsigned char lengthBytes[] = { 0x7f, 0x00, 0x01,
                                        0x7f, 0x00, 0x01,
                                        0x7f, 0x00, 0x04,
                                        0x7f, 0x00, 0x04,
                                        0x7f, 0x00, 0x00,
                                        0x7f, 0x00, 0x00,
                                        0x7f, 0x00, 0x03,
                                        0x6e, 0x00, 0x03,
                                        0xff, 0x13 };
  EXPECT_CALL(stripe, getStreamProxy(1, proto::Stream_Kind_LENGTH, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(lengthBytes,
                                       ARRAY_SIZE(lengthBytes))));

  // schema: struct<col0:map<bigint,bigint>>
  std::unique_ptr<Type> schema = createStructType();
  schema->addStructField("col0", createMapType(createPrimitiveType(LONG),
                                               createPrimitiveType(LONG)));

  std::unique_ptr<ColumnReader> columnReader = buildReader(*schema, stripe);

  // only the map batch is attached; no key or value batches are created
  StructVectorBatch rows(1, *getDefaultPool());
  MapVectorBatch *mapColumn = new MapVectorBatch(1, *getDefaultPool());
  rows.fields.push_back(mapColumn);

  // first row
  columnReader->next(rows, 1, 0);
  ASSERT_EQ(1, rows.numElements);
  ASSERT_FALSE(rows.hasNulls);
  ASSERT_EQ(1, mapColumn->numElements);
  ASSERT_FALSE(mapColumn->hasNulls);
  EXPECT_EQ(0, mapColumn->offsets[0]);
  EXPECT_EQ(1, mapColumn->offsets[1]);

  // jump over 13 rows and read one more
  columnReader->skip(13);
  columnReader->next(rows, 1, 0);
  ASSERT_EQ(1, rows.numElements);
  ASSERT_FALSE(rows.hasNulls);
  ASSERT_EQ(1, mapColumn->numElements);
  ASSERT_FALSE(mapColumn->hasNulls);
  EXPECT_EQ(0, mapColumn->offsets[0]);
  EXPECT_EQ(1, mapColumn->offsets[1]);

  // jump over 2031 rows and read the final two
  columnReader->skip(2031);
  columnReader->next(rows, 2, 0);
  ASSERT_EQ(2, rows.numElements);
  ASSERT_FALSE(rows.hasNulls);
  ASSERT_EQ(2, mapColumn->numElements);
  ASSERT_TRUE(mapColumn->hasNulls);
  EXPECT_EQ(0, mapColumn->offsets[0]);
  EXPECT_EQ(19, mapColumn->offsets[1]);
  EXPECT_EQ(19, mapColumn->offsets[2]);
}
// Reads 32 rows of a FLOAT column - 13 values followed by 19 nulls - into a
// DoubleVectorBatch and compares each value with its float counterpart
// widened to double. The NaN entry is checked with std::isnan.
TEST(TestColumnReader, testFloatWithNulls) {
  MockStripeStreams stripe;

  // both column ids are selected
  const std::vector<bool> selection(2, true);
  EXPECT_CALL(stripe, getSelectedColumns())
      .WillRepeatedly(testing::Return(selection));

  // every column reports DIRECT encoding
  proto::ColumnEncoding direct;
  direct.set_kind(proto::ColumnEncoding_Kind_DIRECT);
  EXPECT_CALL(stripe, getEncoding(testing::_))
      .WillRepeatedly(testing::Return(direct));

  // column 0 has no PRESENT stream
  EXPECT_CALL(stripe, getStreamProxy(0, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(testing::Return(nullptr));

  // PRESENT stream of column 1: 13 non-nulls followed by 19 nulls
  const unsigned char presentBytes[] = { 0xfc, 0xff, 0xf8, 0x00, 0x00 };
  EXPECT_CALL(stripe, getStreamProxy(1, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(presentBytes,
                                       ARRAY_SIZE(presentBytes))));

  // values expected back, in row order; index 7 is the NaN
  const float expected[] = { 1.0f, 2.5f, -100.125f, 10000.0f, 1.234567E23f,
                             -2.3456E-12f,
                             std::numeric_limits<float>::infinity(),
                             std::numeric_limits<float>::quiet_NaN(),
                             -std::numeric_limits<float>::infinity(),
                             std::numeric_limits<float>::max(),
                             -std::numeric_limits<float>::max(),
                             1.4e-45f, -1.4e-45f };

  // DATA stream of column 1: four bytes per value, one value per line
  const unsigned char dataBytes[] = { 0x00, 0x00, 0x80, 0x3f,
                                      0x00, 0x00, 0x20, 0x40,
                                      0x00, 0x40, 0xc8, 0xc2,
                                      0x00, 0x40, 0x1c, 0x46,
                                      0xcf, 0x24, 0xd1, 0x65,
                                      0x93, 0x0e, 0x25, 0xac,
                                      0x00, 0x00, 0x80, 0x7f,
                                      0x00, 0x00, 0xc0, 0x7f,
                                      0x00, 0x00, 0x80, 0xff,
                                      0xff, 0xff, 0x7f, 0x7f,
                                      0xff, 0xff, 0x7f, 0xff,
                                      0x01, 0x00, 0x00, 0x00,
                                      0x01, 0x00, 0x00, 0x80 };
  EXPECT_CALL(stripe, getStreamProxy(1, proto::Stream_Kind_DATA, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(dataBytes, ARRAY_SIZE(dataBytes))));

  // schema: struct<myFloat:float>
  std::unique_ptr<Type> schema = createStructType();
  schema->addStructField("myFloat", createPrimitiveType(FLOAT));

  std::unique_ptr<ColumnReader> columnReader = buildReader(*schema, stripe);

  DoubleVectorBatch *floats = new DoubleVectorBatch(1024, *getDefaultPool());
  StructVectorBatch rows(1024, *getDefaultPool());
  rows.fields.push_back(floats);

  columnReader->next(rows, 32, 0);
  ASSERT_EQ(32, rows.numElements);
  ASSERT_FALSE(rows.hasNulls);
  ASSERT_EQ(32, floats->numElements);
  ASSERT_TRUE(floats->hasNulls);

  for (size_t row = 0; row < rows.numElements; ++row) {
    if (row >= 13) {
      // trailing rows are null
      EXPECT_EQ(0, floats->notNull[row]);
      continue;
    }
    EXPECT_EQ(1, floats->notNull[row]);
    if (row == 7) {
      // NaN never compares equal to anything, so test for it explicitly
      EXPECT_TRUE(std::isnan(floats->data[row]));
    } else {
      EXPECT_DOUBLE_EQ(static_cast<double>(expected[row]),
                       floats->data[row]);
    }
  }
}
// Reads three rows of a FLOAT column, skips one, then reads four more. The
// null pattern repeats as two values followed by two nulls, so each read
// begins with two values and the skipped row is a null.
TEST(TestColumnReader, testFloatSkipWithNulls) {
  MockStripeStreams stripe;

  // both column ids are selected
  const std::vector<bool> selection(2, true);
  EXPECT_CALL(stripe, getSelectedColumns())
      .WillRepeatedly(testing::Return(selection));

  // every column reports DIRECT encoding
  proto::ColumnEncoding direct;
  direct.set_kind(proto::ColumnEncoding_Kind_DIRECT);
  EXPECT_CALL(stripe, getEncoding(testing::_))
      .WillRepeatedly(testing::Return(direct));

  // column 0 has no PRESENT stream
  EXPECT_CALL(stripe, getStreamProxy(0, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(testing::Return(nullptr));

  // PRESENT stream of column 1: 2 non-nulls, 2 nulls, 2 non-nulls, 2 nulls
  const unsigned char presentBytes[] = { 0xff, 0xcc };
  EXPECT_CALL(stripe, getStreamProxy(1, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(presentBytes,
                                       ARRAY_SIZE(presentBytes))));

  // DATA stream of column 1: 1, 2.5, -100.125, 10000
  const unsigned char dataBytes[] = { 0x00, 0x00, 0x80, 0x3f,
                                      0x00, 0x00, 0x20, 0x40,
                                      0x00, 0x40, 0xc8, 0xc2,
                                      0x00, 0x40, 0x1c, 0x46 };
  EXPECT_CALL(stripe, getStreamProxy(1, proto::Stream_Kind_DATA, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(dataBytes, ARRAY_SIZE(dataBytes))));

  // schema: struct<myFloat:float>
  std::unique_ptr<Type> schema = createStructType();
  schema->addStructField("myFloat", createPrimitiveType(FLOAT));

  std::unique_ptr<ColumnReader> columnReader = buildReader(*schema, stripe);

  DoubleVectorBatch *floats = new DoubleVectorBatch(1024, *getDefaultPool());
  StructVectorBatch rows(1024, *getDefaultPool());
  rows.fields.push_back(floats);

  // non-null values in stream order; `cursor` advances across both reads
  const float expected[] = { 1.0f, 2.5f, -100.125f, 10000.0f };
  int cursor = 0;

  columnReader->next(rows, 3, 0);
  ASSERT_EQ(3, rows.numElements);
  ASSERT_FALSE(rows.hasNulls);
  ASSERT_EQ(3, floats->numElements);
  ASSERT_TRUE(floats->hasNulls);
  for (size_t row = 0; row < rows.numElements; ++row) {
    if (row <= 1) {
      EXPECT_EQ(1, floats->notNull[row]);
      EXPECT_DOUBLE_EQ(static_cast<double>(expected[cursor]),
                       floats->data[row]);
      ++cursor;
    } else {
      EXPECT_EQ(0, floats->notNull[row]);
    }
  }

  columnReader->skip(1);
  columnReader->next(rows, 4, 0);
  ASSERT_EQ(4, rows.numElements);
  ASSERT_FALSE(rows.hasNulls);
  ASSERT_EQ(4, floats->numElements);
  ASSERT_TRUE(floats->hasNulls);
  for (size_t row = 0; row < rows.numElements; ++row) {
    if (row <= 1) {
      EXPECT_EQ(1, floats->notNull[row]);
      EXPECT_DOUBLE_EQ(static_cast<double>(expected[cursor]),
                       floats->data[row]);
      ++cursor;
    } else {
      EXPECT_EQ(0, floats->notNull[row]);
    }
  }
}
// Reads 32 rows of a DOUBLE column - 13 values followed by 19 nulls - and
// compares each value with the expected table. The NaN entry is checked with
// std::isnan.
TEST(TestColumnReader, testDoubleWithNulls) {
  MockStripeStreams stripe;

  // both column ids are selected
  const std::vector<bool> selection(2, true);
  EXPECT_CALL(stripe, getSelectedColumns())
      .WillRepeatedly(testing::Return(selection));

  // every column reports DIRECT encoding
  proto::ColumnEncoding direct;
  direct.set_kind(proto::ColumnEncoding_Kind_DIRECT);
  EXPECT_CALL(stripe, getEncoding(testing::_))
      .WillRepeatedly(testing::Return(direct));

  // column 0 has no PRESENT stream
  EXPECT_CALL(stripe, getStreamProxy(0, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(testing::Return(nullptr));

  // PRESENT stream of column 1: 13 non-nulls followed by 19 nulls
  const unsigned char presentBytes[] = { 0xfc, 0xff, 0xf8, 0x00, 0x00 };
  EXPECT_CALL(stripe, getStreamProxy(1, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(presentBytes,
                                       ARRAY_SIZE(presentBytes))));

  // values expected back, in row order; index 7 is the NaN
  const double expected[] = { 1.0, 2.0, -2.0, 100.0, 1.23456789E32,
                              -3.42234E-18,
                              std::numeric_limits<double>::infinity(),
                              std::numeric_limits<double>::quiet_NaN(),
                              -std::numeric_limits<double>::infinity(),
                              1.7976931348623157e308,
                              -1.7976931348623157E308,
                              4.9e-324, -4.9e-324 };

  // DATA stream of column 1: eight bytes per value, one value per line
  const unsigned char dataBytes[] = {
      0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xf0, 0x3f,
      0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x40,
      0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xc0,
      0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x59, 0x40,
      0xe8, 0x38, 0x65, 0x99, 0xf9, 0x58, 0x98, 0x46,
      0xa1, 0x88, 0x41, 0x98, 0xc5, 0x90, 0x4f, 0xbc,
      0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xf0, 0x7f,
      0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xf8, 0x7f,
      0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xf0, 0xff,
      0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xef, 0x7f,
      0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xef, 0xff,
      0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
      0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x80 };
  EXPECT_CALL(stripe, getStreamProxy(1, proto::Stream_Kind_DATA, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(dataBytes, ARRAY_SIZE(dataBytes))));

  // schema: struct<myDouble:double>
  std::unique_ptr<Type> schema = createStructType();
  schema->addStructField("myDouble", createPrimitiveType(DOUBLE));

  std::unique_ptr<ColumnReader> columnReader = buildReader(*schema, stripe);

  DoubleVectorBatch *doubles = new DoubleVectorBatch(1024, *getDefaultPool());
  StructVectorBatch rows(1024, *getDefaultPool());
  rows.fields.push_back(doubles);

  columnReader->next(rows, 32, 0);
  ASSERT_EQ(32, rows.numElements);
  ASSERT_FALSE(rows.hasNulls);
  ASSERT_EQ(32, doubles->numElements);
  ASSERT_TRUE(doubles->hasNulls);

  for (size_t row = 0; row < rows.numElements; ++row) {
    if (row >= 13) {
      // trailing rows are null
      EXPECT_EQ(0, doubles->notNull[row]) << "Wrong value at " << row;
      continue;
    }
    EXPECT_EQ(1, doubles->notNull[row]) << "Wrong value at " << row;
    if (row == 7) {
      // NaN never compares equal to anything, so test for it explicitly
      EXPECT_TRUE(std::isnan(doubles->data[row]));
    } else {
      EXPECT_DOUBLE_EQ(expected[row], doubles->data[row])
          << "Wrong value at " << row;
    }
  }
}
// Reads two rows of a DOUBLE column, skips three, then reads three more. The
// null pattern is one value, five nulls, two values, so the first read ends
// on a null and the second read starts on one.
TEST(TestColumnReader, testDoubleSkipWithNulls) {
  MockStripeStreams stripe;

  // both column ids are selected
  const std::vector<bool> selection(2, true);
  EXPECT_CALL(stripe, getSelectedColumns())
      .WillRepeatedly(testing::Return(selection));

  // every column reports DIRECT encoding
  proto::ColumnEncoding direct;
  direct.set_kind(proto::ColumnEncoding_Kind_DIRECT);
  EXPECT_CALL(stripe, getEncoding(testing::_))
      .WillRepeatedly(testing::Return(direct));

  // column 0 has no PRESENT stream
  EXPECT_CALL(stripe, getStreamProxy(0, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(testing::Return(nullptr));

  // PRESENT stream of column 1: 1 non-null, 5 nulls, 2 non-nulls
  const unsigned char presentBytes[] = { 0xff, 0x83 };
  EXPECT_CALL(stripe, getStreamProxy(1, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(presentBytes,
                                       ARRAY_SIZE(presentBytes))));

  // DATA stream of column 1: 1, 2, -2 (eight bytes each, one value per line)
  const unsigned char dataBytes[] = {
      0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xf0, 0x3f,
      0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x40,
      0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xc0 };
  EXPECT_CALL(stripe, getStreamProxy(1, proto::Stream_Kind_DATA, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(dataBytes, ARRAY_SIZE(dataBytes))));

  // schema: struct<myDouble:double>
  std::unique_ptr<Type> schema = createStructType();
  schema->addStructField("myDouble", createPrimitiveType(DOUBLE));

  std::unique_ptr<ColumnReader> columnReader = buildReader(*schema, stripe);

  DoubleVectorBatch *doubles = new DoubleVectorBatch(1024, *getDefaultPool());
  StructVectorBatch rows(1024, *getDefaultPool());
  rows.fields.push_back(doubles);

  // non-null values in stream order; `cursor` advances across both reads
  const double expected[] = { 1.0, 2.0, -2.0 };
  int cursor = 0;

  columnReader->next(rows, 2, 0);
  ASSERT_EQ(2, rows.numElements);
  ASSERT_FALSE(rows.hasNulls);
  ASSERT_EQ(2, doubles->numElements);
  ASSERT_TRUE(doubles->hasNulls);
  for (size_t row = 0; row < rows.numElements; ++row) {
    if (row == 0) {
      EXPECT_EQ(1, doubles->notNull[row]);
      EXPECT_DOUBLE_EQ(expected[cursor], doubles->data[row]);
      ++cursor;
    } else {
      EXPECT_EQ(0, doubles->notNull[row]);
    }
  }

  columnReader->skip(3);
  columnReader->next(rows, 3, 0);
  ASSERT_EQ(3, rows.numElements);
  ASSERT_FALSE(rows.hasNulls);
  ASSERT_EQ(3, doubles->numElements);
  ASSERT_TRUE(doubles->hasNulls);
  for (size_t row = 0; row < rows.numElements; ++row) {
    if (row >= 1) {
      EXPECT_EQ(1, doubles->notNull[row]);
      EXPECT_DOUBLE_EQ(expected[cursor], doubles->data[row]);
      ++cursor;
    } else {
      EXPECT_EQ(0, doubles->notNull[row]);
    }
  }
}
// Reads three rows of a TIMESTAMP column, skips one, then reads four more.
// The null pattern repeats as two values followed by two nulls. Each value is
// rendered with gmtime_r/asctime_r and compared against a textual expectation
// plus an expected nanosecond count.
TEST(TestColumnReader, testTimestampSkipWithNulls) {
  MockStripeStreams stripe;

  // both column ids are selected
  const std::vector<bool> selection(2, true);
  EXPECT_CALL(stripe, getSelectedColumns())
      .WillRepeatedly(testing::Return(selection));

  // every column reports DIRECT encoding
  proto::ColumnEncoding direct;
  direct.set_kind(proto::ColumnEncoding_Kind_DIRECT);
  EXPECT_CALL(stripe, getEncoding(testing::_))
      .WillRepeatedly(testing::Return(direct));

  // column 0 has no PRESENT stream
  EXPECT_CALL(stripe, getStreamProxy(0, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(testing::Return(nullptr));

  // PRESENT stream of column 1: 2 non-nulls, 2 nulls, 2 non-nulls, 2 nulls
  const unsigned char presentBytes[] = { 0xff, 0xcc };
  EXPECT_CALL(stripe, getStreamProxy(1, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(presentBytes,
                                       ARRAY_SIZE(presentBytes))));

  // DATA stream of column 1
  const unsigned char dataBytes[] = { 0xfc, 0xbb, 0xb5, 0xbe, 0x31, 0xa1,
                                      0xee, 0xe2, 0x10, 0xf8, 0x92, 0xee,
                                      0x0f, 0x92, 0xa0, 0xd4, 0x30 };
  EXPECT_CALL(stripe, getStreamProxy(1, proto::Stream_Kind_DATA, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(dataBytes, ARRAY_SIZE(dataBytes))));

  // SECONDARY stream of column 1
  const unsigned char secondaryBytes[] = { 0x01, 0x08, 0x5e };
  EXPECT_CALL(stripe, getStreamProxy(1, proto::Stream_Kind_SECONDARY, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(secondaryBytes,
                                       ARRAY_SIZE(secondaryBytes))));

  // schema: struct<myTimestamp:timestamp>
  std::unique_ptr<Type> schema = createStructType();
  schema->addStructField("myTimestamp", createPrimitiveType(TIMESTAMP));

  std::unique_ptr<ColumnReader> columnReader = buildReader(*schema, stripe);

  TimestampVectorBatch *stamps =
      new TimestampVectorBatch(1024, *getDefaultPool());
  StructVectorBatch rows(1024, *getDefaultPool());
  rows.fields.push_back(stamps);

  // expectations for the four non-null rows, in stream order; `cursor`
  // advances across both reads
  const char *expectedText[] = { "Fri May 10 10:40:50 2013\n",
                                 "Wed Jun 11 11:41:51 2014\n",
                                 "Sun Jul 12 12:42:52 2015\n",
                                 "Sat Aug 13 13:43:53 2016\n" };
  const int64_t expectedNanos[] = { 110000000,
                                    120000000,
                                    130000000,
                                    140000000 };
  int cursor = 0;

  columnReader->next(rows, 3, 0);
  ASSERT_EQ(3, rows.numElements);
  ASSERT_FALSE(rows.hasNulls);
  ASSERT_EQ(3, stamps->numElements);
  ASSERT_TRUE(stamps->hasNulls);
  for (size_t row = 0; row < rows.numElements; ++row) {
    if (row <= 1) {
      EXPECT_EQ(1, stamps->notNull[row]);
      time_t seconds = static_cast<time_t>(stamps->data[row]);
      tm brokenDown;
      ASSERT_PRED1(isNotNull, gmtime_r(&seconds, &brokenDown));
      char text[30];
      asctime_r(&brokenDown, text);
      EXPECT_STREQ(expectedText[cursor], text);
      EXPECT_EQ(expectedNanos[cursor], stamps->nanoseconds[row]);
      ++cursor;
    } else {
      EXPECT_EQ(0, stamps->notNull[row]);
    }
  }

  columnReader->skip(1);
  columnReader->next(rows, 4, 0);
  ASSERT_EQ(4, rows.numElements);
  ASSERT_FALSE(rows.hasNulls);
  ASSERT_EQ(4, stamps->numElements);
  ASSERT_TRUE(stamps->hasNulls);
  for (size_t row = 0; row < rows.numElements; ++row) {
    if (row <= 1) {
      EXPECT_EQ(1, stamps->notNull[row]);
      time_t seconds = static_cast<time_t>(stamps->data[row]);
      tm brokenDown;
      ASSERT_PRED1(isNotNull, gmtime_r(&seconds, &brokenDown));
      char text[30];
      asctime_r(&brokenDown, text);
      EXPECT_STREQ(expectedText[cursor], text);
      EXPECT_EQ(expectedNanos[cursor], stamps->nanoseconds[row]);
      ++cursor;
    } else {
      EXPECT_EQ(0, stamps->notNull[row]);
    }
  }
}
// Reads ten non-null rows of a TIMESTAMP column and checks, for each row, the
// nanosecond field and the calendar text produced by gmtime_r/asctime_r.
//
// When HAS_PRE_1970 is not defined, rows whose time_t value is negative have
// only their nanoseconds checked; the textual comparison is skipped.
TEST(TestColumnReader, testTimestamp) {
  MockStripeStreams streams;

  // set getSelectedColumns(): both column ids are selected
  std::vector<bool> selectedColumns(2, true);
  EXPECT_CALL(streams, getSelectedColumns())
      .WillRepeatedly(testing::Return(selectedColumns));

  // set getEncoding: every column reports DIRECT encoding
  proto::ColumnEncoding directEncoding;
  directEncoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
  EXPECT_CALL(streams, getEncoding(testing::_))
      .WillRepeatedly(testing::Return(directEncoding));

  // set getStream: no column has a PRESENT stream
  EXPECT_CALL(streams, getStreamProxy(testing::_,
                                      proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(testing::Return(nullptr));

  // DATA stream of column 1
  const unsigned char buffer1[] = { 0xf6,
                                    0x9f, 0xf4, 0xc6, 0xbd, 0x03,
                                    0xff, 0xec, 0xf3, 0xbc, 0x03,
                                    0xff, 0xb1, 0xf8, 0x84, 0x1b,
                                    0x9d, 0x86, 0xd7, 0xfa, 0x1a,
                                    0x9d, 0xb8, 0xcd, 0xdc, 0x1a,
                                    0x9d, 0xea, 0xc3, 0xbe, 0x1a,
                                    0x9d, 0x9c, 0xba, 0xa0, 0x1a,
                                    0x9d, 0x88, 0xa6, 0x82, 0x1a,
                                    0x9d, 0xba, 0x9c, 0xe4, 0x19,
                                    0x9d, 0xee, 0xe1, 0xcd, 0x18 };
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_DATA, true))
      .WillRepeatedly(testing::Return(new SeekableArrayInputStream
                                      (buffer1, ARRAY_SIZE(buffer1))));

  // SECONDARY stream of column 1
  const unsigned char buffer2[] = { 0xf6,
                                    0x00,
                                    0xa8, 0xd1, 0xf9, 0xd6, 0x03,
                                    0x00,
                                    0x9e, 0x01,
                                    0xec, 0x76,
                                    0xf4, 0x76,
                                    0xfc, 0x76,
                                    0x84, 0x77,
                                    0x8c, 0x77,
                                    0xfd, 0x0b };
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_SECONDARY, true))
      .WillRepeatedly(testing::Return(new SeekableArrayInputStream
                                      (buffer2, ARRAY_SIZE(buffer2))));

  // create the row type
  std::unique_ptr<Type> rowType = createStructType();
  rowType->addStructField("myTimestamp", createPrimitiveType(TIMESTAMP));

  std::unique_ptr<ColumnReader> reader =
      buildReader(*rowType, streams);

  TimestampVectorBatch *longBatch =
      new TimestampVectorBatch(1024, *getDefaultPool());
  StructVectorBatch batch(1024, *getDefaultPool());
  batch.fields.push_back(longBatch);

  // asctime_r right-aligns the day of the month in a three-character field,
  // so single-digit days are preceded by TWO spaces ("Jan  1", "May  5").
  const char *(expected[]) = {"Sun Mar 12 15:00:00 2000\n",
                              "Mon Mar 20 12:00:00 2000\n",
                              "Mon Jan  1 00:00:00 1900\n",
                              "Sat May  5 12:34:56 1900\n",
                              "Sun May  5 12:34:56 1901\n",
                              "Mon May  5 12:34:56 1902\n",
                              "Tue May  5 12:34:56 1903\n",
                              "Thu May  5 12:34:56 1904\n",
                              "Fri May  5 12:34:56 1905\n",
                              "Thu May  5 12:34:56 1910\n"};
  const int64_t expectedNano[] = {0,
                                  123456789,
                                  0,
                                  190000000,
                                  190100000,
                                  190200000,
                                  190300000,
                                  190400000,
                                  190500000,
                                  191000000};

  reader->next(batch, 10, 0);
  ASSERT_EQ(10, batch.numElements);
  ASSERT_EQ(true, !batch.hasNulls);
  ASSERT_EQ(10, longBatch->numElements);
  ASSERT_EQ(true, !longBatch->hasNulls);
  for (size_t i = 0; i < batch.numElements; ++i) {
    time_t time = static_cast<time_t>(longBatch->data[i]);
    // nanoseconds are checked for every row, including pre-1970 ones
    EXPECT_EQ(expectedNano[i], longBatch->nanoseconds[i]);
#ifndef HAS_PRE_1970
    // without HAS_PRE_1970 the text check is skipped for negative time_t
    if (time < 0) continue;
#endif
    tm timeStruct;
    ASSERT_PRED1(isNotNull, gmtime_r(&time, &timeStruct));
    char buffer[30];
    asctime_r(&timeStruct, buffer);
    EXPECT_STREQ(expected[i], buffer) << "Wrong value at " << i;
  }
}
// Reads a DECIMAL(12,2) column into a Decimal64VectorBatch in two batches of
// 64 rows. The first batch has no nulls and holds -32..31; the second batch is
// null everywhere except its last row, which holds 32.
TEST(DecimalColumnReader, testDecimal64) {
  MockStripeStreams streams;

  // Select the root struct and its single child.
  std::vector<bool> columnMask(2, true);
  EXPECT_CALL(streams, getSelectedColumns())
      .WillRepeatedly(testing::Return(columnMask));

  // Every column reports DIRECT encoding.
  proto::ColumnEncoding encoding;
  encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
  EXPECT_CALL(streams, getEncoding(testing::_))
      .WillRepeatedly(testing::Return(encoding));

  // Default: no PRESENT stream. Column 1 is overridden right below.
  EXPECT_CALL(streams,
              getStreamProxy(testing::_, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(testing::Return(nullptr));

  // PRESENT for column 1: [0xff] * (64/8) + [0x00] * (56/8) + [0x01]
  const unsigned char presentBytes[] = {0x05, 0xff, 0x04, 0x00, 0xff, 0x01};
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(presentBytes,
                                       ARRAY_SIZE(presentBytes))));

  // DATA: 65 one-byte values; the assertions below expect them to decode to
  // -32, -31, ..., 32.
  char numBuffer[65];
  for (int pos = 0; pos < 65; ++pos) {
    const int encoded = pos < 32 ? 0x3f - 2 * pos : 2 * (pos - 32);
    numBuffer[pos] = static_cast<char>(encoded);
  }
  // NOTE(review): the trailing 3 is presumably a read block size -- confirm
  // against SeekableArrayInputStream.
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_DATA, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(numBuffer, ARRAY_SIZE(numBuffer), 3)));

  // SECONDARY (scale): [0x02] * 65
  const unsigned char scaleBytes[] = {0x3e, 0x00, 0x04};
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_SECONDARY, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(scaleBytes, ARRAY_SIZE(scaleBytes))));

  // Row type: struct<col0:decimal(12,2)>
  std::unique_ptr<Type> rowType = createStructType();
  rowType->addStructField("col0", createDecimalType(12, 2));
  std::unique_ptr<ColumnReader> reader = buildReader(*rowType, streams);

  StructVectorBatch batch(64, *getDefaultPool());
  Decimal64VectorBatch *decimals =
      new Decimal64VectorBatch(64, *getDefaultPool());
  batch.fields.push_back(decimals);

  // First batch: 64 non-null values.
  reader->next(batch, 64, 0);
  EXPECT_FALSE(batch.hasNulls);
  EXPECT_EQ(64, batch.numElements);
  EXPECT_FALSE(decimals->hasNulls);
  EXPECT_EQ(64, decimals->numElements);
  EXPECT_EQ(2, decimals->scale);
  int64_t *decoded = decimals->values.data();
  for (int64_t row = 0; row < 64; ++row) {
    EXPECT_EQ(row - 32, decoded[row]);
  }

  // Second batch: 63 nulls followed by a single value.
  reader->next(batch, 64, 0);
  EXPECT_FALSE(batch.hasNulls);
  EXPECT_EQ(64, batch.numElements);
  EXPECT_TRUE(decimals->hasNulls);
  EXPECT_EQ(64, decimals->numElements);
  for (size_t row = 0; row < 63; ++row) {
    EXPECT_EQ(0, decimals->notNull[row]);
  }
  EXPECT_EQ(1, decimals->notNull[63]);
  EXPECT_EQ(32, decimals->values.data()[63]);
}
// Reads six DECIMAL(12,10) values, skips two rows, then reads the ninth value
// to check that skip() advances the decimal reader correctly.
TEST(DecimalColumnReader, testDecimal64Skip) {
  MockStripeStreams streams;

  // Select the root struct and its single child.
  std::vector<bool> columnMask(2, true);
  EXPECT_CALL(streams, getSelectedColumns())
      .WillRepeatedly(testing::Return(columnMask));

  // Every column reports DIRECT encoding.
  proto::ColumnEncoding encoding;
  encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
  EXPECT_CALL(streams, getEncoding(testing::_))
      .WillRepeatedly(testing::Return(encoding));

  // Default: no PRESENT stream. Column 1 is overridden right below.
  EXPECT_CALL(streams,
              getStreamProxy(testing::_, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(testing::Return(nullptr));

  // PRESENT for column 1: [0xff, 0x80], i.e. nine rows marked present
  unsigned char presentBuffer[] = {0xfe, 0xff, 0x80};
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(presentBuffer,
                                       ARRAY_SIZE(presentBuffer))));

  // DATA, one value per row of the initializer:
  // [493827160549382716, 4938271605493827, 49382716054938, 493827160549,
  //  4938271605, 49382716, 493827, 4938, 49]
  const unsigned char numBuffer[] =
    { 0xf8, 0xe8, 0xe2, 0xcf, 0xf4, 0xcb, 0xb6, 0xda, 0x0d,
      0x86, 0xc1, 0xcc, 0xcd, 0x9e, 0xd5, 0xc5, 0x11,
      0xb4, 0xf6, 0xfc, 0xf3, 0xb9, 0xba, 0x16,
      0xca, 0xe7, 0xa3, 0xa6, 0xdf, 0x1c,
      0xea, 0xad, 0xc0, 0xe5, 0x24,
      0xf8, 0x94, 0x8c, 0x2f,
      0x86, 0xa4, 0x3c,
      0x94, 0x4d,
      0x62 };
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_DATA, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(numBuffer, ARRAY_SIZE(numBuffer))));

  // SECONDARY (scale): [0x0a] * 9
  const unsigned char scaleBytes[] = {0x06, 0x00, 0x14};
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_SECONDARY, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(scaleBytes, ARRAY_SIZE(scaleBytes))));

  // Row type: struct<col0:decimal(12,10)>
  std::unique_ptr<Type> rowType = createStructType();
  rowType->addStructField("col0", createDecimalType(12, 10));
  std::unique_ptr<ColumnReader> reader = buildReader(*rowType, streams);

  StructVectorBatch batch(64, *getDefaultPool());
  Decimal64VectorBatch *decimals =
      new Decimal64VectorBatch(64, *getDefaultPool());
  batch.fields.push_back(decimals);

  // Rows 0..5.
  reader->next(batch, 6, 0);
  EXPECT_FALSE(batch.hasNulls);
  EXPECT_EQ(6, batch.numElements);
  EXPECT_FALSE(decimals->hasNulls);
  EXPECT_EQ(6, decimals->numElements);
  EXPECT_EQ(10, decimals->scale);
  int64_t *decoded = decimals->values.data();
  EXPECT_EQ(493827160549382716, decoded[0]);
  EXPECT_EQ(4938271605493827, decoded[1]);
  EXPECT_EQ(49382716054938, decoded[2]);
  EXPECT_EQ(493827160549, decoded[3]);
  EXPECT_EQ(4938271605, decoded[4]);
  EXPECT_EQ(49382716, decoded[5]);

  // Skip rows 6 and 7 (493827 and 4938), then read row 8.
  reader->skip(2);
  reader->next(batch, 1, 0);
  EXPECT_FALSE(batch.hasNulls);
  EXPECT_EQ(1, batch.numElements);
  EXPECT_FALSE(decimals->hasNulls);
  EXPECT_EQ(1, decimals->numElements);
  EXPECT_EQ(49, decoded[0]);
}
// Same data as the Decimal64 test, but declared as DECIMAL(32,2) and read into
// a Decimal128VectorBatch: a full batch of -32..31 followed by a batch that is
// null except for its last row (32).
TEST(DecimalColumnReader, testDecimal128) {
  MockStripeStreams streams;

  // Select the root struct and its single child.
  std::vector<bool> columnMask(2, true);
  EXPECT_CALL(streams, getSelectedColumns())
      .WillRepeatedly(testing::Return(columnMask));

  // Every column reports DIRECT encoding.
  proto::ColumnEncoding encoding;
  encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
  EXPECT_CALL(streams, getEncoding(testing::_))
      .WillRepeatedly(testing::Return(encoding));

  // Default: no PRESENT stream. Column 1 is overridden right below.
  EXPECT_CALL(streams,
              getStreamProxy(testing::_, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(testing::Return(nullptr));

  // PRESENT for column 1: [0xff] * (64/8) + [0x00] * (56/8) + [0x01]
  const unsigned char presentBytes[] = {0x05, 0xff, 0x04, 0x00, 0xff, 0x01};
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(presentBytes,
                                       ARRAY_SIZE(presentBytes))));

  // DATA: 65 one-byte values; the assertions below expect them to decode to
  // -32, -31, ..., 32.
  char numBuffer[65];
  for (int pos = 0; pos < 65; ++pos) {
    const int encoded = pos < 32 ? 0x3f - 2 * pos : 2 * (pos - 32);
    numBuffer[pos] = static_cast<char>(encoded);
  }
  // NOTE(review): the trailing 3 is presumably a read block size -- confirm
  // against SeekableArrayInputStream.
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_DATA, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(numBuffer, ARRAY_SIZE(numBuffer), 3)));

  // SECONDARY (scale): [0x02] * 65
  const unsigned char scaleBytes[] = {0x3e, 0x00, 0x04};
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_SECONDARY, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(scaleBytes, ARRAY_SIZE(scaleBytes))));

  // Row type: struct<col0:decimal(32,2)>
  std::unique_ptr<Type> rowType = createStructType();
  rowType->addStructField("col0", createDecimalType(32, 2));
  std::unique_ptr<ColumnReader> reader = buildReader(*rowType, streams);

  StructVectorBatch batch(64, *getDefaultPool());
  Decimal128VectorBatch *decimals =
      new Decimal128VectorBatch(64, *getDefaultPool());
  batch.fields.push_back(decimals);

  // First batch: 64 non-null values.
  reader->next(batch, 64, 0);
  EXPECT_FALSE(batch.hasNulls);
  EXPECT_EQ(64, batch.numElements);
  EXPECT_FALSE(decimals->hasNulls);
  EXPECT_EQ(64, decimals->numElements);
  EXPECT_EQ(2, decimals->scale);
  Int128 *decoded = decimals->values.data();
  for (int64_t row = 0; row < 64; ++row) {
    EXPECT_EQ(row - 32, decoded[row].toLong());
  }

  // Second batch: 63 nulls followed by a single value.
  reader->next(batch, 64, 0);
  EXPECT_FALSE(batch.hasNulls);
  EXPECT_EQ(64, batch.numElements);
  EXPECT_TRUE(decimals->hasNulls);
  EXPECT_EQ(64, decimals->numElements);
  for (size_t row = 0; row < 63; ++row) {
    EXPECT_EQ(0, decimals->notNull[row]);
  }
  EXPECT_EQ(1, decimals->notNull[63]);
  EXPECT_EQ(32, decimals->values.data()[63].toLong());
}
// Reads six DECIMAL(38,37) values, skips two rows, then reads the remaining
// five, which include values that only fit in 128 bits.
TEST(DecimalColumnReader, testDecimal128Skip) {
  MockStripeStreams streams;

  // Select the root struct and its single child.
  std::vector<bool> columnMask(2, true);
  EXPECT_CALL(streams, getSelectedColumns())
      .WillRepeatedly(testing::Return(columnMask));

  // Every column reports DIRECT encoding.
  proto::ColumnEncoding encoding;
  encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
  EXPECT_CALL(streams, getEncoding(testing::_))
      .WillRepeatedly(testing::Return(encoding));

  // Default: no PRESENT stream. Column 1 is overridden right below.
  EXPECT_CALL(streams,
              getStreamProxy(testing::_, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(testing::Return(nullptr));

  // PRESENT for column 1: [0xff, 0xf8], i.e. thirteen rows marked present
  unsigned char presentBuffer[] = {0xfe, 0xff, 0xf8};
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(presentBuffer,
                                       ARRAY_SIZE(presentBuffer))));

  // DATA:
  // [493827160549382716, 4938271605493827, 49382716054938, 493827160549,
  //  4938271605, 49382716, 493827, 4938, 49,
  //  17320508075688772935274463415058723669,
  //  -17320508075688772935274463415058723669,
  //  99999999999999999999999999999999999999,
  //  -99999999999999999999999999999999999999]
  const unsigned char numBuffer[] =
    { 0xf8, 0xe8, 0xe2, 0xcf, 0xf4, 0xcb, 0xb6, 0xda, 0x0d,
      0x86, 0xc1, 0xcc, 0xcd, 0x9e, 0xd5, 0xc5, 0x11,
      0xb4, 0xf6, 0xfc, 0xf3, 0xb9, 0xba, 0x16,
      0xca, 0xe7, 0xa3, 0xa6, 0xdf, 0x1c,
      0xea, 0xad, 0xc0, 0xe5, 0x24,
      0xf8, 0x94, 0x8c, 0x2f,
      0x86, 0xa4, 0x3c,
      0x94, 0x4d,
      0x62,
      0xaa, 0xcd, 0xb3, 0xf2, 0x9e, 0xf0, 0x99, 0xd6, 0xbe, 0xf8, 0xb6,
      0x9e, 0xe4, 0xb7, 0xfd, 0xce, 0x8f, 0x34,
      0xa9, 0xcd, 0xb3, 0xf2, 0x9e, 0xf0, 0x99, 0xd6, 0xbe, 0xf8, 0xb6,
      0x9e, 0xe4, 0xb7, 0xfd, 0xce, 0x8f, 0x34,
      0xfe, 0xff, 0xff, 0xff, 0xff, 0x8f, 0x91, 0x8a, 0x93, 0xe8, 0xa3,
      0xec, 0xd0, 0x96, 0xd4, 0xcc, 0xf6, 0xac, 0x02,
      0xfd, 0xff, 0xff, 0xff, 0xff, 0x8f, 0x91, 0x8a, 0x93, 0xe8, 0xa3,
      0xec, 0xd0, 0x96, 0xd4, 0xcc, 0xf6, 0xac, 0x02,
    };
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_DATA, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(numBuffer, ARRAY_SIZE(numBuffer))));

  // SECONDARY (scale): [0x25] * 13, i.e. scale 37 for every row
  unsigned char scaleBytes[] = {0x0a, 0x00, 0x4a};
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_SECONDARY, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(scaleBytes, ARRAY_SIZE(scaleBytes))));

  // Row type: struct<col0:decimal(38,37)>
  std::unique_ptr<Type> rowType = createStructType();
  rowType->addStructField("col0", createDecimalType(38, 37));
  std::unique_ptr<ColumnReader> reader = buildReader(*rowType, streams);

  StructVectorBatch batch(64, *getDefaultPool());
  Decimal128VectorBatch *decimals =
      new Decimal128VectorBatch(64, *getDefaultPool());
  batch.fields.push_back(decimals);

  // Rows 0..5.
  reader->next(batch, 6, 0);
  EXPECT_FALSE(batch.hasNulls);
  EXPECT_EQ(6, batch.numElements);
  EXPECT_FALSE(decimals->hasNulls);
  EXPECT_EQ(6, decimals->numElements);
  EXPECT_EQ(37, decimals->scale);
  Int128 *decoded = decimals->values.data();
  EXPECT_EQ(493827160549382716, decoded[0].toLong());
  EXPECT_EQ(4938271605493827, decoded[1].toLong());
  EXPECT_EQ(49382716054938, decoded[2].toLong());
  EXPECT_EQ(493827160549, decoded[3].toLong());
  EXPECT_EQ(4938271605, decoded[4].toLong());
  EXPECT_EQ(49382716, decoded[5].toLong());

  // Skip rows 6 and 7 (493827 and 4938), then read rows 8..12.
  reader->skip(2);
  reader->next(batch, 5, 0);
  EXPECT_FALSE(batch.hasNulls);
  EXPECT_EQ(5, batch.numElements);
  EXPECT_FALSE(decimals->hasNulls);
  EXPECT_EQ(5, decimals->numElements);
  EXPECT_EQ(49, decoded[0].toLong());
  EXPECT_EQ("1.7320508075688772935274463415058723669",
            decoded[1].toDecimalString(decimals->scale));
  EXPECT_EQ("-1.7320508075688772935274463415058723669",
            decoded[2].toDecimalString(decimals->scale));
  EXPECT_EQ("9.9999999999999999999999999999999999999",
            decoded[3].toDecimalString(decimals->scale));
  EXPECT_EQ("-9.9999999999999999999999999999999999999",
            decoded[4].toDecimalString(decimals->scale));
}
// Reads a decimal column declared with precision 0 and scale 0 while the
// stripe forces scale 6 and asks for an exception on overflow. The stored
// scale is already 6, so the values come back unchanged: -32..31 in the first
// batch, then 63 nulls and the value 32.
// NOTE(review): precision 0 presumably selects the Hive 0.11 decimal reader --
// confirm against buildReader.
TEST(DecimalColumnReader, testDecimalHive11) {
  MockStripeStreams streams;

  // Select the root struct and its single child.
  std::vector<bool> columnMask(2, true);
  EXPECT_CALL(streams, getSelectedColumns())
      .WillRepeatedly(testing::Return(columnMask));

  // Hive 0.11 decimal options: throw on overflow, force scale 6.
  EXPECT_CALL(streams, getThrowOnHive11DecimalOverflow())
      .WillRepeatedly(testing::Return(true));
  EXPECT_CALL(streams, getForcedScaleOnHive11Decimal())
      .WillRepeatedly(testing::Return(6));

  // Every column reports DIRECT encoding.
  proto::ColumnEncoding encoding;
  encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
  EXPECT_CALL(streams, getEncoding(testing::_))
      .WillRepeatedly(testing::Return(encoding));

  // Default: no PRESENT stream. Column 1 is overridden right below.
  EXPECT_CALL(streams,
              getStreamProxy(testing::_, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(testing::Return(nullptr));

  // PRESENT for column 1: [0xff] * (64/8) + [0x00] * (56/8) + [0x01]
  const unsigned char presentBytes[] = {0x05, 0xff, 0x04, 0x00, 0xff, 0x01};
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(presentBytes,
                                       ARRAY_SIZE(presentBytes))));

  // DATA: 65 one-byte values; the assertions below expect them to decode to
  // -32, -31, ..., 32.
  char numBuffer[65];
  for (int pos = 0; pos < 65; ++pos) {
    const int encoded = pos < 32 ? 0x3f - 2 * pos : 2 * (pos - 32);
    numBuffer[pos] = static_cast<char>(encoded);
  }
  // NOTE(review): the trailing 3 is presumably a read block size -- confirm
  // against SeekableArrayInputStream.
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_DATA, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(numBuffer, ARRAY_SIZE(numBuffer), 3)));

  // SECONDARY (scale): [0x06] * 65
  const unsigned char scaleBuffer[] = {0x3e, 0x00, 0x0c};
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_SECONDARY, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(scaleBuffer, ARRAY_SIZE(scaleBuffer))));

  // Row type: struct<col0:decimal(0,0)>
  std::unique_ptr<Type> rowType = createStructType();
  rowType->addStructField("col0", createDecimalType(0, 0));
  std::unique_ptr<ColumnReader> reader = buildReader(*rowType, streams);

  StructVectorBatch batch(64, *getDefaultPool());
  Decimal128VectorBatch *decimals =
      new Decimal128VectorBatch(64, *getDefaultPool());
  batch.fields.push_back(decimals);

  // First batch: 64 non-null values at the forced scale.
  reader->next(batch, 64, 0);
  EXPECT_FALSE(batch.hasNulls);
  EXPECT_EQ(64, batch.numElements);
  EXPECT_FALSE(decimals->hasNulls);
  EXPECT_EQ(64, decimals->numElements);
  EXPECT_EQ(6, decimals->scale);
  Int128 *decoded = decimals->values.data();
  for (int64_t row = 0; row < 64; ++row) {
    EXPECT_EQ(row - 32, decoded[row].toLong());
  }

  // Second batch: 63 nulls followed by a single value.
  reader->next(batch, 64, 0);
  EXPECT_FALSE(batch.hasNulls);
  EXPECT_EQ(64, batch.numElements);
  EXPECT_TRUE(decimals->hasNulls);
  EXPECT_EQ(64, decimals->numElements);
  for (size_t row = 0; row < 63; ++row) {
    EXPECT_EQ(0, decimals->notNull[row]);
  }
  EXPECT_EQ(1, decimals->notNull[63]);
  EXPECT_EQ(32, decimals->values.data()[63].toLong());
}
// Reads a decimal(0,0) column with forced scale 3 and overflow exceptions
// disabled: six values, a skip of two rows, then five more values including
// 38-digit ones rendered with three fractional digits.
TEST(DecimalColumnReader, testDecimalHive11Skip) {
  MockStripeStreams streams;

  // Hive 0.11 decimal options: do not throw on overflow, force scale 3.
  EXPECT_CALL(streams, getThrowOnHive11DecimalOverflow())
      .WillRepeatedly(testing::Return(false));
  EXPECT_CALL(streams, getForcedScaleOnHive11Decimal())
      .WillRepeatedly(testing::Return(3));

  // Select the root struct and its single child.
  std::vector<bool> columnMask(2, true);
  EXPECT_CALL(streams, getSelectedColumns())
      .WillRepeatedly(testing::Return(columnMask));

  // Every column reports DIRECT encoding.
  proto::ColumnEncoding encoding;
  encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
  EXPECT_CALL(streams, getEncoding(testing::_))
      .WillRepeatedly(testing::Return(encoding));

  // Default: no PRESENT stream. Column 1 is overridden right below.
  EXPECT_CALL(streams,
              getStreamProxy(testing::_, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(testing::Return(nullptr));

  // PRESENT for column 1: [0xff, 0xf8], i.e. thirteen rows marked present
  unsigned char presentBuffer[] = {0xfe, 0xff, 0xf8};
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(presentBuffer,
                                       ARRAY_SIZE(presentBuffer))));

  // DATA:
  // [493827160549382716, 4938271605493827, 49382716054938, 493827160549,
  //  4938271605, 49382716, 493827, 4938, 49,
  //  17320508075688772935274463415058723669,
  //  -17320508075688772935274463415058723669,
  //  99999999999999999999999999999999999999,
  //  -99999999999999999999999999999999999999]
  const unsigned char numBuffer[] =
    { 0xf8, 0xe8, 0xe2, 0xcf, 0xf4, 0xcb, 0xb6, 0xda, 0x0d,
      0x86, 0xc1, 0xcc, 0xcd, 0x9e, 0xd5, 0xc5, 0x11,
      0xb4, 0xf6, 0xfc, 0xf3, 0xb9, 0xba, 0x16,
      0xca, 0xe7, 0xa3, 0xa6, 0xdf, 0x1c,
      0xea, 0xad, 0xc0, 0xe5, 0x24,
      0xf8, 0x94, 0x8c, 0x2f,
      0x86, 0xa4, 0x3c,
      0x94, 0x4d,
      0x62,
      0xaa, 0xcd, 0xb3, 0xf2, 0x9e, 0xf0, 0x99, 0xd6, 0xbe, 0xf8, 0xb6,
      0x9e, 0xe4, 0xb7, 0xfd, 0xce, 0x8f, 0x34,
      0xa9, 0xcd, 0xb3, 0xf2, 0x9e, 0xf0, 0x99, 0xd6, 0xbe, 0xf8, 0xb6,
      0x9e, 0xe4, 0xb7, 0xfd, 0xce, 0x8f, 0x34,
      0xfe, 0xff, 0xff, 0xff, 0xff, 0x8f, 0x91, 0x8a, 0x93, 0xe8, 0xa3,
      0xec, 0xd0, 0x96, 0xd4, 0xcc, 0xf6, 0xac, 0x02,
      0xfd, 0xff, 0xff, 0xff, 0xff, 0x8f, 0x91, 0x8a, 0x93, 0xe8, 0xa3,
      0xec, 0xd0, 0x96, 0xd4, 0xcc, 0xf6, 0xac, 0x02,
    };
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_DATA, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(numBuffer, ARRAY_SIZE(numBuffer))));

  // SECONDARY (scale): [0x03] * 13, matching the forced scale
  const unsigned char scaleBuffer[] = {0x0a, 0x00, 0x06};
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_SECONDARY, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(scaleBuffer, ARRAY_SIZE(scaleBuffer))));

  // Row type: struct<col0:decimal(0,0)>
  std::unique_ptr<Type> rowType = createStructType();
  rowType->addStructField("col0", createDecimalType(0, 0));
  std::unique_ptr<ColumnReader> reader = buildReader(*rowType, streams);

  StructVectorBatch batch(64, *getDefaultPool());
  Decimal128VectorBatch *decimals =
      new Decimal128VectorBatch(64, *getDefaultPool());
  batch.fields.push_back(decimals);

  // Rows 0..5.
  reader->next(batch, 6, 0);
  EXPECT_FALSE(batch.hasNulls);
  EXPECT_EQ(6, batch.numElements);
  EXPECT_FALSE(decimals->hasNulls);
  EXPECT_EQ(6, decimals->numElements);
  EXPECT_EQ(3, decimals->scale);
  Int128 *decoded = decimals->values.data();
  EXPECT_EQ(493827160549382716, decoded[0].toLong());
  EXPECT_EQ(4938271605493827, decoded[1].toLong());
  EXPECT_EQ(49382716054938, decoded[2].toLong());
  EXPECT_EQ(493827160549, decoded[3].toLong());
  EXPECT_EQ(4938271605, decoded[4].toLong());
  EXPECT_EQ(49382716, decoded[5].toLong());

  // Skip rows 6 and 7 (493827 and 4938), then read rows 8..12.
  reader->skip(2);
  reader->next(batch, 5, 0);
  EXPECT_FALSE(batch.hasNulls);
  EXPECT_EQ(5, batch.numElements);
  EXPECT_FALSE(decimals->hasNulls);
  EXPECT_EQ(5, decimals->numElements);
  EXPECT_EQ(49, decoded[0].toLong());
  EXPECT_EQ("17320508075688772935274463415058723.669",
            decoded[1].toDecimalString(decimals->scale));
  EXPECT_EQ("-17320508075688772935274463415058723.669",
            decoded[2].toDecimalString(decimals->scale));
  EXPECT_EQ("99999999999999999999999999999999999.999",
            decoded[3].toDecimalString(decimals->scale));
  EXPECT_EQ("-99999999999999999999999999999999999.999",
            decoded[4].toDecimalString(decimals->scale));
}
// Reads 21 copies of the unscaled value 1 stored with scales 20, 19, ..., 0
// and a forced scale of 20. The reader is expected to rescale each value up to
// scale 20, producing 1, 10, 100, ..., 10^20.
TEST(DecimalColumnReader, testDecimalHive11ScaleUp) {
  MockStripeStreams streams;

  // Hive 0.11 decimal options: throw on overflow, force scale 20.
  EXPECT_CALL(streams, getThrowOnHive11DecimalOverflow())
      .WillRepeatedly(testing::Return(true));
  EXPECT_CALL(streams, getForcedScaleOnHive11Decimal())
      .WillRepeatedly(testing::Return(20));

  // Select the root struct and its single child.
  std::vector<bool> columnMask(2, true);
  EXPECT_CALL(streams, getSelectedColumns())
      .WillRepeatedly(testing::Return(columnMask));

  // Every column reports DIRECT encoding.
  proto::ColumnEncoding encoding;
  encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
  EXPECT_CALL(streams, getEncoding(testing::_))
      .WillRepeatedly(testing::Return(encoding));

  // Default: no PRESENT stream. Column 1 is overridden right below.
  EXPECT_CALL(streams,
              getStreamProxy(testing::_, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(testing::Return(nullptr));

  // PRESENT for column 1: [0xff, 0xff, 0xf8], i.e. 21 rows marked present
  const unsigned char presentBuffer[] = {0xfd, 0xff, 0xff, 0xf8};
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(presentBuffer,
                                       ARRAY_SIZE(presentBuffer))));

  // DATA: [1] * 21
  const unsigned char numBuffer[] = {
      0x02, 0x02, 0x02,
      0x02, 0x02, 0x02,
      0x02, 0x02, 0x02,
      0x02, 0x02, 0x02,
      0x02, 0x02, 0x02,
      0x02, 0x02, 0x02,
      0x02, 0x02, 0x02};
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_DATA, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(numBuffer, ARRAY_SIZE(numBuffer))));

  // SECONDARY (scale): one run of 21 scales. NOTE(review): presumably
  // starting at 20 and decreasing by one per row -- confirm against the
  // integer RLE decoder.
  const unsigned char scaleBuffer[] = {0x12, 0xff, 0x28};
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_SECONDARY, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(scaleBuffer, ARRAY_SIZE(scaleBuffer))));

  // Row type: struct<col0:decimal(0,0)>
  std::unique_ptr<Type> rowType = createStructType();
  rowType->addStructField("col0", createDecimalType(0, 0));
  std::unique_ptr<ColumnReader> reader = buildReader(*rowType, streams);

  StructVectorBatch batch(64, *getDefaultPool());
  Decimal128VectorBatch *decimals =
      new Decimal128VectorBatch(64, *getDefaultPool());
  batch.fields.push_back(decimals);

  reader->next(batch, 21, 0);
  EXPECT_FALSE(batch.hasNulls);
  EXPECT_EQ(21, batch.numElements);
  EXPECT_FALSE(decimals->hasNulls);
  EXPECT_EQ(21, decimals->numElements);
  EXPECT_EQ(20, decimals->scale);

  // Row k must equal 10^k.
  Int128 *decoded = decimals->values.data();
  Int128 powerOfTen = 1;
  for (int row = 0; row < 21; ++row) {
    EXPECT_EQ(powerOfTen.toString(), decoded[row].toString());
    powerOfTen *= 10;
  }
}
// Reads 21 copies of 100000000000000000000 (10^20) stored with increasing
// scales and a forced scale of 0. The reader is expected to scale each value
// down, producing 10^20, 10^19, ..., 1.
TEST(DecimalColumnReader, testDecimalHive11ScaleDown) {
  MockStripeStreams streams;

  // Hive 0.11 decimal options: throw on overflow, force scale 0.
  EXPECT_CALL(streams, getThrowOnHive11DecimalOverflow())
      .WillRepeatedly(testing::Return(true));
  EXPECT_CALL(streams, getForcedScaleOnHive11Decimal())
      .WillRepeatedly(testing::Return(0));

  // Select the root struct and its single child.
  std::vector<bool> columnMask(2, true);
  EXPECT_CALL(streams, getSelectedColumns())
      .WillRepeatedly(testing::Return(columnMask));

  // Every column reports DIRECT encoding.
  proto::ColumnEncoding encoding;
  encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
  EXPECT_CALL(streams, getEncoding(testing::_))
      .WillRepeatedly(testing::Return(encoding));

  // Default: no PRESENT stream. Column 1 is overridden right below.
  EXPECT_CALL(streams,
              getStreamProxy(testing::_, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(testing::Return(nullptr));

  // PRESENT for column 1: [0xff, 0xff, 0xf8], i.e. 21 rows marked present
  const unsigned char presentBuffer[] = {0xfd, 0xff, 0xff, 0xf8};
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(presentBuffer,
                                       ARRAY_SIZE(presentBuffer))));

  // DATA: [100000000000000000000] * 21. Every value has the same 10-byte
  // encoding, so the stream is built by repeating it 21 times.
  const unsigned char oneValue[] = {0x80, 0x80, 0x80, 0xb1, 0xac,
                                    0x8b, 0xaf, 0xc7, 0xd7, 0x15};
  unsigned char numBuffer[21 * sizeof(oneValue)];
  for (size_t copy = 0; copy < 21; ++copy) {
    for (size_t offset = 0; offset < sizeof(oneValue); ++offset) {
      numBuffer[copy * sizeof(oneValue) + offset] = oneValue[offset];
    }
  }
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_DATA, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(numBuffer, ARRAY_SIZE(numBuffer))));

  // SECONDARY (scale): one run of 21 scales. NOTE(review): presumably
  // starting at 0 and increasing by one per row -- confirm against the
  // integer RLE decoder.
  const unsigned char scaleBuffer[] = {0x12, 0x01, 0x00};
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_SECONDARY, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(scaleBuffer, ARRAY_SIZE(scaleBuffer))));

  // Row type: struct<col0:decimal(0,0)>
  std::unique_ptr<Type> rowType = createStructType();
  rowType->addStructField("col0", createDecimalType(0, 0));
  std::unique_ptr<ColumnReader> reader = buildReader(*rowType, streams);

  StructVectorBatch batch(64, *getDefaultPool());
  Decimal128VectorBatch *decimals =
      new Decimal128VectorBatch(64, *getDefaultPool());
  batch.fields.push_back(decimals);

  reader->next(batch, 21, 0);
  EXPECT_FALSE(batch.hasNulls);
  EXPECT_EQ(21, batch.numElements);
  EXPECT_FALSE(decimals->hasNulls);
  EXPECT_EQ(21, decimals->numElements);
  EXPECT_EQ(0, decimals->scale);

  // Start from 10^20 (0x5_6bc75e2d63100000) and divide by ten for each row.
  Int128 *decoded = decimals->values.data();
  Int128 shrinking = Int128(0x5, 0x6bc75e2d63100000);
  Int128 discardedRemainder;
  for (int row = 0; row < 21; ++row) {
    EXPECT_EQ(shrinking.toString(), decoded[row].toString());
    shrinking = shrinking.divide(10, discardedRemainder);
  }
}
// With overflow exceptions enabled, reading a single oversized Hive 0.11
// decimal must raise ParseError.
TEST(DecimalColumnReader, testDecimalHive11OverflowException) {
  MockStripeStreams streams;

  // Hive 0.11 decimal options: throw on overflow, force scale 6.
  EXPECT_CALL(streams, getThrowOnHive11DecimalOverflow())
      .WillRepeatedly(testing::Return(true));
  EXPECT_CALL(streams, getForcedScaleOnHive11Decimal())
      .WillRepeatedly(testing::Return(6));

  // Select the root struct and its single child.
  std::vector<bool> columnMask(2, true);
  EXPECT_CALL(streams, getSelectedColumns())
      .WillRepeatedly(testing::Return(columnMask));

  // Every column reports DIRECT encoding.
  proto::ColumnEncoding encoding;
  encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
  EXPECT_CALL(streams, getEncoding(testing::_))
      .WillRepeatedly(testing::Return(encoding));

  // Default: no PRESENT stream. Column 1 is overridden right below.
  EXPECT_CALL(streams,
              getStreamProxy(testing::_, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(testing::Return(nullptr));

  // PRESENT for column 1: [0x80], i.e. the first row is present
  const unsigned char presentBuffer[] = {0xff, 0x80};
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(presentBuffer,
                                       ARRAY_SIZE(presentBuffer))));

  // DATA: a single value encoded in 19 bytes.
  // NOTE(review): presumably 10^38, one digit more than the 38 allowed --
  // confirm by decoding.
  const unsigned char numBuffer[] = {
      0x80, 0x80, 0x80, 0x80, 0x80, 0x90, 0x91, 0x8a,
      0x93, 0xe8, 0xa3, 0xec, 0xd0, 0x96, 0xd4, 0xcc,
      0xf6, 0xac, 0x02};
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_DATA, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(numBuffer, ARRAY_SIZE(numBuffer))));

  // SECONDARY (scale): [0x06]
  const unsigned char scaleBuffer[] = {0xff, 0x0c};
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_SECONDARY, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(scaleBuffer, ARRAY_SIZE(scaleBuffer))));

  // Row type: struct<col0:decimal(0,0)>
  std::unique_ptr<Type> rowType = createStructType();
  rowType->addStructField("col0", createDecimalType(0, 0));
  std::unique_ptr<ColumnReader> reader = buildReader(*rowType, streams);

  StructVectorBatch batch(64, *getDefaultPool());
  Decimal128VectorBatch *decimals =
      new Decimal128VectorBatch(64, *getDefaultPool());
  batch.fields.push_back(decimals);

  // Reading the one row must fail.
  EXPECT_THROW(reader->next(batch, 1, 0), ParseError);
}
// Same as the overflow-exception test, except the oversized value sits behind
// a leading null row: reading two rows must still raise ParseError.
TEST(DecimalColumnReader, testDecimalHive11OverflowExceptionNull) {
  MockStripeStreams streams;

  // Hive 0.11 decimal options: throw on overflow, force scale 6.
  EXPECT_CALL(streams, getThrowOnHive11DecimalOverflow())
      .WillRepeatedly(testing::Return(true));
  EXPECT_CALL(streams, getForcedScaleOnHive11Decimal())
      .WillRepeatedly(testing::Return(6));

  // Select the root struct and its single child.
  std::vector<bool> columnMask(2, true);
  EXPECT_CALL(streams, getSelectedColumns())
      .WillRepeatedly(testing::Return(columnMask));

  // Every column reports DIRECT encoding.
  proto::ColumnEncoding encoding;
  encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
  EXPECT_CALL(streams, getEncoding(testing::_))
      .WillRepeatedly(testing::Return(encoding));

  // Default: no PRESENT stream. Column 1 is overridden right below.
  EXPECT_CALL(streams,
              getStreamProxy(testing::_, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(testing::Return(nullptr));

  // PRESENT for column 1: [0x40], i.e. row 0 is null and row 1 is present
  const unsigned char presentBuffer[] = {0xff, 0x40};
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(presentBuffer,
                                       ARRAY_SIZE(presentBuffer))));

  // DATA: a single value encoded in 19 bytes.
  // NOTE(review): presumably 10^38, one digit more than the 38 allowed --
  // confirm by decoding.
  const unsigned char numBuffer[] = {
      0x80, 0x80, 0x80, 0x80, 0x80, 0x90, 0x91, 0x8a,
      0x93, 0xe8, 0xa3, 0xec, 0xd0, 0x96, 0xd4, 0xcc,
      0xf6, 0xac, 0x02};
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_DATA, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(numBuffer, ARRAY_SIZE(numBuffer))));

  // SECONDARY (scale): [0x06]
  const unsigned char scaleBuffer[] = {0xff, 0x0c};
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_SECONDARY, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(scaleBuffer, ARRAY_SIZE(scaleBuffer))));

  // Row type: struct<col0:decimal(0,0)>
  std::unique_ptr<Type> rowType = createStructType();
  rowType->addStructField("col0", createDecimalType(0, 0));
  std::unique_ptr<ColumnReader> reader = buildReader(*rowType, streams);

  StructVectorBatch batch(64, *getDefaultPool());
  Decimal128VectorBatch *decimals =
      new Decimal128VectorBatch(64, *getDefaultPool());
  batch.fields.push_back(decimals);

  // Reading the null row plus the oversized row must fail.
  EXPECT_THROW(reader->next(batch, 2, 0), ParseError);
}
// With overflow exceptions disabled, oversized Hive 0.11 decimals must be
// replaced by NULL and a warning must be written to the error stream for each
// replaced value.
TEST(DecimalColumnReader, testDecimalHive11OverflowNull) {
  MockStripeStreams streams;

  // Capture the warnings the reader emits.
  std::stringstream errStream;
  EXPECT_CALL(streams, getErrorStream())
      .WillRepeatedly(testing::Return(&errStream));

  // Hive 0.11 decimal options: do not throw on overflow, force scale 6.
  EXPECT_CALL(streams, getThrowOnHive11DecimalOverflow())
      .WillRepeatedly(testing::Return(false));
  EXPECT_CALL(streams, getForcedScaleOnHive11Decimal())
      .WillRepeatedly(testing::Return(6));

  // Select the root struct and its single child.
  std::vector<bool> columnMask(2, true);
  EXPECT_CALL(streams, getSelectedColumns())
      .WillRepeatedly(testing::Return(columnMask));

  // Every column reports DIRECT encoding.
  proto::ColumnEncoding encoding;
  encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
  EXPECT_CALL(streams, getEncoding(testing::_))
      .WillRepeatedly(testing::Return(encoding));

  // Default: no PRESENT stream. Column 1 is overridden right below.
  EXPECT_CALL(streams,
              getStreamProxy(testing::_, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(testing::Return(nullptr));

  // PRESENT for column 1: [0x78], i.e. row 0 is null and rows 1..4 are present
  unsigned char presentBuffer[] = {0xff, 0x78};
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(presentBuffer,
                                       ARRAY_SIZE(presentBuffer))));

  // DATA:
  // [1000000000000000000000000000000000000000, 1,
  //  -10000000000000000000000000000000000000, 1]
  unsigned char numBuffer[] = {
      0x80, 0x80, 0x80, 0x80, 0x80, 0xc0, 0xb0, 0xf5,
      0xf3, 0xae, 0xfd, 0xcb, 0x94, 0xd7, 0xe1, 0xf1,
      0xd3, 0x8c, 0xeb, 0x01,
      0x02,
      0xff, 0xff, 0xff, 0xff, 0xff, 0x8f, 0x91, 0x8a,
      0x93, 0xe8, 0xa3, 0xec, 0xd0, 0x96, 0xd4, 0xcc,
      0xf6, 0xac, 0x02,
      0x02};
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_DATA, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(numBuffer, ARRAY_SIZE(numBuffer))));

  // SECONDARY (scale): [0x06] * 4
  const unsigned char scaleBuffer[] = {0x01, 0x00, 0x0c};
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_SECONDARY, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(scaleBuffer, ARRAY_SIZE(scaleBuffer))));

  // Row type: struct<col0:decimal(0,0)>
  std::unique_ptr<Type> rowType = createStructType();
  rowType->addStructField("col0", createDecimalType(0, 0));
  std::unique_ptr<ColumnReader> reader = buildReader(*rowType, streams);

  StructVectorBatch batch(64, *getDefaultPool());
  Decimal128VectorBatch *decimals =
      new Decimal128VectorBatch(64, *getDefaultPool());
  batch.fields.push_back(decimals);

  // Rows 0..2: a stored null, an overflowing value turned into null, then 1.
  reader->next(batch, 3, 0);
  EXPECT_FALSE(batch.hasNulls);
  EXPECT_EQ(3, batch.numElements);
  EXPECT_TRUE(decimals->hasNulls);
  EXPECT_EQ(3, decimals->numElements);
  EXPECT_EQ(6, decimals->scale);
  EXPECT_EQ(true, !decimals->notNull[0]);
  EXPECT_EQ(true, !decimals->notNull[1]);
  EXPECT_EQ(true, decimals->notNull[2]);
  EXPECT_EQ(1, decimals->values[2].toLong());

  // Rows 3..4: another overflowing value turned into null, then 1.
  reader->next(batch, 2, 0);
  EXPECT_FALSE(batch.hasNulls);
  EXPECT_EQ(2, batch.numElements);
  EXPECT_TRUE(decimals->hasNulls);
  EXPECT_EQ(2, decimals->numElements);
  EXPECT_EQ(6, decimals->scale);
  EXPECT_EQ(true, !decimals->notNull[0]);
  EXPECT_EQ(true, decimals->notNull[1]);
  EXPECT_EQ(1, decimals->values[1].toLong());

  // One warning per replaced value, in order.
  EXPECT_EQ("Warning: Hive 0.11 decimal with more than 38 digits"
            " replaced by NULL.\n"
            "Warning: Hive 0.11 decimal with more than 38 digits"
            " replaced by NULL.\n", errStream.str());
}
TEST(DecimalColumnReader, testDecimalHive11BigBatches) {
  // Reads 2048 decimals in one call; the stored scales (5 and 4) differ
  // from the forced scale of 6, so the expected values are multiplied.
  MockStripeStreams streams;
  EXPECT_CALL(streams, getThrowOnHive11DecimalOverflow())
      .WillRepeatedly(testing::Return(true));
  EXPECT_CALL(streams, getForcedScaleOnHive11Decimal())
      .WillRepeatedly(testing::Return(6));

  // column selection: two entries, both selected
  std::vector<bool> selected(2, true);
  EXPECT_CALL(streams, getSelectedColumns())
      .WillRepeatedly(testing::Return(selected));

  // every column reports DIRECT encoding
  proto::ColumnEncoding encoding;
  encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
  EXPECT_CALL(streams, getEncoding(testing::_))
      .WillRepeatedly(testing::Return(encoding));

  // no PRESENT stream for any column
  EXPECT_CALL(streams,
              getStreamProxy(testing::_, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(testing::Return(nullptr));

  // DATA stream: range(64) * 32
  unsigned char numBuffer[2048];
  for (size_t pos = 0; pos < ARRAY_SIZE(numBuffer); ++pos) {
    numBuffer[pos] = static_cast<unsigned char>((pos % 64) * 2);
  }
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_DATA, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(numBuffer, ARRAY_SIZE(numBuffer))));

  // SECONDARY stream: [5] * 1024 + [4] * 1024, written as sixteen
  // three-byte runs; the first eight carry 0x0a, the rest 0x08.
  unsigned char scaleBuffer[48];
  for (size_t run = 0; run < 16; ++run) {
    unsigned char *entry = scaleBuffer + 3 * run;
    entry[0] = 0x7d;
    entry[1] = 0x00;
    entry[2] = (run < 8) ? 0x0a : 0x08;
  }
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_SECONDARY, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(scaleBuffer,
                                       ARRAY_SIZE(scaleBuffer))));

  // row type: struct<col0:decimal(0,0)>
  std::unique_ptr<Type> rowType = createStructType();
  rowType->addStructField("col0", createDecimalType(0, 0));

  std::unique_ptr<ColumnReader> reader = buildReader(*rowType, streams);

  MemoryPool &pool = *getDefaultPool();
  StructVectorBatch batch(2048, pool);
  Decimal128VectorBatch *decimalBatch =
      new Decimal128VectorBatch(2048, pool);
  batch.fields.push_back(decimalBatch);

  reader->next(batch, 2048, 0);
  EXPECT_FALSE(batch.hasNulls);
  EXPECT_EQ(2048, batch.numElements);
  EXPECT_FALSE(decimalBatch->hasNulls);
  EXPECT_EQ(2048, decimalBatch->numElements);
  EXPECT_EQ(6, decimalBatch->scale);

  // first half is expected scaled by 10, second half by 100
  for (size_t row = 0; row < decimalBatch->numElements; ++row) {
    EXPECT_EQ((row % 64) * (row < 1024 ? 10 : 100),
              decimalBatch->values[row].toLong())
        << "Wrong value at " << row;
  }
}
TEST(TestColumnReader, testUnion) {
  // Reads a two-variant union (column 1 = tags, column 2 = first child,
  // column 3 = second child) in four batches and checks tags, offsets
  // and child values after each one.
  MockStripeStreams streams;

  // column selection: four entries, all selected
  std::vector<bool> selected(4, true);
  EXPECT_CALL(streams, getSelectedColumns())
      .WillRepeatedly(testing::Return(selected));

  // every column reports DIRECT encoding
  proto::ColumnEncoding encoding;
  encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
  EXPECT_CALL(streams, getEncoding(testing::_))
      .WillRepeatedly(testing::Return(encoding));

  // no PRESENT stream for any column
  EXPECT_CALL(streams,
              getStreamProxy(testing::_, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(testing::Return(nullptr));

  // tags: [0] * 1000 + [1] * 1000 + [0] * 200 + [1] * 200
  const unsigned char tagBytes[] = {
    0x7f, 0x00,
    0x7f, 0x00,
    0x7f, 0x00,
    0x7f, 0x00,
    0x7f, 0x00,
    0x7f, 0x00,
    0x7f, 0x00,
    0x57, 0x00,
    0x7f, 0x01,
    0x7f, 0x01,
    0x7f, 0x01,
    0x7f, 0x01,
    0x7f, 0x01,
    0x7f, 0x01,
    0x7f, 0x01,
    0x57, 0x01,
    0x7f, 0x00,
    0x43, 0x00,
    0x7f, 0x01,
    0x43, 0x01
  };
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_DATA, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(tagBytes, ARRAY_SIZE(tagBytes))));

  // first child: range(1200)
  const unsigned char firstChildBytes[] = {
    0x7f, 0x01, 0x00,
    0x7f, 0x01, 0x84, 0x02,
    0x7f, 0x01, 0x88, 0x04,
    0x7f, 0x01, 0x8c, 0x06,
    0x7f, 0x01, 0x90, 0x08,
    0x7f, 0x01, 0x94, 0x0a,
    0x7f, 0x01, 0x98, 0x0c,
    0x7f, 0x01, 0x9c, 0x0e,
    0x7f, 0x01, 0xa0, 0x10,
    0x1b, 0x01, 0xa4, 0x12
  };
  EXPECT_CALL(streams, getStreamProxy(2, proto::Stream_Kind_DATA, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(firstChildBytes,
                                       ARRAY_SIZE(firstChildBytes))));

  // second child: range(8, 1208)
  const unsigned char secondChildBytes[] = {
    0x7f, 0x01, 0x10,
    0x7f, 0x01, 0x94, 0x02,
    0x7f, 0x01, 0x98, 0x04,
    0x7f, 0x01, 0x9c, 0x06,
    0x7f, 0x01, 0xa0, 0x08,
    0x7f, 0x01, 0xa4, 0x0a,
    0x7f, 0x01, 0xa8, 0x0c,
    0x7f, 0x01, 0xac, 0x0e,
    0x7f, 0x01, 0xb0, 0x10,
    0x1b, 0x01, 0xb4, 0x12
  };
  EXPECT_CALL(streams, getStreamProxy(3, proto::Stream_Kind_DATA, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(secondChildBytes,
                                       ARRAY_SIZE(secondChildBytes))));

  // row type: struct<col0:uniontype<LONG,INT>>
  std::unique_ptr<Type> unionType = createUnionType();
  unionType->addUnionChild(createPrimitiveType(LONG));
  unionType->addUnionChild(createPrimitiveType(INT));
  std::unique_ptr<Type> rowType = createStructType();
  rowType->addStructField("col0", std::move(unionType));

  std::unique_ptr<ColumnReader> reader = buildReader(*rowType, streams);

  MemoryPool &pool = *getDefaultPool();
  StructVectorBatch batch(512, pool);
  UnionVectorBatch *unions = new UnionVectorBatch(512, pool);
  LongVectorBatch *firstChild = new LongVectorBatch(512, pool);
  LongVectorBatch *secondChild = new LongVectorBatch(512, pool);
  batch.fields.push_back(unions);
  unions->children.push_back(firstChild);
  unions->children.push_back(secondChild);

  // batch 1: 512 rows, all of them tag 0
  reader->next(batch, 512, 0);
  ASSERT_EQ(512, batch.numElements);
  ASSERT_FALSE(batch.hasNulls);
  ASSERT_EQ(512, unions->numElements);
  ASSERT_FALSE(unions->hasNulls);
  ASSERT_EQ(512, firstChild->numElements);
  ASSERT_FALSE(firstChild->hasNulls);
  ASSERT_EQ(0, secondChild->numElements);
  ASSERT_FALSE(secondChild->hasNulls);
  for (size_t row = 0; row < batch.numElements; ++row) {
    EXPECT_EQ(0, unions->tags[row]);
    EXPECT_EQ(row, unions->offsets[row]);
    EXPECT_EQ(row, firstChild->data[row]);
  }

  // batch 2: 511 rows, 488 with tag 0 followed by 23 with tag 1
  reader->next(batch, 511, 0);
  ASSERT_EQ(511, batch.numElements);
  ASSERT_FALSE(batch.hasNulls);
  ASSERT_EQ(511, unions->numElements);
  ASSERT_FALSE(unions->hasNulls);
  ASSERT_EQ(488, firstChild->numElements);
  ASSERT_FALSE(firstChild->hasNulls);
  ASSERT_EQ(23, secondChild->numElements);
  ASSERT_FALSE(secondChild->hasNulls);
  for (size_t row = 0; row < 488; ++row) {
    EXPECT_EQ(0, unions->tags[row]);
    EXPECT_EQ(row, unions->offsets[row]);
    EXPECT_EQ(row + 512, firstChild->data[row]);
  }
  for (size_t row = 488; row < 511; ++row) {
    EXPECT_EQ(1, unions->tags[row]);
    EXPECT_EQ(row - 488, unions->offsets[row]);
    EXPECT_EQ(row - 480, secondChild->data[row - 488]);
  }

  // batch 3: a single row with tag 1
  reader->next(batch, 1, 0);
  ASSERT_EQ(1, batch.numElements);
  ASSERT_FALSE(batch.hasNulls);
  ASSERT_EQ(1, unions->numElements);
  ASSERT_FALSE(unions->hasNulls);
  ASSERT_EQ(0, firstChild->numElements);
  ASSERT_FALSE(firstChild->hasNulls);
  ASSERT_EQ(1, secondChild->numElements);
  ASSERT_FALSE(secondChild->hasNulls);
  EXPECT_EQ(1, unions->tags[0]);
  EXPECT_EQ(0, unions->offsets[0]);
  EXPECT_EQ(31, secondChild->data[0]);

  // batch 4: the remaining 1376 rows, read after growing the struct batch
  batch.resize(1500);
  reader->next(batch, 1376, 0);
  ASSERT_EQ(1376, batch.numElements);
  ASSERT_FALSE(batch.hasNulls);
  ASSERT_EQ(1376, unions->numElements);
  ASSERT_FALSE(unions->hasNulls);
  ASSERT_EQ(200, firstChild->numElements);
  ASSERT_FALSE(firstChild->hasNulls);
  ASSERT_EQ(1176, secondChild->numElements);
  ASSERT_FALSE(secondChild->hasNulls);
  // rows [0, 976): tag 1
  for (size_t row = 0; row < 976; ++row) {
    EXPECT_EQ(1, unions->tags[row]);
    EXPECT_EQ(row, unions->offsets[row]);
    EXPECT_EQ(row + 32, secondChild->data[row]);
  }
  // rows [976, 1176): tag 0
  for (size_t row = 976; row < 1176; ++row) {
    EXPECT_EQ(0, unions->tags[row]);
    EXPECT_EQ(row - 976, unions->offsets[row]);
    EXPECT_EQ(row + 24, firstChild->data[row - 976]);
  }
  // rows [1176, 1376): tag 1 again
  for (size_t row = 1176; row < 1376; ++row) {
    EXPECT_EQ(1, unions->tags[row]);
    EXPECT_EQ(row - 200, unions->offsets[row]);
    EXPECT_EQ(row - 168, secondChild->data[row - 200]);
  }

  EXPECT_EQ(("Struct vector <1376 of 1500;"
             " Union vector <Long vector <200 of 512>,"
             " Long vector <1176 of 1176>; with 1376 of 1376>; >"),
            batch.toString());
}
TEST(TestColumnReader, testUnionWithNulls) {
  // Reads 96 union rows in one batch: 24 NULLs, 48 values whose tags
  // alternate between the two variants, then 24 more NULLs.
  MockStripeStreams streams;

  // column selection: four entries, all selected
  std::vector<bool> selected(4, true);
  EXPECT_CALL(streams, getSelectedColumns())
      .WillRepeatedly(testing::Return(selected));

  // every column reports DIRECT encoding
  proto::ColumnEncoding encoding;
  encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
  EXPECT_CALL(streams, getEncoding(testing::_))
      .WillRepeatedly(testing::Return(encoding));

  // Catch-all: no PRESENT stream. It must be registered before the
  // column-1 expectation below so that the more specific one wins.
  EXPECT_CALL(streams,
              getStreamProxy(testing::_, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(testing::Return(nullptr));

  // PRESENT stream of the union: [0] * 3 + [255] * 6 + [0] * 3
  const unsigned char presentBytes[] = {0x00, 0x00, 0x03, 0xff, 0x00, 0x00};
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(presentBytes,
                                       ARRAY_SIZE(presentBytes))));

  // tags: [0, 1] * 24
  const unsigned char tagBytes[] = {
    0xd0,
    0x00, 0x01, 0x00, 0x01,
    0x00, 0x01, 0x00, 0x01,
    0x00, 0x01, 0x00, 0x01,
    0x00, 0x01, 0x00, 0x01,
    0x00, 0x01, 0x00, 0x01,
    0x00, 0x01, 0x00, 0x01,
    0x00, 0x01, 0x00, 0x01,
    0x00, 0x01, 0x00, 0x01,
    0x00, 0x01, 0x00, 0x01,
    0x00, 0x01, 0x00, 0x01,
    0x00, 0x01, 0x00, 0x01,
    0x00, 0x01, 0x00, 0x01
  };
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_DATA, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(tagBytes, ARRAY_SIZE(tagBytes))));

  // first child: range(24)
  const unsigned char firstChildBytes[] = {0x15, 0x01, 0x00};
  EXPECT_CALL(streams, getStreamProxy(2, proto::Stream_Kind_DATA, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(firstChildBytes,
                                       ARRAY_SIZE(firstChildBytes))));

  // second child: range(8, 32)
  const unsigned char secondChildBytes[] = {0x15, 0x01, 0x10};
  EXPECT_CALL(streams, getStreamProxy(3, proto::Stream_Kind_DATA, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(secondChildBytes,
                                       ARRAY_SIZE(secondChildBytes))));

  // row type: struct<col0:uniontype<LONG,INT>>
  std::unique_ptr<Type> unionType = createUnionType();
  unionType->addUnionChild(createPrimitiveType(LONG));
  unionType->addUnionChild(createPrimitiveType(INT));
  std::unique_ptr<Type> rowType = createStructType();
  rowType->addStructField("col0", std::move(unionType));

  std::unique_ptr<ColumnReader> reader = buildReader(*rowType, streams);

  MemoryPool &pool = *getDefaultPool();
  StructVectorBatch batch(512, pool);
  UnionVectorBatch *unions = new UnionVectorBatch(512, pool);
  LongVectorBatch *firstChild = new LongVectorBatch(512, pool);
  LongVectorBatch *secondChild = new LongVectorBatch(512, pool);
  batch.fields.push_back(unions);
  unions->children.push_back(firstChild);
  unions->children.push_back(secondChild);

  reader->next(batch, 96, 0);
  ASSERT_EQ(96, batch.numElements);
  ASSERT_FALSE(batch.hasNulls);
  ASSERT_EQ(96, unions->numElements);
  ASSERT_TRUE(unions->hasNulls);
  ASSERT_EQ(24, firstChild->numElements);
  ASSERT_FALSE(firstChild->hasNulls);
  ASSERT_EQ(24, secondChild->numElements);
  ASSERT_FALSE(secondChild->hasNulls);

  // leading NULLs
  for (size_t row = 0; row < 24; ++row) {
    EXPECT_FALSE(unions->notNull[row]);
  }
  // 48 present rows; even rows use the first child, odd rows the second
  for (size_t row = 24; row < 72; ++row) {
    EXPECT_EQ(true, unions->notNull[row]);
    EXPECT_EQ(row % 2, unions->tags[row]);
    EXPECT_EQ((row - 24) / 2, unions->offsets[row]);
    if (row % 2 == 0) {
      EXPECT_EQ((row - 24) / 2, firstChild->data[unions->offsets[row]]);
    } else {
      EXPECT_EQ((row - 24) / 2 + 8, secondChild->data[unions->offsets[row]]);
    }
  }
  // trailing NULLs
  for (size_t row = 72; row < batch.numElements; ++row) {
    EXPECT_FALSE(unions->notNull[row]);
  }
}
TEST(TestColumnReader, testUnionSkips) {
  // Union with its first child (column 2) deselected: reads 26 rows,
  // skips 44 and reads the final 26, checking only the second child.
  MockStripeStreams streams;

  // column selection: {true, true, false, true}
  std::vector<bool> selected(4, true);
  selected[2] = false;
  EXPECT_CALL(streams, getSelectedColumns())
      .WillRepeatedly(testing::Return(selected));

  // every column reports DIRECT encoding
  proto::ColumnEncoding encoding;
  encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
  EXPECT_CALL(streams, getEncoding(testing::_))
      .WillRepeatedly(testing::Return(encoding));

  // Catch-all: no PRESENT stream. It must be registered before the
  // column-1 expectation below so that the more specific one wins.
  EXPECT_CALL(streams,
              getStreamProxy(testing::_, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(testing::Return(nullptr));

  // PRESENT stream of the union: [0] * 3 + [255] * 6 + [0] * 3
  const unsigned char presentBytes[] = {0x00, 0x00, 0x03, 0xff, 0x00, 0x00};
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(presentBytes,
                                       ARRAY_SIZE(presentBytes))));

  // tags: [0, 1] * 24
  const unsigned char tagBytes[] = {
    0xd0,
    0x00, 0x01, 0x00, 0x01,
    0x00, 0x01, 0x00, 0x01,
    0x00, 0x01, 0x00, 0x01,
    0x00, 0x01, 0x00, 0x01,
    0x00, 0x01, 0x00, 0x01,
    0x00, 0x01, 0x00, 0x01,
    0x00, 0x01, 0x00, 0x01,
    0x00, 0x01, 0x00, 0x01,
    0x00, 0x01, 0x00, 0x01,
    0x00, 0x01, 0x00, 0x01,
    0x00, 0x01, 0x00, 0x01,
    0x00, 0x01, 0x00, 0x01
  };
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_DATA, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(tagBytes, ARRAY_SIZE(tagBytes))));

  // second child (column 3): range(8, 32)
  const unsigned char secondChildBytes[] = {0x15, 0x01, 0x10};
  EXPECT_CALL(streams, getStreamProxy(3, proto::Stream_Kind_DATA, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(secondChildBytes,
                                       ARRAY_SIZE(secondChildBytes))));

  // row type: struct<col0:uniontype<LONG,INT>>
  std::unique_ptr<Type> unionType = createUnionType();
  unionType->addUnionChild(createPrimitiveType(LONG));
  unionType->addUnionChild(createPrimitiveType(INT));
  std::unique_ptr<Type> rowType = createStructType();
  rowType->addStructField("col0", std::move(unionType));

  std::unique_ptr<ColumnReader> reader = buildReader(*rowType, streams);

  // the deselected first child gets a null slot in the union batch
  MemoryPool &pool = *getDefaultPool();
  StructVectorBatch batch(512, pool);
  UnionVectorBatch *unions = new UnionVectorBatch(512, pool);
  LongVectorBatch *secondChild = new LongVectorBatch(512, pool);
  batch.fields.push_back(unions);
  unions->children.push_back(nullptr);
  unions->children.push_back(secondChild);

  // first read: 24 NULLs followed by one row of each tag
  reader->next(batch, 26, 0);
  ASSERT_EQ(26, batch.numElements);
  ASSERT_FALSE(batch.hasNulls);
  ASSERT_EQ(26, unions->numElements);
  ASSERT_TRUE(unions->hasNulls);
  ASSERT_EQ(1, secondChild->numElements);
  ASSERT_FALSE(secondChild->hasNulls);
  for (size_t row = 0; row < 24; ++row) {
    EXPECT_FALSE(unions->notNull[row]);
  }
  EXPECT_EQ(true, unions->notNull[24]);
  EXPECT_EQ(0, unions->tags[24]);
  EXPECT_EQ(true, unions->notNull[25]);
  EXPECT_EQ(1, unions->tags[25]);
  EXPECT_EQ(0, unions->offsets[25]);
  EXPECT_EQ(8, secondChild->data[unions->offsets[25]]);

  reader->skip(44);

  // second read: one row of each tag followed by 24 NULLs
  reader->next(batch, 26, 0);
  ASSERT_EQ(26, batch.numElements);
  ASSERT_FALSE(batch.hasNulls);
  ASSERT_EQ(26, unions->numElements);
  ASSERT_TRUE(unions->hasNulls);
  ASSERT_EQ(1, secondChild->numElements);
  ASSERT_FALSE(secondChild->hasNulls);
  EXPECT_EQ(true, unions->notNull[0]);
  EXPECT_EQ(0, unions->tags[0]);
  EXPECT_EQ(true, unions->notNull[1]);
  EXPECT_EQ(1, unions->tags[1]);
  EXPECT_EQ(0, unions->offsets[1]);
  EXPECT_EQ(31, secondChild->data[unions->offsets[1]]);
  for (size_t row = 2; row < 26; ++row) {
    EXPECT_FALSE(unions->notNull[row]);
  }
}
TEST(TestColumnReader, testUnionLongSkip) {
  // Union with its second child (column 3) deselected: reads 10 rows,
  // skips 2490 and then checks the last 100 values of the first child.
  MockStripeStreams streams;

  // column selection: {true, true, true, false}
  std::vector<bool> selected(4, true);
  selected[3] = false;
  EXPECT_CALL(streams, getSelectedColumns())
      .WillRepeatedly(testing::Return(selected));

  // every column reports DIRECT encoding
  proto::ColumnEncoding encoding;
  encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
  EXPECT_CALL(streams, getEncoding(testing::_))
      .WillRepeatedly(testing::Return(encoding));

  // no PRESENT stream for any column
  EXPECT_CALL(streams,
              getStreamProxy(testing::_, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(testing::Return(nullptr));

  // tags: [1] * 1300 + [0] * 1300
  const unsigned char tagBytes[] = {
    0x7f, 0x01, 0x7f, 0x01,
    0x7f, 0x01, 0x7f, 0x01,
    0x7f, 0x01, 0x7f, 0x01,
    0x7f, 0x01, 0x7f, 0x01,
    0x7f, 0x01, 0x7f, 0x01,
    0x7f, 0x00, 0x7f, 0x00,
    0x7f, 0x00, 0x7f, 0x00,
    0x7f, 0x00, 0x7f, 0x00,
    0x7f, 0x00, 0x7f, 0x00,
    0x7f, 0x00, 0x7f, 0x00
  };
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_DATA, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(tagBytes, ARRAY_SIZE(tagBytes))));

  // first child (column 2): range(0, 1300)
  const unsigned char firstChildBytes[] = {
    0x7f, 0x01, 0x00,
    0x7f, 0x01, 0x84, 0x02,
    0x7f, 0x01, 0x88, 0x04,
    0x7f, 0x01, 0x8c, 0x06,
    0x7f, 0x01, 0x90, 0x08,
    0x7f, 0x01, 0x94, 0x0a,
    0x7f, 0x01, 0x98, 0x0c,
    0x7f, 0x01, 0x9c, 0x0e,
    0x7f, 0x01, 0xa0, 0x10,
    0x7f, 0x01, 0xa4, 0x12
  };
  EXPECT_CALL(streams, getStreamProxy(2, proto::Stream_Kind_DATA, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(firstChildBytes,
                                       ARRAY_SIZE(firstChildBytes))));

  // row type: struct<col0:uniontype<LONG,INT>>
  std::unique_ptr<Type> unionType = createUnionType();
  unionType->addUnionChild(createPrimitiveType(LONG));
  unionType->addUnionChild(createPrimitiveType(INT));
  std::unique_ptr<Type> rowType = createStructType();
  rowType->addStructField("col0", std::move(unionType));

  std::unique_ptr<ColumnReader> reader = buildReader(*rowType, streams);

  // the deselected second child gets a null slot in the union batch
  MemoryPool &pool = *getDefaultPool();
  StructVectorBatch batch(512, pool);
  UnionVectorBatch *unions = new UnionVectorBatch(512, pool);
  LongVectorBatch *firstChild = new LongVectorBatch(512, pool);
  batch.fields.push_back(unions);
  unions->children.push_back(firstChild);
  unions->children.push_back(nullptr);

  // first read: ten rows, all tag 1, so the first child stays empty
  reader->next(batch, 10, 0);
  ASSERT_EQ(10, batch.numElements);
  ASSERT_FALSE(batch.hasNulls);
  ASSERT_EQ(10, unions->numElements);
  ASSERT_FALSE(unions->hasNulls);
  ASSERT_EQ(0, firstChild->numElements);
  ASSERT_FALSE(firstChild->hasNulls);
  for (size_t row = 0; row < batch.numElements; ++row) {
    EXPECT_EQ(1, unions->tags[row]);
    EXPECT_EQ(row, unions->offsets[row]);
  }

  reader->skip(2490);

  // second read: the last 100 rows, all tag 0, values 1200..1299
  reader->next(batch, 100, 0);
  ASSERT_EQ(100, batch.numElements);
  ASSERT_FALSE(batch.hasNulls);
  ASSERT_EQ(100, unions->numElements);
  ASSERT_FALSE(unions->hasNulls);
  ASSERT_EQ(100, firstChild->numElements);
  ASSERT_FALSE(firstChild->hasNulls);
  for (size_t row = 0; row < batch.numElements; ++row) {
    EXPECT_EQ(0, unions->tags[row]);
    EXPECT_EQ(row, unions->offsets[row]);
    EXPECT_EQ(row + 1200, firstChild->data[unions->offsets[row]]);
  }
}
TEST(TestColumnReader, testUnionWithManyVariants) {
  // Union with 130 LONG variants: 390 rows cycle through the tags three
  // times. The test reads 130 rows, skips 30 and reads the last 230.
  MockStripeStreams streams;

  // column selection: 132 entries, all selected
  std::vector<bool> selected(132, true);
  EXPECT_CALL(streams, getSelectedColumns())
      .WillRepeatedly(testing::Return(selected));

  // every column reports DIRECT encoding
  proto::ColumnEncoding encoding;
  encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
  EXPECT_CALL(streams, getEncoding(testing::_))
      .WillRepeatedly(testing::Return(encoding));

  // no PRESENT stream for any column
  EXPECT_CALL(streams,
              getStreamProxy(testing::_, proto::Stream_Kind_PRESENT, true))
      .WillRepeatedly(testing::Return(nullptr));

  // tags: list(range(130)) * 3, stored as three literal runs of length
  // 128 and one literal run of 6
  unsigned char tagBuffer[129 * 3 + 7];
  tagBuffer[0] = 0x80;
  tagBuffer[129] = 0x80;
  tagBuffer[258] = 0x80;
  tagBuffer[387] = 0xfa;
  // One header byte precedes every group of 128 tags, so the tag with
  // index t is stored at byte t + 1 + t / 128.
  for (size_t tagIndex = 0; tagIndex < 390; ++tagIndex) {
    tagBuffer[tagIndex + 1 + tagIndex / 128] =
        static_cast<unsigned char>(tagIndex % 130);
  }
  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_DATA, true))
      .WillRepeatedly(testing::Return(
          new SeekableArrayInputStream(tagBuffer, ARRAY_SIZE(tagBuffer))));

  // for variant in range(0, 130):
  //   [variant & 0x3f, (variant & 0x3f) + 1, (variant & 0x3f) + 2]
  // Each variant owns a three-byte slice served as column variant + 2.
  unsigned char childBytes[3 * 130];
  for (size_t variant = 0; variant < 130; ++variant) {
    unsigned char *slice = childBytes + 3 * variant;
    slice[0] = 0x00;
    slice[1] = 0x01;
    slice[2] = static_cast<unsigned char>((variant * 2) & 0x7f);
    EXPECT_CALL(streams,
                getStreamProxy(variant + 2, proto::Stream_Kind_DATA, true))
        .WillRepeatedly(testing::Return(
            new SeekableArrayInputStream(childBytes + 3 * variant, 3)));
  }

  // row type: struct<col0:uniontype<LONG x 130>>
  std::unique_ptr<Type> unionType = createUnionType();
  for (size_t variant = 0; variant < 130; ++variant) {
    unionType->addUnionChild(createPrimitiveType(LONG));
  }
  std::unique_ptr<Type> rowType = createStructType();
  rowType->addStructField("col0", std::move(unionType));

  std::unique_ptr<ColumnReader> reader = buildReader(*rowType, streams);

  MemoryPool &pool = *getDefaultPool();
  StructVectorBatch batch(512, pool);
  UnionVectorBatch *unions = new UnionVectorBatch(512, pool);
  batch.fields.push_back(unions);
  for (size_t variant = 0; variant < 130; ++variant) {
    unions->children.push_back(new LongVectorBatch(512, pool));
  }

  // first read: one row per variant, in tag order
  reader->next(batch, 130, 0);
  ASSERT_EQ(130, batch.numElements);
  ASSERT_FALSE(batch.hasNulls);
  ASSERT_EQ(130, unions->numElements);
  ASSERT_FALSE(unions->hasNulls);
  for (size_t variant = 0; variant < 130; ++variant) {
    ASSERT_EQ(1, unions->children[variant]->numElements);
    ASSERT_FALSE(unions->children[variant]->hasNulls);
  }
  for (size_t row = 0; row < batch.numElements; ++row) {
    EXPECT_EQ(row, unions->tags[row]);
    EXPECT_EQ(0, unions->offsets[row]);
    LongVectorBatch *child =
        dynamic_cast<LongVectorBatch*>(unions->children[unions->tags[row]]);
    EXPECT_EQ(row & 0x3f, child->data[unions->offsets[row]]);
  }

  reader->skip(30);

  // second read: the remaining 230 rows, starting at variant 30
  reader->next(batch, 230, 0);
  ASSERT_EQ(230, batch.numElements);
  ASSERT_FALSE(batch.hasNulls);
  ASSERT_EQ(230, unions->numElements);
  ASSERT_FALSE(unions->hasNulls);

  // per-variant totals: variants below 30 appear once, the rest twice
  for (size_t variant = 0; variant < 130; ++variant) {
    ASSERT_EQ(variant < 30 ? 1 : 2, unions->children[variant]->numElements);
    ASSERT_FALSE(unions->children[variant]->hasNulls);
  }

  // per-row tag, offset and value
  for (size_t row = 0; row < batch.numElements; ++row) {
    size_t variant = (row + 30) % 130;
    ASSERT_EQ(variant, unions->tags[row]);
    ASSERT_EQ(row / 130, unions->offsets[row]);
    LongVectorBatch *child =
        dynamic_cast<LongVectorBatch*>(unions->children[variant]);
    EXPECT_EQ((variant & 0x3f) + (row < 100 ? 1 : 2),
              child->data[unions->offsets[row]]);
  }
}
// Instantiate the value-parameterized TestColumnReaderEncoded tests under the
// OrcColumnReaderTest prefix, once with the parameter true and once with
// false.
// NOTE(review): the meaning of the bool parameter is defined by
// TestColumnReaderEncoded, which lies outside this part of the file.
INSTANTIATE_TEST_CASE_P(OrcColumnReaderTest, TestColumnReaderEncoded, Values(true, false));
} // namespace orc