blob: f12db4e9b3ffc9a0911f42b573da8bb37ef9a00e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "orc/OrcFile.hh"
#include "orc/sargs/SearchArgument.hh"
#include "MemoryInputStream.hh"
#include "MemoryOutputStream.hh"
#include "wrap/gtest-wrapper.h"
namespace orc {
// Size argument passed to the MemoryOutputStream constructor by the tests below
// (10 * 1024 * 1024; presumably a capacity in bytes -- confirm against MemoryOutputStream).
static const int DEFAULT_MEM_STREAM_SIZE = 10 * 1024 * 1024; // 10M
/**
 * Writes a 3500-row ORC file with schema struct<int1:bigint,string1:string>
 * into memStream. Row i holds int1 = 300 * i and string1 = the decimal text
 * of 10 * i.
 *
 * @param memStream destination stream handed to the writer
 * @param rowIndexStride value forwarded to WriterOptions::setRowIndexStride
 */
void createMemTestFile(MemoryOutputStream& memStream, uint64_t rowIndexStride) {
  const uint64_t rowCount = 3500;
  // 10 * i never exceeds 34990 for i < rowCount, i.e. at most 5 characters per value.
  const uint64_t maxStrLen = 5;

  MemoryPool* pool = getDefaultPool();
  auto type = std::unique_ptr<Type>(Type::buildTypeFromString(
      "struct<int1:bigint,string1:string>"));
  WriterOptions options;
  options.setStripeSize(1024 * 1024)
      .setCompressionBlockSize(1024)
      .setCompression(CompressionKind_NONE)
      .setMemoryPool(pool)
      .setRowIndexStride(rowIndexStride);
  auto writer = createWriter(*type, &memStream, options);
  auto batch = writer->createRowBatch(rowCount);
  auto& structBatch = dynamic_cast<StructVectorBatch&>(*batch);
  auto& longBatch = dynamic_cast<LongVectorBatch&>(*structBatch.fields[0]);
  auto& strBatch = dynamic_cast<StringVectorBatch&>(*structBatch.fields[1]);

  // When rowIndexStride is 1000, the 3500 rows constitute 4 row groups whose
  // min/max pairs are:
  //   int1:    0/299700, 300000/599700, 600000/899700, 900000/1049700
  //   string1: "0"/"9990", "10000"/"19990", "20000"/"29990", "30000"/"34990"
  //
  // strBatch.data[] only stores pointers into this stack buffer, so the buffer
  // has to stay alive until writer->add() and writer->close() below have run.
  char buffer[rowCount * maxStrLen];
  uint64_t offset = 0;
  for (uint64_t i = 0; i < rowCount; ++i) {
    longBatch.data[i] = static_cast<int64_t>(i * 300);
    // std::to_string is what the reading side of these tests compares against,
    // and unlike a stream insertion it does not depend on the global locale.
    const std::string str = std::to_string(10 * i);
    memcpy(buffer + offset, str.data(), str.size());
    strBatch.data[i] = buffer + offset;
    strBatch.length[i] = static_cast<int64_t>(str.size());
    offset += str.size();
  }
  structBatch.numElements = rowCount;
  longBatch.numElements = rowCount;
  strBatch.numElements = rowCount;
  writer->add(*batch);
  writer->close();
}
/**
 * Reads through `reader` with the range predicate
 * (int1 >= 300000 AND int1 < 600000), written as
 * NOT(int1 < 300000) AND int1 < 600000.
 *
 * Expects the first next() to deliver the 1000 rows numbered 1000..1999 and
 * the following next() to report the end of the data. The predicate is built
 * twice: first by column name, then by column id.
 */
void TestRangePredicates(Reader* reader) {
  for (int pass = 0; pass < 2; ++pass) {
    std::unique_ptr<SearchArgument> sarg;
    if (pass != 0) {
      // second pass: refer to the column by its id
      sarg = SearchArgumentFactory::newBuilder()
                 ->startAnd()
                 .startNot()
                 .lessThan(/*columnId=*/1, PredicateDataType::LONG,
                           Literal(static_cast<int64_t>(300000L)))
                 .end()
                 .lessThan(/*columnId=*/1, PredicateDataType::LONG,
                           Literal(static_cast<int64_t>(600000L)))
                 .end()
                 .build();
    } else {
      // first pass: refer to the column by its name
      sarg = SearchArgumentFactory::newBuilder()
                 ->startAnd()
                 .startNot()
                 .lessThan("int1", PredicateDataType::LONG,
                           Literal(static_cast<int64_t>(300000L)))
                 .end()
                 .lessThan("int1", PredicateDataType::LONG,
                           Literal(static_cast<int64_t>(600000L)))
                 .end()
                 .build();
    }

    RowReaderOptions rowReaderOpts;
    rowReaderOpts.searchArgument(std::move(sarg));
    auto rowReader = reader->createRowReader(rowReaderOpts);
    auto readBatch = rowReader->createRowBatch(2000);
    auto& root = dynamic_cast<StructVectorBatch&>(*readBatch);
    auto& longs = dynamic_cast<LongVectorBatch&>(*root.fields[0]);
    auto& strings = dynamic_cast<StringVectorBatch&>(*root.fields[1]);

    EXPECT_EQ(true, rowReader->next(*readBatch));
    EXPECT_EQ(1000, readBatch->numElements);
    EXPECT_EQ(1000, rowReader->getRowNumber());
    // Slot idx of the batch corresponds to file row idx + 1000.
    for (uint64_t idx = 0; idx < 1000; ++idx) {
      const uint64_t row = idx + 1000;
      EXPECT_EQ(300 * row, longs.data[idx]);
      EXPECT_EQ(std::to_string(10 * row),
                std::string(strings.data[idx], static_cast<size_t>(strings.length[idx])));
    }

    EXPECT_EQ(false, rowReader->next(*readBatch));
    EXPECT_EQ(3500, rowReader->getRowNumber());
  }
}
/**
 * Reads through `reader` with the predicate int1 < 0 and expects the very
 * first next() to return false with the row number reported as 3500.
 * The predicate is built twice: first by column name, then by column id.
 */
void TestNoRowsSelected(Reader* reader) {
  for (int pass = 0; pass < 2; ++pass) {
    std::unique_ptr<SearchArgument> sarg;
    if (pass != 0) {
      // second pass: refer to the column by its id
      sarg = SearchArgumentFactory::newBuilder()
                 ->startAnd()
                 .lessThan(/*columnId=*/1, PredicateDataType::LONG,
                           Literal(static_cast<int64_t>(0)))
                 .end()
                 .build();
    } else {
      // first pass: refer to the column by its name
      sarg = SearchArgumentFactory::newBuilder()
                 ->startAnd()
                 .lessThan("int1", PredicateDataType::LONG,
                           Literal(static_cast<int64_t>(0)))
                 .end()
                 .build();
    }

    RowReaderOptions rowReaderOpts;
    rowReaderOpts.searchArgument(std::move(sarg));
    auto rowReader = reader->createRowReader(rowReaderOpts);
    auto readBatch = rowReader->createRowBatch(2000);

    EXPECT_EQ(false, rowReader->next(*readBatch));
    EXPECT_EQ(3500, rowReader->getRowNumber());
  }
}
/**
 * Reads through `reader` with the predicate (int1 < 30000 OR int1 >= 1020000),
 * written as int1 < 300 * 100 OR NOT(int1 < 300 * 3400).
 *
 * Expects two batches: rows 0..999 and rows 3000..3499. Afterwards seeks to
 * row 2500 and expects the reader to resume at row 3000. The predicate is
 * built twice: first by column name, then by column id.
 */
void TestOrPredicates(Reader* reader) {
  for (int pass = 0; pass < 2; ++pass) {
    std::unique_ptr<SearchArgument> sarg;
    if (pass != 0) {
      // second pass: refer to the column by its id
      sarg = SearchArgumentFactory::newBuilder()
                 ->startOr()
                 .lessThan(/*columnId=*/1, PredicateDataType::LONG,
                           Literal(static_cast<int64_t>(300 * 100)))
                 .startNot()
                 .lessThan(/*columnId=*/1, PredicateDataType::LONG,
                           Literal(static_cast<int64_t>(300 * 3400)))
                 .end()
                 .end()
                 .build();
    } else {
      // first pass: refer to the column by its name
      sarg = SearchArgumentFactory::newBuilder()
                 ->startOr()
                 .lessThan("int1", PredicateDataType::LONG,
                           Literal(static_cast<int64_t>(300 * 100)))
                 .startNot()
                 .lessThan("int1", PredicateDataType::LONG,
                           Literal(static_cast<int64_t>(300 * 3400)))
                 .end()
                 .end()
                 .build();
    }

    RowReaderOptions rowReaderOpts;
    rowReaderOpts.searchArgument(std::move(sarg));
    auto rowReader = reader->createRowReader(rowReaderOpts);
    auto readBatch = rowReader->createRowBatch(2000);
    auto& root = dynamic_cast<StructVectorBatch&>(*readBatch);
    auto& longs = dynamic_cast<LongVectorBatch&>(*root.fields[0]);
    auto& strings = dynamic_cast<StringVectorBatch&>(*root.fields[1]);

    // Verifies that the current batch holds the values of file rows 3000..3499.
    // Shared by the sequential read and the read after the seek.
    auto expectTailValues = [&]() {
      for (uint64_t idx = 0; idx < 500; ++idx) {
        const uint64_t row = idx + 3000;
        EXPECT_EQ(300 * row, longs.data[idx]);
        EXPECT_EQ(std::to_string(10 * row),
                  std::string(strings.data[idx], static_cast<size_t>(strings.length[idx])));
      }
    };

    // first batch: rows 0..999
    EXPECT_EQ(true, rowReader->next(*readBatch));
    EXPECT_EQ(1000, readBatch->numElements);
    EXPECT_EQ(0, rowReader->getRowNumber());
    for (uint64_t row = 0; row < 1000; ++row) {
      EXPECT_EQ(300 * row, longs.data[row]);
      EXPECT_EQ(std::to_string(10 * row),
                std::string(strings.data[row], static_cast<size_t>(strings.length[row])));
    }

    // second batch: rows 3000..3499
    EXPECT_EQ(true, rowReader->next(*readBatch));
    EXPECT_EQ(500, readBatch->numElements);
    EXPECT_EQ(3000, rowReader->getRowNumber());
    expectTailValues();

    EXPECT_EQ(false, rowReader->next(*readBatch));
    EXPECT_EQ(3500, rowReader->getRowNumber());

    // A seek into the 3rd row group is expected to be adjusted to the 4th one.
    rowReader->seekToRow(2500);
    EXPECT_EQ(true, rowReader->next(*readBatch));
    EXPECT_EQ(3000, rowReader->getRowNumber());
    EXPECT_EQ(500, readBatch->numElements);
    expectTailValues();

    EXPECT_EQ(false, rowReader->next(*readBatch));
    EXPECT_EQ(3500, rowReader->getRowNumber());
  }
}
/**
 * Seeks to `seekRowNumber` on a row reader filtered by (int1 < 300000) and
 * checks what the next read returns.
 *
 * The expectations assume only the first 1000 rows, (0, "0"), (300, "10"),
 * ..., (299700, "9990"), pass the filter: a seek below row 1000 must yield the
 * remaining rows up to 999, any other seek must hit the end of the file.
 *
 * @param reader reader over the test file
 * @param seekRowNumber row passed to seekToRow() before the first read
 */
void TestSeekWithPredicates(Reader* reader, uint64_t seekRowNumber) {
  std::unique_ptr<SearchArgument> sarg =
      SearchArgumentFactory::newBuilder()
          ->lessThan("int1", PredicateDataType::LONG,
                     Literal(static_cast<int64_t>(300000)))
          .build();
  RowReaderOptions rowReaderOpts;
  rowReaderOpts.searchArgument(std::move(sarg));
  auto rowReader = reader->createRowReader(rowReaderOpts);
  auto readBatch = rowReader->createRowBatch(2000);
  auto& root = dynamic_cast<StructVectorBatch&>(*readBatch);
  auto& longs = dynamic_cast<LongVectorBatch&>(*root.fields[0]);
  auto& strings = dynamic_cast<StringVectorBatch&>(*root.fields[1]);

  rowReader->seekToRow(seekRowNumber);

  if (seekRowNumber < 1000) {
    // Seeking inside the first row group: the rest of that group is returned.
    EXPECT_TRUE(rowReader->next(*readBatch));
    EXPECT_EQ(1000 - seekRowNumber, readBatch->numElements);
    EXPECT_EQ(seekRowNumber, rowReader->getRowNumber());
    for (uint64_t idx = 0; idx < readBatch->numElements; ++idx) {
      EXPECT_EQ(300 * (idx + seekRowNumber), longs.data[idx]);
      EXPECT_EQ(std::to_string(10 * (idx + seekRowNumber)),
                std::string(strings.data[idx], static_cast<size_t>(strings.length[idx])));
    }
    EXPECT_FALSE(rowReader->next(*readBatch));
    EXPECT_EQ(3500, rowReader->getRowNumber());
  } else {
    // Seeking past the first row group goes straight to the end of the file.
    EXPECT_FALSE(rowReader->next(*readBatch));
    EXPECT_EQ(0, readBatch->numElements);
    EXPECT_EQ(3500, rowReader->getRowNumber());
  }
}
/**
 * Performs several seeks on one row reader filtered by
 * (int1 >= 300000 AND int1 < 600000) and reads a single row after each seek.
 *
 * The expectations assume only the 2nd row group (rows 1000..1999) passes the
 * filter: a seek before it lands on row 1000, a seek inside it lands exactly
 * on the requested row, and a seek behind it reaches the end of the file.
 */
void TestMultipleSeeksWithPredicates(Reader* reader) {
  std::unique_ptr<SearchArgument> sarg =
      SearchArgumentFactory::newBuilder()
          ->startAnd()
          .startNot()
          .lessThan("int1", PredicateDataType::LONG,
                    Literal(static_cast<int64_t>(300000L)))
          .end()
          .lessThan("int1", PredicateDataType::LONG,
                    Literal(static_cast<int64_t>(600000L)))
          .end()
          .build();
  RowReaderOptions rowReaderOpts;
  rowReaderOpts.searchArgument(std::move(sarg));
  auto rowReader = reader->createRowReader(rowReaderOpts);

  // A batch of capacity 1, so every read after a seek returns a single row.
  auto readBatch = rowReader->createRowBatch(1);
  auto& root = dynamic_cast<StructVectorBatch&>(*readBatch);
  auto& longs = dynamic_cast<LongVectorBatch&>(*root.fields[0]);
  auto& strings = dynamic_cast<StringVectorBatch&>(*root.fields[1]);

  // A seek inside the 1st row group moves on to the start of the 2nd row group.
  rowReader->seekToRow(10);
  EXPECT_TRUE(rowReader->next(*readBatch));
  EXPECT_EQ(1000, rowReader->getRowNumber()) << "Should start at the 2nd row group";
  EXPECT_EQ(1, readBatch->numElements);
  EXPECT_EQ(300000, longs.data[0]);
  EXPECT_EQ("10000", std::string(strings.data[0], static_cast<size_t>(strings.length[0])));

  // Seeks inside the 2nd row group, which the search argument selects.
  uint64_t positions[] = {1001, 1010, 1100, 1500, 1999};
  for (uint64_t target : positions) {
    rowReader->seekToRow(target);
    EXPECT_TRUE(rowReader->next(*readBatch));
    EXPECT_EQ(target, rowReader->getRowNumber());
    EXPECT_EQ(1, readBatch->numElements);
    EXPECT_EQ(300 * target, longs.data[0]);
    EXPECT_EQ(std::to_string(10 * target),
              std::string(strings.data[0], static_cast<size_t>(strings.length[0])));
  }

  // A seek beyond the 2nd row group goes to the end of the file.
  rowReader->seekToRow(2000);
  EXPECT_FALSE(rowReader->next(*readBatch));
  EXPECT_EQ(3500, rowReader->getRowNumber());
  EXPECT_EQ(0, readBatch->numElements);
}
// Writes the 3500-row test file with a row index stride of 1000 into memory,
// opens it again and runs every predicate scenario against the same reader.
TEST(TestPredicatePushdown, testPredicatePushdown) {
  MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
  MemoryPool* pool = getDefaultPool();
  createMemTestFile(memStream, 1000);

  std::unique_ptr<InputStream> inStream(
      new MemoryInputStream(memStream.getData(), memStream.getLength()));
  ReaderOptions readerOptions;
  readerOptions.setMemoryPool(*pool);
  std::unique_ptr<Reader> reader = createReader(std::move(inStream), readerOptions);
  EXPECT_EQ(3500, reader->getNumberOfRows());

  Reader* rawReader = reader.get();
  TestRangePredicates(rawReader);
  TestNoRowsSelected(rawReader);
  TestOrPredicates(rawReader);

  // Seek targets before, at and after row 1000, including one (4000) that is
  // larger than the number of rows in the file.
  uint64_t seekTargets[] = {0, 10, 100, 500, 999, 1000, 1001, 4000};
  for (uint64_t target : seekTargets) {
    TestSeekWithPredicates(rawReader, target);
  }

  TestMultipleSeeksWithPredicates(rawReader);
}
/**
 * Performs several seeks on one row reader and reads a single row after each
 * of them, expecting every requested row to be returned exactly.
 *
 * @param reader reader over a test file written with rowIndexStride = 0
 * @param createSarg when true the row reader gets the predicate (int1 < 300000);
 *        the expectations assume it filters nothing because the file has no
 *        row indexes
 */
void TestMultipleSeeksWithoutRowIndexes(Reader* reader, bool createSarg) {
  RowReaderOptions rowReaderOpts;
  if (createSarg) {
    std::unique_ptr<SearchArgument> sarg =
        SearchArgumentFactory::newBuilder()
            ->lessThan("int1", PredicateDataType::LONG,
                       Literal(static_cast<int64_t>(300000L)))
            .build();
    rowReaderOpts.searchArgument(std::move(sarg));
  }
  auto rowReader = reader->createRowReader(rowReaderOpts);

  // A batch of capacity 1, so every read after a seek returns a single row.
  auto readBatch = rowReader->createRowBatch(1);
  auto& root = dynamic_cast<StructVectorBatch&>(*readBatch);
  auto& longs = dynamic_cast<LongVectorBatch&>(*root.fields[0]);
  auto& strings = dynamic_cast<StringVectorBatch&>(*root.fields[1]);

  // Seeks to rows that exist in the file.
  uint64_t positions[] = {0, 1, 100, 999, 1001, 1010, 1100, 1500, 1999, 3000, 3499};
  for (uint64_t target : positions) {
    rowReader->seekToRow(target);
    EXPECT_TRUE(rowReader->next(*readBatch));
    EXPECT_EQ(target, rowReader->getRowNumber());
    EXPECT_EQ(1, readBatch->numElements);
    EXPECT_EQ(300 * target, longs.data[0]);
    EXPECT_EQ(std::to_string(10 * target),
              std::string(strings.data[0], static_cast<size_t>(strings.length[0])));
  }

  // A seek beyond the last row reaches the end of the file.
  rowReader->seekToRow(4000);
  EXPECT_FALSE(rowReader->next(*readBatch));
  EXPECT_EQ(3500, rowReader->getRowNumber());
  EXPECT_EQ(0, readBatch->numElements);
}
// Writes the 3500-row test file with rowIndexStride = 0 (no row groups or row
// indexes) and runs the seek scenario with and without a search argument.
TEST(TestPredicatePushdown, testPredicatePushdownWithoutRowIndexes) {
  MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
  MemoryPool* pool = getDefaultPool();
  createMemTestFile(memStream, 0);

  std::unique_ptr<InputStream> inStream(
      new MemoryInputStream(memStream.getData(), memStream.getLength()));
  ReaderOptions readerOptions;
  readerOptions.setMemoryPool(*pool);
  std::unique_ptr<Reader> reader = createReader(std::move(inStream), readerOptions);
  EXPECT_EQ(3500, reader->getNumberOfRows());

  // First with a search argument, then without one.
  const bool sargChoices[] = {true, false};
  for (bool withSarg : sargChoices) {
    TestMultipleSeeksWithoutRowIndexes(reader.get(), withSarg);
  }
}
/**
 * Reads with the predicate (col1 < 0) and expects the first next() to return
 * false with the row number reported as 7000, i.e. nothing is read at all.
 *
 * @param reader reader over the file to scan
 * @param seekRowNumber row to seek to before reading; a negative value means
 *        that no seek is performed
 */
void TestNoRowsSelectedWithFileStats(Reader* reader, int seekRowNumber) {
  std::unique_ptr<SearchArgument> sarg =
      SearchArgumentFactory::newBuilder()
          ->startAnd()
          .lessThan("col1", PredicateDataType::LONG,
                    Literal(static_cast<int64_t>(0)))
          .end()
          .build();
  RowReaderOptions rowReaderOpts;
  rowReaderOpts.searchArgument(std::move(sarg));
  auto rowReader = reader->createRowReader(rowReaderOpts);
  auto readBatch = rowReader->createRowBatch(2000);

  const bool seekRequested = seekRowNumber >= 0;
  if (seekRequested) {
    rowReader->seekToRow(static_cast<uint64_t>(seekRowNumber));
  }

  EXPECT_EQ(false, rowReader->next(*readBatch));
  EXPECT_EQ(7000, rowReader->getRowNumber());
}
/**
 * Reads with the predicate (col1 BETWEEN 3500 AND 7000) after an optional
 * seek. The expectations assume a file whose col1 values are 0..6999, split
 * into two stripes of 3500 rows, so that the first stripe is skipped.
 *
 * Three cases are checked, depending on seekRowNumber:
 *  - below 3500 (including "no seek"): reading starts at row 3500 and returns
 *    the rows 3500..6999 in batches of 2000 and 1500;
 *  - 7000 or more: the first read reports the end of the file;
 *  - otherwise: reading starts exactly at the requested row.
 *
 * @param reader reader over the file to scan
 * @param seekRowNumber row to seek to before reading; a negative value means
 *        that no seek is performed
 */
void TestLastStripeSelectedWithStripeStats(Reader* reader, int seekRowNumber) {
  // Sargs: col1 between 3500 and 7000. First stripe (3500 rows) will be skipped.
  std::unique_ptr<SearchArgument> sarg =
      SearchArgumentFactory::newBuilder()
          ->between("col1",
                    PredicateDataType::LONG,
                    Literal(static_cast<int64_t>(3500)),
                    Literal(static_cast<int64_t>(7000)))
          .build();
  RowReaderOptions rowReaderOpts;
  rowReaderOpts.searchArgument(std::move(sarg));
  auto rowReader = reader->createRowReader(rowReaderOpts);
  if (seekRowNumber >= 0) {
    rowReader->seekToRow(static_cast<uint64_t>(seekRowNumber));
  }

  // Seek within the first stripe, which is skipped due to PPD. Any seek within
  // it (or no seek at all) ends up at the end of the first stripe.
  if (seekRowNumber < 3500) {
    auto readBatch = rowReader->createRowBatch(2000);
    // 1st batch of 2000 rows
    EXPECT_EQ(true, rowReader->next(*readBatch));
    // getRowNumber() reports the first row of the batch that was just read
    EXPECT_EQ(3500, rowReader->getRowNumber());
    EXPECT_EQ(2000, readBatch->numElements);
    auto& batch0 = dynamic_cast<StructVectorBatch&>(*readBatch);
    auto& batch1 = dynamic_cast<LongVectorBatch&>(*batch0.fields[0]);
    for (uint64_t i = 0; i < 2000; ++i) {
      EXPECT_EQ(i + 3500, batch1.data[i]);
    }
    // 2nd batch of the remaining 1500 rows
    EXPECT_EQ(true, rowReader->next(*readBatch));
    EXPECT_EQ(5500, rowReader->getRowNumber());
    EXPECT_EQ(1500, readBatch->numElements);
    for (uint64_t i = 0; i < 1500; ++i) {
      EXPECT_EQ(i + 5500, batch1.data[i]);
    }
    // no more batches
    EXPECT_EQ(false, rowReader->next(*readBatch));
    return;
  }

  // Seek to the end of file
  if (seekRowNumber >= 7000) {
    auto readBatch = rowReader->createRowBatch(2000);
    EXPECT_EQ(false, rowReader->next(*readBatch));
    EXPECT_EQ(7000, rowReader->getRowNumber());
    return;
  }

  // Seek within the second stripe. Here 3500 <= seekRowNumber < 7000, so the
  // value is converted once to the unsigned 64-bit type used for row numbers
  // throughout this file instead of mixing int, unsigned long and uint64_t.
  const uint64_t seekRow = static_cast<uint64_t>(seekRowNumber);

  // Use 3500 as the batch size so all remaining rows are read at once.
  auto readBatch = rowReader->createRowBatch(3500);
  EXPECT_EQ(true, rowReader->next(*readBatch));
  EXPECT_EQ(seekRow, rowReader->getRowNumber());
  EXPECT_EQ(7000 - seekRow, readBatch->numElements);
  auto& batch0 = dynamic_cast<StructVectorBatch&>(*readBatch);
  auto& batch1 = dynamic_cast<LongVectorBatch&>(*batch0.fields[0]);
  for (uint64_t i = 0; i < readBatch->numElements; ++i) {
    EXPECT_EQ(i + seekRow, batch1.data[i]);
  }
  // no more batches
  EXPECT_EQ(false, rowReader->next(*readBatch));
}
/**
 * Reads with the predicate (col1 < 3500) after an optional seek. The
 * expectations assume that only the first 3500 rows pass the filter and that
 * the last stripe (3500 rows) is skipped.
 *
 * When the starting row is below 3500, one batch holding the rows from the
 * starting row up to 3499 is expected. In every case the reader must end with
 * next() returning false and the row number reported as 7000.
 *
 * @param reader reader over the file to scan
 * @param seekRowNumber row to seek to before reading; a negative value means
 *        that no seek is performed
 */
void TestFirstStripeSelectedWithStripeStats(Reader* reader, int seekRowNumber) {
  std::unique_ptr<SearchArgument> sarg =
      SearchArgumentFactory::newBuilder()
          ->lessThan("col1",
                     PredicateDataType::LONG,
                     Literal(static_cast<int64_t>(3500)))
          .build();
  RowReaderOptions rowReaderOpts;
  rowReaderOpts.searchArgument(std::move(sarg));
  auto rowReader = reader->createRowReader(rowReaderOpts);
  auto readBatch = rowReader->createRowBatch(3500);
  auto& root = dynamic_cast<StructVectorBatch&>(*readBatch);
  auto& longs = dynamic_cast<LongVectorBatch&>(*root.fields[0]);

  // Row at which reading is expected to start; stays 0 when no seek happens.
  uint64_t firstRowNumber = 0;
  if (seekRowNumber >= 0) {
    firstRowNumber = static_cast<uint64_t>(seekRowNumber);
    rowReader->seekToRow(firstRowNumber);
  }

  if (seekRowNumber < 3500) {
    EXPECT_EQ(true, rowReader->next(*readBatch));
    EXPECT_EQ(firstRowNumber, rowReader->getRowNumber());
    EXPECT_EQ(3500 - firstRowNumber, readBatch->numElements);
    for (uint64_t idx = 0; idx < readBatch->numElements; ++idx) {
      EXPECT_EQ(idx + firstRowNumber, longs.data[idx]);
    }
  }

  // Nothing else may be returned.
  EXPECT_EQ(false, rowReader->next(*readBatch));
  EXPECT_EQ(7000, rowReader->getRowNumber());
}
// Writes a struct<col1:bigint> file from two batches of 3500 rows, checks that
// it contains 7000 rows in two stripes, and then runs the file-level and
// stripe-level pruning scenarios from several seek positions.
TEST(TestPredicatePushdown, testStripeAndFileStats) {
  MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
  MemoryPool* pool = getDefaultPool();
  auto type = std::unique_ptr<Type>(Type::buildTypeFromString(
      "struct<col1:bigint>"));
  WriterOptions options;
  options.setStripeSize(1)
      .setCompressionBlockSize(1024)
      .setCompression(CompressionKind_NONE)
      .setMemoryPool(pool)
      .setRowIndexStride(1000);
  auto writer = createWriter(*type, &memStream, options);
  auto batch = writer->createRowBatch(3500);
  auto& structBatch = dynamic_cast<StructVectorBatch&>(*batch);
  auto& longBatch = dynamic_cast<LongVectorBatch&>(*structBatch.fields[0]);

  // stripe 1 :    0 <= col1 < 3500
  // stripe 2 : 3500 <= col1 < 7000
  uint64_t stripeCount = 2;
  for (uint64_t stripe = 0; stripe < stripeCount; ++stripe) {
    const uint64_t base = stripe * 3500;
    for (uint64_t row = 0; row < 3500; ++row) {
      longBatch.data[row] = static_cast<int64_t>(row + base);
    }
    structBatch.numElements = 3500;
    longBatch.numElements = 3500;
    writer->add(*batch);
  }
  writer->close();

  std::unique_ptr<InputStream> inStream(
      new MemoryInputStream(memStream.getData(), memStream.getLength()));
  ReaderOptions readerOptions;
  readerOptions.setMemoryPool(*pool);
  std::unique_ptr<Reader> reader = createReader(std::move(inStream), readerOptions);
  EXPECT_EQ(7000, reader->getNumberOfRows());
  EXPECT_EQ(stripeCount, reader->getNumberOfStripes());

  // Seek to different positions before each scenario; -1 stands for "no seek".
  int seekPositions[] = {-1, 0, 1000, 4000, 8000};
  for (int target : seekPositions) {
    TestNoRowsSelectedWithFileStats(reader.get(), target);
    TestLastStripeSelectedWithStripeStats(reader.get(), target);
    TestFirstStripeSelectedWithStripeStats(reader.get(), target);
  }
}
} // namespace orc