| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include <cstring> |
| |
| #include "Reader.hh" |
| #include "orc/Reader.hh" |
| |
| #include "Adaptor.hh" |
| #include "MemoryInputStream.hh" |
| #include "MemoryOutputStream.hh" |
| |
| #include "wrap/gmock.h" |
| #include "wrap/gtest-wrapper.h" |
| |
| namespace orc { |
| |
| using ::testing::ElementsAreArray; |
| |
| // Size argument passed to each MemoryOutputStream the parameterized tests below construct. |
| static const int DEFAULT_MEM_STREAM_SIZE = 1024 * 1024; // 1M |
| |
| TEST(TestReader, testWriterVersions) { |
|   // Route every check through one local helper so each line reads "name <- version". |
|   const auto nameOf = [](WriterVersion version) { return writerVersionToString(version); }; |
| |
|   // Writer versions checked against their display names. |
|   EXPECT_EQ("original", nameOf(WriterVersion_ORIGINAL)); |
|   EXPECT_EQ("HIVE-8732", nameOf(WriterVersion_HIVE_8732)); |
|   EXPECT_EQ("HIVE-4243", nameOf(WriterVersion_HIVE_4243)); |
|   EXPECT_EQ("HIVE-12055", nameOf(WriterVersion_HIVE_12055)); |
|   EXPECT_EQ("HIVE-13083", nameOf(WriterVersion_HIVE_13083)); |
| |
|   // The raw value 99 cast to WriterVersion is expected to render as "future - 99". |
|   const WriterVersion unnamedVersion = static_cast<WriterVersion>(99); |
|   EXPECT_EQ("future - 99", nameOf(unnamedVersion)); |
| } |
| |
| TEST(TestReader, testCompressionNames) { |
|   // Local shorthand for the function under test. |
|   const auto kindName = [](CompressionKind kind) { return compressionKindToString(kind); }; |
| |
|   // Compression kinds checked against their lower-case names. |
|   EXPECT_EQ("none", kindName(CompressionKind_NONE)); |
|   EXPECT_EQ("zlib", kindName(CompressionKind_ZLIB)); |
|   EXPECT_EQ("snappy", kindName(CompressionKind_SNAPPY)); |
|   EXPECT_EQ("lzo", kindName(CompressionKind_LZO)); |
|   EXPECT_EQ("lz4", kindName(CompressionKind_LZ4)); |
|   EXPECT_EQ("zstd", kindName(CompressionKind_ZSTD)); |
| |
|   // The raw value 99 cast to CompressionKind is expected to render as "unknown - 99". |
|   const CompressionKind unnamedKind = static_cast<CompressionKind>(99); |
|   EXPECT_EQ("unknown - 99", kindName(unnamedKind)); |
| } |
| |
| TEST(TestRowReader, computeBatchSize) { |
|   // Fixture shared by every check: 850 rows in the stripe, a row index stride of 100, |
|   // and nine per-row-group entries in the skipped-rows vector. |
|   uint64_t stride = 100; |
|   uint64_t stripeRows = 100 * 8 + 50; |
|   std::vector<uint64_t> skippedRows = {0, 0, 400, 400, 0, 0, 800, 800, 0}; |
| |
|   // Only the requested batch size and the current row vary between checks. |
|   const auto batchSize = [&](uint64_t requestedSize, uint64_t currentRow) { |
|     return RowReaderImpl::computeBatchSize(requestedSize, currentRow, stripeRows, stride, |
|                                            skippedRows); |
|   }; |
| |
|   // Requested size 1024 at increasing row positions. |
|   EXPECT_EQ(0, batchSize(1024, 0)); |
|   EXPECT_EQ(0, batchSize(1024, 50)); |
|   EXPECT_EQ(200, batchSize(1024, 200)); |
|   EXPECT_EQ(150, batchSize(1024, 250)); |
|   EXPECT_EQ(0, batchSize(1024, 550)); |
|   EXPECT_EQ(100, batchSize(1024, 700)); |
| |
|   // Requested size 50, including positions at and beyond row 810. |
|   EXPECT_EQ(50, batchSize(50, 700)); |
|   EXPECT_EQ(0, batchSize(50, 810)); |
|   EXPECT_EQ(0, batchSize(50, 900)); |
| } |
| |
| TEST(TestRowReader, advanceToNextRowGroup) { |
|   // Same fixture as computeBatchSize: 850 rows, stride 100, nine row-group entries. |
|   uint64_t stride = 100; |
|   uint64_t stripeRows = 100 * 8 + 50; |
|   std::vector<uint64_t> skippedRows = {0, 0, 400, 400, 0, 0, 800, 800, 0}; |
| |
|   // Only the current row varies between checks. |
|   const auto nextRow = [&](uint64_t currentRow) { |
|     return RowReaderImpl::advanceToNextRowGroup(currentRow, stripeRows, stride, skippedRows); |
|   }; |
| |
|   // Rows 0 and 150 are both expected to advance to row 200. |
|   EXPECT_EQ(200, nextRow(0)); |
|   EXPECT_EQ(200, nextRow(150)); |
| |
|   // Rows 250 and 350 are expected to stay where they are (350 is checked twice). |
|   EXPECT_EQ(250, nextRow(250)); |
|   EXPECT_EQ(350, nextRow(350)); |
|   EXPECT_EQ(350, nextRow(350)); |
| |
|   // Row 500 is expected to advance to 600; rows 699 and 799 are expected to stay put. |
|   EXPECT_EQ(600, nextRow(500)); |
|   EXPECT_EQ(699, nextRow(699)); |
|   EXPECT_EQ(799, nextRow(799)); |
| |
|   // Row 800 and the out-of-stripe row 900 are both expected to yield the stripe row count. |
|   EXPECT_EQ(850, nextRow(800)); |
|   EXPECT_EQ(850, nextRow(900)); |
| } |
| |
| void CheckFileWithSargs(const char* fileName, const char* softwareVersion) { |
|   // Opens the example file `fileName`, checks that it reports the ORC C++ writer and the |
|   // given `softwareVersion`, then reads it through an EQUALS search argument on column 1. |
| |
|   // Resolve the example directory: $ORC_EXAMPLE_DIR when set, otherwise a relative default. |
|   const char* exampleDir = std::getenv("ORC_EXAMPLE_DIR"); |
|   std::string path = (exampleDir != nullptr) ? exampleDir : "../../../examples"; |
|   path += "/"; |
|   path += fileName; |
| |
|   // Open the file with a null reader-metrics pointer and verify the writer identity. |
|   ReaderOptions readerOpts; |
|   readerOpts.setReaderMetrics(nullptr); |
|   auto input = readLocalFile(path.c_str(), readerOpts.getReaderMetrics()); |
|   std::unique_ptr<Reader> reader = createReader(std::move(input), readerOpts); |
|   EXPECT_EQ(WriterId::ORC_CPP_WRITER, reader->getWriterId()); |
|   EXPECT_EQ(softwareVersion, reader->getSoftwareVersion()); |
| |
|   // Build a SearchArgument with an EQUALS predicate, which can leverage the bloom filters. |
|   // Integer value 18000000000 has an inconsistent hash before the fix of ORC-1024. |
|   auto sargBuilder = SearchArgumentFactory::newBuilder(); |
|   sargBuilder->equals(1, PredicateDataType::LONG, Literal(static_cast<int64_t>(18000000000L))); |
|   RowReaderOptions rowReaderOpts; |
|   rowReaderOpts.searchArgument(sargBuilder->build()); |
|   std::unique_ptr<RowReader> rowReader = reader->createRowReader(rowReaderOpts); |
| |
|   // Bad bloom filters must not affect the result: exactly one batch holding 5 rows. |
|   std::unique_ptr<ColumnVectorBatch> batch = rowReader->createRowBatch(1024); |
|   EXPECT_TRUE(rowReader->next(*batch)); |
|   EXPECT_EQ(5, batch->numElements); |
|   EXPECT_FALSE(rowReader->next(*batch)); |
| } |
| |
| TEST(TestRowReader, testSkipBadBloomFilters) { |
|   // Each row pairs an example file with the software version string it is expected to report. |
|   const char* const cases[][2] = { |
|       {"bad_bloom_filter_1.6.11.orc", "ORC C++ 1.6.11"}, |
|       {"bad_bloom_filter_1.6.0.orc", "ORC C++"}, |
|   }; |
|   for (const auto& fileAndVersion : cases) { |
|     CheckFileWithSargs(fileAndVersion[0], fileAndVersion[1]); |
|   } |
| } |
| |
| void verifySelection(const std::unique_ptr<Reader>& reader, |
|                      const RowReaderOptions::IdReadIntentMap& idReadIntentMap, |
|                      const std::vector<uint32_t>& expectedSelection) { |
|   // Creates a row reader for the given id -> ReadIntent map and asserts that its |
|   // selected-column mask has exactly the ids in `expectedSelection` set. |
|   RowReaderOptions options; |
|   options.includeTypesWithIntents(idReadIntentMap); |
|   std::unique_ptr<RowReader> rowReader = reader->createRowReader(options); |
| |
|   // One flag per column id (0 .. maximum column id), all false to begin with. |
|   const auto columnCount = reader->getType().getMaximumColumnId() + 1; |
|   std::vector<bool> expectedMask(columnCount, false); |
|   for (size_t i = 0; i < expectedSelection.size(); ++i) { |
|     expectedMask[expectedSelection[i]] = true; |
|   } |
| |
|   ASSERT_THAT(rowReader->getSelectedColumns(), ElementsAreArray(expectedMask)); |
| } |
| |
| std::unique_ptr<Reader> createNestedListMemReader(MemoryOutputStream& memStream, |
|                                                   const std::vector<uint32_t>& stripesToPrefetch, |
|                                                   const std::list<uint64_t>& columnsToPrefetch, |
|                                                   bool prefetchTwice) { |
|   // Writes one row with nested list columns into `memStream`, opens a Reader over the |
|   // written bytes, calls preBuffer once (twice when `prefetchTwice`), and returns the reader. |
|   MemoryPool* pool = getDefaultPool(); |
| |
|   std::unique_ptr<Type> schema( |
|       Type::buildTypeFromString("struct<" |
|                                 "int_array:array<int>," |
|                                 "int_array_array_array:array<array<array<int>>>" |
|                                 ">")); |
| |
|   WriterOptions options; |
|   options.setStripeSize(1024 * 1024); |
|   options.setCompressionBlockSize(1024); |
|   options.setMemoryBlockSize(64); |
|   options.setCompression(CompressionKind_NONE); |
|   options.setMemoryPool(pool); |
|   options.setRowIndexStride(1000); |
| |
|   auto writer = createWriter(*schema, &memStream, options); |
|   auto batch = writer->createRowBatch(100); |
| |
|   // Walk down to every nested batch; the trailing comments give the position in this walk. |
|   auto& rootStruct = dynamic_cast<StructVectorBatch&>(*batch);                // 0 |
|   auto& intArray = dynamic_cast<ListVectorBatch&>(*rootStruct.fields[0]);     // 1 |
|   auto& intArrayValues = dynamic_cast<LongVectorBatch&>(*intArray.elements);  // 2 |
|   auto& outerList = dynamic_cast<ListVectorBatch&>(*rootStruct.fields[1]);    // 3 |
|   auto& middleList = dynamic_cast<ListVectorBatch&>(*outerList.elements);     // 4 |
|   auto& innerList = dynamic_cast<ListVectorBatch&>(*middleList.elements);     // 5 |
|   auto& innerValues = dynamic_cast<LongVectorBatch&>(*innerList.elements);    // 6 |
| |
|   // One struct row. |
|   rootStruct.numElements = 1; |
| |
|   // int_array: a single list holding the two values -1 and -2. |
|   intArray.numElements = 1; |
|   intArray.offsets[0] = 0; |
|   intArray.offsets[1] = 2; |
|   intArrayValues.numElements = 2; |
|   intArrayValues.data[0] = -1; |
|   intArrayValues.data[1] = -2; |
| |
|   // int_array_array_array: one outer list spanning three middle lists. |
|   outerList.numElements = 1; |
|   outerList.offsets[0] = 0; |
|   outerList.offsets[1] = 3; |
| |
|   // The middle and inner levels each hold three one-element lists: offsets 0, 1, 2, 3. |
|   middleList.numElements = 3; |
|   innerList.numElements = 3; |
|   for (int i = 0; i <= 3; ++i) { |
|     middleList.offsets[i] = i; |
|     innerList.offsets[i] = i; |
|   } |
| |
|   // Leaf values 1, 2, 3. |
|   innerValues.numElements = 3; |
|   for (int i = 0; i < 3; ++i) { |
|     innerValues.data[i] = i + 1; |
|   } |
| |
|   writer->add(*batch); |
|   writer->close(); |
| |
|   // Re-open the bytes just written as an in-memory ORC file. |
|   ReaderOptions readerOptions; |
|   readerOptions.setMemoryPool(*pool); |
|   auto reader = createReader( |
|       std::make_unique<MemoryInputStream>(memStream.getData(), memStream.getLength()), |
|       readerOptions); |
| |
|   // preBuffer runs once, or twice when requested. |
|   const int prefetchRounds = prefetchTwice ? 2 : 1; |
|   for (int round = 0; round < prefetchRounds; ++round) { |
|     reader->preBuffer(stripesToPrefetch, columnsToPrefetch); |
|   } |
| |
|   return reader; |
| } |
| |
| // Fixture for the nested-list read-intent tests. The parameter tuple is read by the tests as |
| // (stripesToPrefetch, columnsToPrefetch, prefetchTwice), in that order. |
| class TestReadIntentFromNestedList |
| : public ::testing::TestWithParam< |
| std::tuple<std::vector<uint32_t>, std::list<uint64_t>, bool>> {}; |
| |
| TEST_P(TestReadIntentFromNestedList, testListAll) { |
|   // Build the in-memory file and reader straight from the parameter tuple. |
|   const auto& param = GetParam(); |
|   MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE); |
|   std::unique_ptr<Reader> reader = createNestedListMemReader( |
|       memStream, std::get<0>(param), std::get<1>(param), std::get<2>(param)); |
| |
|   // ReadIntent_ALL on int_array (id 1) is expected to select ids 0, 1 and 2. |
|   const RowReaderOptions::IdReadIntentMap intents = {{1, ReadIntent_ALL}}; |
|   const std::vector<uint32_t> expectedIds = {0, 1, 2}; |
|   verifySelection(reader, intents, expectedIds); |
| } |
| |
| TEST_P(TestReadIntentFromNestedList, testListOffsets) { |
|   // Build the in-memory file and reader straight from the parameter tuple. |
|   const auto& param = GetParam(); |
|   MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE); |
|   std::unique_ptr<Reader> reader = createNestedListMemReader( |
|       memStream, std::get<0>(param), std::get<1>(param), std::get<2>(param)); |
| |
|   // Offsets of int_array only. |
|   const RowReaderOptions::IdReadIntentMap intArrayOffsets = {{1, ReadIntent_OFFSETS}}; |
|   const std::vector<uint32_t> intArrayExpected = {0, 1}; |
|   verifySelection(reader, intArrayOffsets, intArrayExpected); |
| |
|   // Offsets of int_array plus the outermost offsets of int_array_array_array. |
|   const RowReaderOptions::IdReadIntentMap bothOutermost = {{1, ReadIntent_OFFSETS}, |
|                                                            {3, ReadIntent_OFFSETS}}; |
|   const std::vector<uint32_t> bothOutermostExpected = {0, 1, 3}; |
|   verifySelection(reader, bothOutermost, bothOutermostExpected); |
| |
|   // Every offsets level of int_array_array_array, without the leaf elements. |
|   const RowReaderOptions::IdReadIntentMap allLevels = {{3, ReadIntent_OFFSETS}, |
|                                                        {5, ReadIntent_OFFSETS}}; |
|   const std::vector<uint32_t> allLevelsExpected = {0, 3, 4, 5}; |
|   verifySelection(reader, allLevels, allLevelsExpected); |
| } |
| |
| TEST_P(TestReadIntentFromNestedList, testListAllAndOffsets) { |
|   // Build the in-memory file and reader straight from the parameter tuple. |
|   const auto& param = GetParam(); |
|   MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE); |
|   std::unique_ptr<Reader> reader = createNestedListMemReader( |
|       memStream, std::get<0>(param), std::get<1>(param), std::get<2>(param)); |
| |
|   // All of int_array combined with only the outermost offsets of int_array_array_array. |
|   const RowReaderOptions::IdReadIntentMap intents = {{1, ReadIntent_ALL}, |
|                                                      {3, ReadIntent_OFFSETS}}; |
|   const std::vector<uint32_t> expectedIds = {0, 1, 2, 3}; |
|   verifySelection(reader, intents, expectedIds); |
| } |
| |
| TEST_P(TestReadIntentFromNestedList, testListConflictingIntent) { |
|   // Build the in-memory file and reader straight from the parameter tuple. |
|   const auto& param = GetParam(); |
|   MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE); |
|   std::unique_ptr<Reader> reader = createNestedListMemReader( |
|       memStream, std::get<0>(param), std::get<1>(param), std::get<2>(param)); |
| |
|   // Conflicting ReadIntents on the nested list: both combinations are expected to select |
|   // the same ids. |
|   const std::vector<uint32_t> expectedIds = {0, 3, 4, 5, 6}; |
| |
|   const RowReaderOptions::IdReadIntentMap outerOffsetsInnerAll = {{3, ReadIntent_OFFSETS}, |
|                                                                   {5, ReadIntent_ALL}}; |
|   verifySelection(reader, outerOffsetsInnerAll, expectedIds); |
| |
|   const RowReaderOptions::IdReadIntentMap outerAllInnerOffsets = {{3, ReadIntent_ALL}, |
|                                                                   {5, ReadIntent_OFFSETS}}; |
|   verifySelection(reader, outerAllInnerOffsets, expectedIds); |
| } |
| |
| TEST_P(TestReadIntentFromNestedList, testRowBatchContent) { |
|   // Reads the written row back with ReadIntent_ALL on int_array (id 1) and |
|   // ReadIntent_OFFSETS on int_array_array_array (id 3), then checks the batch contents. |
|   // Conditions that later statements rely on (a successful read, non-null buffers) use |
|   // ASSERT_* so a failure ends the test instead of dereferencing a null pointer. |
|   const auto& params = GetParam(); |
|   const std::vector<uint32_t>& stripesToPrefetch = std::get<0>(params); |
|   const std::list<uint64_t>& columnsToPrefetch = std::get<1>(params); |
|   bool prefetchTwice = std::get<2>(params); |
| |
|   MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE); |
|   std::unique_ptr<Reader> reader = |
|       createNestedListMemReader(memStream, stripesToPrefetch, columnsToPrefetch, prefetchTwice); |
| |
|   // select all of int_array and only the offsets of int_array_array_array. |
|   RowReaderOptions::IdReadIntentMap idReadIntentMap = {{1, ReadIntent_ALL}, |
|                                                        {3, ReadIntent_OFFSETS}}; |
|   RowReaderOptions rowReaderOpts; |
|   rowReaderOpts.includeTypesWithIntents(idReadIntentMap); |
|   std::unique_ptr<RowReader> rowReader = reader->createRowReader(rowReaderOpts); |
| |
|   // Read a row batch; nothing below is meaningful if the read fails. |
|   std::unique_ptr<ColumnVectorBatch> batch = rowReader->createRowBatch(1024); |
|   ASSERT_TRUE(rowReader->next(*batch)); |
|   EXPECT_EQ(1, batch->numElements); |
|   auto& structBatch = dynamic_cast<StructVectorBatch&>(*batch); |
|   // Both selected top-level columns are indexed below. |
|   ASSERT_GE(structBatch.fields.size(), 2u); |
| |
|   // verify content of int_array selection. |
|   auto& intArrayBatch = dynamic_cast<ListVectorBatch&>(*structBatch.fields[0]); |
|   ASSERT_NE(nullptr, intArrayBatch.elements); |
|   auto& innerLongBatch = dynamic_cast<LongVectorBatch&>(*intArrayBatch.elements); |
|   EXPECT_EQ(1, intArrayBatch.numElements); |
|   ASSERT_NE(nullptr, intArrayBatch.offsets.data()); |
|   EXPECT_EQ(0, intArrayBatch.offsets.data()[0]); |
|   EXPECT_EQ(2, intArrayBatch.offsets.data()[1]); |
|   EXPECT_EQ(2, innerLongBatch.numElements); |
|   ASSERT_NE(nullptr, innerLongBatch.data.data()); |
|   EXPECT_EQ(-1, innerLongBatch.data.data()[0]); |
|   EXPECT_EQ(-2, innerLongBatch.data.data()[1]); |
| |
|   // verify content of int_array_array_array selection: the outermost offsets are |
|   // present and no child batch is attached. |
|   auto& intArrayArrayArrayBatch = dynamic_cast<ListVectorBatch&>(*structBatch.fields[1]); |
|   EXPECT_EQ(1, intArrayArrayArrayBatch.numElements); |
|   ASSERT_NE(nullptr, intArrayArrayArrayBatch.offsets.data()); |
|   EXPECT_EQ(0, intArrayArrayArrayBatch.offsets.data()[0]); |
|   EXPECT_EQ(3, intArrayArrayArrayBatch.offsets.data()[1]); |
|   EXPECT_EQ(nullptr, intArrayArrayArrayBatch.elements); |
| } |
| |
| // Parameter sets for the nested-list tests. Five (stripes, columns) pairs are used: |
| // ({}, {}), ({}, {1, 3}), ({0}, {}), ({0}, {1, 3}) and ({1000}, {1000}); each pair is run with |
| // prefetchTwice both true and false. |
| // NOTE(review): 1000 presumably stands for a stripe/column that does not exist in the |
| // written file -- confirm against the preBuffer contract. |
| INSTANTIATE_TEST_SUITE_P( |
| TestReadIntentFromNestedListInstance, TestReadIntentFromNestedList, |
| ::testing::Values( |
| std::make_tuple(std::vector<uint32_t>{}, std::list<uint64_t>{}, true), |
| std::make_tuple(std::vector<uint32_t>{}, std::list<uint64_t>{}, false), |
| std::make_tuple(std::vector<uint32_t>{}, std::list<uint64_t>{1, 3}, true), |
| std::make_tuple(std::vector<uint32_t>{}, std::list<uint64_t>{1, 3}, false), |
| std::make_tuple(std::vector<uint32_t>{0}, std::list<uint64_t>{}, true), |
| std::make_tuple(std::vector<uint32_t>{0}, std::list<uint64_t>{}, false), |
| std::make_tuple(std::vector<uint32_t>{0}, std::list<uint64_t>{1, 3}, true), |
| std::make_tuple(std::vector<uint32_t>{0}, std::list<uint64_t>{1, 3}, false), |
| std::make_tuple(std::vector<uint32_t>{1000}, std::list<uint64_t>{1000}, true), |
| std::make_tuple(std::vector<uint32_t>{1000}, std::list<uint64_t>{1000}, false))); |
| |
| std::unique_ptr<Reader> createNestedMapMemReader(MemoryOutputStream& memStream, |
|                                                  const std::vector<uint32_t>& stripesToPrefetch, |
|                                                  const std::list<uint64_t>& columnsToPrefetch, |
|                                                  bool prefetchTwice) { |
|   // Writes one row with an int id, a single-level map and a three-level nested map into |
|   // `memStream`, opens a Reader over the written bytes, calls preBuffer once (twice when |
|   // `prefetchTwice`), and returns the reader. |
|   MemoryPool* pool = getDefaultPool(); |
| |
|   std::unique_ptr<Type> schema( |
|       Type::buildTypeFromString("struct<" |
|                                 "id:int," |
|                                 "single_map:map<string,string>," |
|                                 "nested_map:map<string,map<string,map<string,string>>>" |
|                                 ">")); |
| |
|   WriterOptions options; |
|   options.setStripeSize(1024 * 1024); |
|   options.setCompressionBlockSize(1024); |
|   options.setMemoryBlockSize(64); |
|   options.setCompression(CompressionKind_NONE); |
|   options.setMemoryPool(pool); |
|   options.setRowIndexStride(1000); |
| |
|   auto writer = createWriter(*schema, &memStream, options); |
|   auto batch = writer->createRowBatch(100); |
| |
|   // Walk down to every nested batch; the trailing comments give the position in this walk. |
|   auto& rootStruct = dynamic_cast<StructVectorBatch&>(*batch);                 // 0 |
|   auto& idColumn = dynamic_cast<LongVectorBatch&>(*rootStruct.fields[0]);      // 1 |
|   auto& singleMap = dynamic_cast<MapVectorBatch&>(*rootStruct.fields[1]);      // 2 |
|   auto& singleMapKeys = dynamic_cast<StringVectorBatch&>(*singleMap.keys);     // 3 |
|   auto& singleMapValues = dynamic_cast<StringVectorBatch&>(*singleMap.elements);  // 4 |
|   auto& outerMap = dynamic_cast<MapVectorBatch&>(*rootStruct.fields[2]);       // 5 |
|   auto& outerMapKeys = dynamic_cast<StringVectorBatch&>(*outerMap.keys);       // 6 |
|   auto& middleMap = dynamic_cast<MapVectorBatch&>(*outerMap.elements);         // 7 |
|   auto& middleMapKeys = dynamic_cast<StringVectorBatch&>(*middleMap.keys);     // 8 |
|   auto& innerMap = dynamic_cast<MapVectorBatch&>(*middleMap.elements);         // 9 |
|   auto& innerMapKeys = dynamic_cast<StringVectorBatch&>(*innerMap.keys);       // 10 |
|   auto& innerMapValues = dynamic_cast<StringVectorBatch&>(*innerMap.elements);  // 11 |
| |
|   // Backing storage for the string cells. The batches keep raw pointers into these |
|   // strings, so they have to stay alive until writer->add() has run. |
|   std::string singleKey = "k0"; |
|   std::string singleValue = "v0"; |
|   std::string outerKey = "k1"; |
|   std::string middleKey = "k2"; |
|   std::string innerKey = "k3"; |
|   std::string innerValue = "v3"; |
| |
|   // Stores `text` as the only element of a string batch. |
|   const auto putSingleString = [](StringVectorBatch& target, const std::string& text) { |
|     target.numElements = 1; |
|     target.data[0] = const_cast<char*>(text.c_str()); |
|     target.length[0] = static_cast<int64_t>(text.length()); |
|   }; |
|   // Marks a map batch as holding one map with exactly one entry (offsets 0 -> 1). |
|   const auto putSingleEntry = [](MapVectorBatch& target) { |
|     target.numElements = 1; |
|     target.offsets[0] = 0; |
|     target.offsets[1] = 1; |
|   }; |
| |
|   // One struct row with id = 0. |
|   rootStruct.numElements = 1; |
|   idColumn.numElements = 1; |
|   idColumn.data[0] = 0; |
| |
|   // single_map: {"k0": "v0"}. |
|   putSingleEntry(singleMap); |
|   putSingleString(singleMapKeys, singleKey); |
|   putSingleString(singleMapValues, singleValue); |
| |
|   // nested_map: {"k1": {"k2": {"k3": "v3"}}}. |
|   putSingleEntry(outerMap); |
|   putSingleString(outerMapKeys, outerKey); |
|   putSingleEntry(middleMap); |
|   putSingleString(middleMapKeys, middleKey); |
|   putSingleEntry(innerMap); |
|   putSingleString(innerMapKeys, innerKey); |
|   putSingleString(innerMapValues, innerValue); |
| |
|   writer->add(*batch); |
|   writer->close(); |
| |
|   // Re-open the bytes just written as an in-memory ORC file. |
|   ReaderOptions readerOptions; |
|   readerOptions.setMemoryPool(*pool); |
|   auto reader = createReader( |
|       std::make_unique<MemoryInputStream>(memStream.getData(), memStream.getLength()), |
|       readerOptions); |
| |
|   // preBuffer runs once, or twice when requested. |
|   const int prefetchRounds = prefetchTwice ? 2 : 1; |
|   for (int round = 0; round < prefetchRounds; ++round) { |
|     reader->preBuffer(stripesToPrefetch, columnsToPrefetch); |
|   } |
|   return reader; |
| } |
| |
| // Fixture for the nested-map read-intent tests. The parameter tuple is read by the tests as |
| // (stripesToPrefetch, columnsToPrefetch, prefetchTwice), in that order. |
| class TestReadIntentFromNestedMap |
| : public ::testing::TestWithParam< |
| std::tuple<std::vector<uint32_t>, std::list<uint64_t>, bool>> {}; |
| |
| TEST_P(TestReadIntentFromNestedMap, testMapAll) { |
|   // Build the in-memory file and reader straight from the parameter tuple. |
|   const auto& param = GetParam(); |
|   MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE); |
|   std::unique_ptr<Reader> reader = createNestedMapMemReader( |
|       memStream, std::get<0>(param), std::get<1>(param), std::get<2>(param)); |
| |
|   // ReadIntent_ALL on single_map (id 2) is expected to select ids 0, 2, 3 and 4. |
|   const RowReaderOptions::IdReadIntentMap intents = {{2, ReadIntent_ALL}}; |
|   const std::vector<uint32_t> expectedIds = {0, 2, 3, 4}; |
|   verifySelection(reader, intents, expectedIds); |
| } |
| |
| TEST_P(TestReadIntentFromNestedMap, testMapOffsets) { |
|   // Build the in-memory file and reader straight from the parameter tuple. |
|   const auto& param = GetParam(); |
|   MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE); |
|   std::unique_ptr<Reader> reader = createNestedMapMemReader( |
|       memStream, std::get<0>(param), std::get<1>(param), std::get<2>(param)); |
| |
|   // Offsets of single_map only. |
|   const RowReaderOptions::IdReadIntentMap singleMapOffsets = {{2, ReadIntent_OFFSETS}}; |
|   const std::vector<uint32_t> singleMapExpected = {0, 2}; |
|   verifySelection(reader, singleMapOffsets, singleMapExpected); |
| |
|   // Offsets of single_map plus the outermost offsets of nested_map. |
|   const RowReaderOptions::IdReadIntentMap bothOutermost = {{2, ReadIntent_OFFSETS}, |
|                                                            {5, ReadIntent_OFFSETS}}; |
|   const std::vector<uint32_t> bothOutermostExpected = {0, 2, 5}; |
|   verifySelection(reader, bothOutermost, bothOutermostExpected); |
| |
|   // Every offsets level of nested_map, without the items of the innermost map. |
|   const RowReaderOptions::IdReadIntentMap allLevels = {{5, ReadIntent_OFFSETS}, |
|                                                        {9, ReadIntent_OFFSETS}}; |
|   const std::vector<uint32_t> allLevelsExpected = {0, 5, 7, 9}; |
|   verifySelection(reader, allLevels, allLevelsExpected); |
| } |
| |
| TEST_P(TestReadIntentFromNestedMap, testMapAllAndOffsets) { |
|   // Build the in-memory file and reader straight from the parameter tuple. |
|   const auto& param = GetParam(); |
|   MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE); |
|   std::unique_ptr<Reader> reader = createNestedMapMemReader( |
|       memStream, std::get<0>(param), std::get<1>(param), std::get<2>(param)); |
| |
|   // All of single_map combined with only the outermost offsets of nested_map. |
|   const RowReaderOptions::IdReadIntentMap intents = {{2, ReadIntent_ALL}, |
|                                                      {5, ReadIntent_OFFSETS}}; |
|   const std::vector<uint32_t> expectedIds = {0, 2, 3, 4, 5}; |
|   verifySelection(reader, intents, expectedIds); |
| } |
| |
| TEST_P(TestReadIntentFromNestedMap, testMapConflictingIntent) { |
|   // Build the in-memory file and reader straight from the parameter tuple. |
|   const auto& param = GetParam(); |
|   MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE); |
|   std::unique_ptr<Reader> reader = createNestedMapMemReader( |
|       memStream, std::get<0>(param), std::get<1>(param), std::get<2>(param)); |
| |
|   // Conflicting ReadIntents on nested_map, case 1: offsets on the outer map (5), all on the |
|   // innermost map (9). |
|   const RowReaderOptions::IdReadIntentMap outerOffsetsInnerAll = {{5, ReadIntent_OFFSETS}, |
|                                                                   {9, ReadIntent_ALL}}; |
|   const std::vector<uint32_t> outerOffsetsInnerAllExpected = {0, 5, 7, 9, 10, 11}; |
|   verifySelection(reader, outerOffsetsInnerAll, outerOffsetsInnerAllExpected); |
| |
|   // Case 2: all on the outer map (5), offsets on the innermost map (9). |
|   const RowReaderOptions::IdReadIntentMap outerAllInnerOffsets = {{5, ReadIntent_ALL}, |
|                                                                   {9, ReadIntent_OFFSETS}}; |
|   const std::vector<uint32_t> outerAllInnerOffsetsExpected = {0, 5, 6, 7, 8, 9, 10, 11}; |
|   verifySelection(reader, outerAllInnerOffsets, outerAllInnerOffsetsExpected); |
| |
|   // Case 3: offsets on 5 and 9 with all on the middle map (7). |
|   const RowReaderOptions::IdReadIntentMap middleAll = { |
|       {5, ReadIntent_OFFSETS}, {7, ReadIntent_ALL}, {9, ReadIntent_OFFSETS}}; |
|   const std::vector<uint32_t> middleAllExpected = {0, 5, 7, 8, 9, 10, 11}; |
|   verifySelection(reader, middleAll, middleAllExpected); |
| } |
| |
| TEST_P(TestReadIntentFromNestedMap, testMapRowBatchContent) { |
|   // Reads the written row back with ReadIntent_ALL on single_map (id 2) and |
|   // ReadIntent_OFFSETS on nested_map (id 5), then checks the batch contents. |
|   // Conditions that later statements rely on (a successful read, non-null buffers, the |
|   // string lengths fed to strncmp) use ASSERT_* so a failure ends the test instead of |
|   // dereferencing a null pointer or comparing past the end of a cell. |
|   const auto& params = GetParam(); |
|   const std::vector<uint32_t>& stripesToPrefetch = std::get<0>(params); |
|   const std::list<uint64_t>& columnsToPrefetch = std::get<1>(params); |
|   bool prefetchTwice = std::get<2>(params); |
| |
|   MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE); |
|   std::unique_ptr<Reader> reader = |
|       createNestedMapMemReader(memStream, stripesToPrefetch, columnsToPrefetch, prefetchTwice); |
| |
|   // select all of single_map and only the offsets of nested_map. |
|   RowReaderOptions::IdReadIntentMap idReadIntentMap = {{2, ReadIntent_ALL}, |
|                                                        {5, ReadIntent_OFFSETS}}; |
|   RowReaderOptions rowReaderOpts; |
|   rowReaderOpts.includeTypesWithIntents(idReadIntentMap); |
|   std::unique_ptr<RowReader> rowReader = reader->createRowReader(rowReaderOpts); |
| |
|   // Read a row batch; nothing below is meaningful if the read fails. |
|   std::unique_ptr<ColumnVectorBatch> batch = rowReader->createRowBatch(1024); |
|   ASSERT_TRUE(rowReader->next(*batch)); |
|   EXPECT_EQ(1, batch->numElements); |
|   auto& structBatch = dynamic_cast<StructVectorBatch&>(*batch); |
|   // Both selected top-level columns are indexed below. |
|   ASSERT_GE(structBatch.fields.size(), 2u); |
| |
|   // verify content of single_map selection. |
|   auto& mapBatch = dynamic_cast<MapVectorBatch&>(*structBatch.fields[0]); |
|   ASSERT_NE(nullptr, mapBatch.keys); |
|   ASSERT_NE(nullptr, mapBatch.elements); |
|   auto& keyBatch = dynamic_cast<StringVectorBatch&>(*mapBatch.keys); |
|   auto& valueBatch = dynamic_cast<StringVectorBatch&>(*mapBatch.elements); |
|   EXPECT_EQ(1, mapBatch.numElements); |
|   ASSERT_NE(nullptr, mapBatch.offsets.data()); |
|   EXPECT_EQ(0, mapBatch.offsets.data()[0]); |
|   EXPECT_EQ(1, mapBatch.offsets.data()[1]); |
|   // verify key content. |
|   EXPECT_EQ(1, keyBatch.numElements); |
|   ASSERT_NE(nullptr, keyBatch.length.data()); |
|   ASSERT_NE(nullptr, keyBatch.data.data()); |
|   ASSERT_EQ(2, keyBatch.length.data()[0]); |
|   ASSERT_NE(nullptr, keyBatch.data.data()[0]); |
|   EXPECT_EQ(0, std::strncmp("k0", keyBatch.data.data()[0], 2)); |
|   // verify value content. |
|   EXPECT_EQ(1, valueBatch.numElements); |
|   ASSERT_NE(nullptr, valueBatch.length.data()); |
|   ASSERT_NE(nullptr, valueBatch.data.data()); |
|   ASSERT_EQ(2, valueBatch.length.data()[0]); |
|   ASSERT_NE(nullptr, valueBatch.data.data()[0]); |
|   EXPECT_EQ(0, std::strncmp("v0", valueBatch.data.data()[0], 2)); |
| |
|   // verify content of nested_map selection: the outermost offsets are present and |
|   // neither a key batch nor a value batch is attached. |
|   auto& nestedMapBatch = dynamic_cast<MapVectorBatch&>(*structBatch.fields[1]); |
|   EXPECT_EQ(1, nestedMapBatch.numElements); |
|   ASSERT_NE(nullptr, nestedMapBatch.offsets.data()); |
|   EXPECT_EQ(0, nestedMapBatch.offsets.data()[0]); |
|   EXPECT_EQ(1, nestedMapBatch.offsets.data()[1]); |
|   EXPECT_EQ(nullptr, nestedMapBatch.keys); |
|   EXPECT_EQ(nullptr, nestedMapBatch.elements); |
| } |
| |
| // Parameter sets for the nested-map tests. Five (stripes, columns) pairs are used: |
| // ({}, {}), ({}, {1, 5}), ({0}, {}), ({0}, {1, 5}) and ({1000}, {1000}); each pair is run with |
| // prefetchTwice both true and false. |
| // NOTE(review): 1000 presumably stands for a stripe/column that does not exist in the |
| // written file -- confirm against the preBuffer contract. |
| INSTANTIATE_TEST_SUITE_P( |
| TestReadIntentFromNestedMapInstance, TestReadIntentFromNestedMap, |
| ::testing::Values( |
| std::make_tuple(std::vector<uint32_t>{}, std::list<uint64_t>{}, true), |
| std::make_tuple(std::vector<uint32_t>{}, std::list<uint64_t>{}, false), |
| std::make_tuple(std::vector<uint32_t>{}, std::list<uint64_t>{1, 5}, true), |
| std::make_tuple(std::vector<uint32_t>{}, std::list<uint64_t>{1, 5}, false), |
| std::make_tuple(std::vector<uint32_t>{0}, std::list<uint64_t>{}, true), |
| std::make_tuple(std::vector<uint32_t>{0}, std::list<uint64_t>{}, false), |
| std::make_tuple(std::vector<uint32_t>{0}, std::list<uint64_t>{1, 5}, true), |
| std::make_tuple(std::vector<uint32_t>{0}, std::list<uint64_t>{1, 5}, false), |
| std::make_tuple(std::vector<uint32_t>{1000}, std::list<uint64_t>{1000}, true), |
| std::make_tuple(std::vector<uint32_t>{1000}, std::list<uint64_t>{1000}, false))); |
| |
| std::unique_ptr<Reader> createNestedUnionMemReader(MemoryOutputStream& memStream, |
|                                                    const std::vector<uint32_t>& stripesToPrefetch, |
|                                                    const std::list<uint64_t>& columnsToPrefetch, |
|                                                    bool prefetchTwice) { |
|   // Writes two rows with an int id, a flat union and a three-level nested union into |
|   // `memStream`, opens a Reader over the written bytes, calls preBuffer once (twice when |
|   // `prefetchTwice`), and returns the reader. |
|   MemoryPool* pool = getDefaultPool(); |
| |
|   std::unique_ptr<Type> schema(Type::buildTypeFromString( |
|       "struct<" |
|       "id:int," |
|       "single_union:uniontype<int,string>," |
|       "nested_union:uniontype<uniontype<int,uniontype<int,string>>,int>" |
|       ">")); |
| |
|   WriterOptions options; |
|   options.setStripeSize(1024 * 1024); |
|   options.setCompressionBlockSize(1024); |
|   options.setMemoryBlockSize(64); |
|   options.setCompression(CompressionKind_NONE); |
|   options.setMemoryPool(pool); |
|   options.setRowIndexStride(1000); |
| |
|   auto writer = createWriter(*schema, &memStream, options); |
|   auto batch = writer->createRowBatch(100); |
| |
|   // Walk down to every nested batch; the trailing comments give the position in this walk. |
|   auto& rootStruct = dynamic_cast<StructVectorBatch&>(*batch);                      // 0 |
|   auto& idColumn = dynamic_cast<LongVectorBatch&>(*rootStruct.fields[0]);           // 1 |
|   auto& singleUnion = dynamic_cast<UnionVectorBatch&>(*rootStruct.fields[1]);       // 2 |
|   auto& singleUnionInt = dynamic_cast<LongVectorBatch&>(*singleUnion.children[0]);  // 3 |
|   auto& singleUnionStr = dynamic_cast<StringVectorBatch&>(*singleUnion.children[1]);  // 4 |
|   auto& outerUnion = dynamic_cast<UnionVectorBatch&>(*rootStruct.fields[2]);        // 5 |
|   auto& middleUnion = dynamic_cast<UnionVectorBatch&>(*outerUnion.children[0]);     // 6 |
|   auto& middleUnionInt = dynamic_cast<LongVectorBatch&>(*middleUnion.children[0]);  // 7 |
|   auto& innerUnion = dynamic_cast<UnionVectorBatch&>(*middleUnion.children[1]);     // 8 |
|   auto& innerUnionInt = dynamic_cast<LongVectorBatch&>(*innerUnion.children[0]);    // 9 |
|   auto& innerUnionStr = dynamic_cast<StringVectorBatch&>(*innerUnion.children[1]);  // 10 |
|   auto& outerUnionInt = dynamic_cast<LongVectorBatch&>(*outerUnion.children[1]);    // 11 |
| |
|   // Backing storage for the string cells. The batches keep raw pointers into these |
|   // strings, so they have to stay alive until writer->add() has run. |
|   std::string singleUnionText = "s1"; |
|   std::string innerUnionText = "n1"; |
| |
|   // Records, for one row of a union batch, which child is active and where its value sits. |
|   const auto setUnionRow = [](UnionVectorBatch& target, uint64_t row, int tag, int offset) { |
|     target.tags[row] = tag; |
|     target.offsets[row] = offset; |
|   }; |
| |
|   // id: 0 for the first row, 1 for the second. |
|   idColumn.data[0] = 0; |
|   idColumn.data[1] = 1; |
|   idColumn.numElements = 2; |
| |
|   // single_union: first row is the int 0, second row is the string "s1". |
|   setUnionRow(singleUnion, 0, 0, 0); |
|   setUnionRow(singleUnion, 1, 1, 0); |
|   singleUnion.numElements = 2; |
|   singleUnionInt.data[0] = 0; |
|   singleUnionInt.numElements = 1; |
|   singleUnionStr.data[0] = const_cast<char*>(singleUnionText.c_str()); |
|   singleUnionStr.length[0] = static_cast<int64_t>(singleUnionText.length()); |
|   singleUnionStr.numElements = 1; |
| |
|   // nested_union, outer level: both rows pick child 0 (the middle union). |
|   setUnionRow(outerUnion, 0, 0, 0); |
|   setUnionRow(outerUnion, 1, 0, 1); |
|   outerUnion.numElements = 2; |
|   outerUnionInt.numElements = 0; |
| |
|   // Middle level: both rows pick child 1 (the inner union). |
|   setUnionRow(middleUnion, 0, 1, 0); |
|   setUnionRow(middleUnion, 1, 1, 1); |
|   middleUnion.numElements = 2; |
|   middleUnionInt.numElements = 0; |
| |
|   // Inner level: first row is the int 1, second row is the string "n1". |
|   setUnionRow(innerUnion, 0, 0, 0); |
|   setUnionRow(innerUnion, 1, 1, 0); |
|   innerUnion.numElements = 2; |
|   innerUnionInt.data[0] = 1; |
|   innerUnionInt.numElements = 1; |
|   innerUnionStr.data[0] = const_cast<char*>(innerUnionText.c_str()); |
|   innerUnionStr.length[0] = static_cast<int64_t>(innerUnionText.length()); |
|   innerUnionStr.numElements = 1; |
| |
|   // Two struct rows overall. |
|   rootStruct.numElements = 2; |
| |
|   writer->add(*batch); |
|   writer->close(); |
| |
|   // Re-open the bytes just written as an in-memory ORC file, with a null metrics pointer. |
|   ReaderOptions readerOptions; |
|   readerOptions.setMemoryPool(*pool); |
|   readerOptions.setReaderMetrics(nullptr); |
|   auto reader = createReader( |
|       std::make_unique<MemoryInputStream>(memStream.getData(), memStream.getLength()), |
|       readerOptions); |
| |
|   // preBuffer runs once, or twice when requested. |
|   const int prefetchRounds = prefetchTwice ? 2 : 1; |
|   for (int round = 0; round < prefetchRounds; ++round) { |
|     reader->preBuffer(stripesToPrefetch, columnsToPrefetch); |
|   } |
| |
|   return reader; |
| } |
| |
| // Fixture for the nested-union read-intent tests. The parameter tuple is read by the tests as |
| // (stripesToPrefetch, columnsToPrefetch, prefetchTwice), in that order. |
| class TestReadIntentFromNestedUnion |
| : public ::testing::TestWithParam< |
| std::tuple<std::vector<uint32_t>, std::list<uint64_t>, bool>> {}; |
| |
| TEST_P(TestReadIntentFromNestedUnion, testUnionAll) { |
|   // Build the in-memory file and reader straight from the parameter tuple. |
|   const auto& param = GetParam(); |
|   MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE); |
|   std::unique_ptr<Reader> reader = createNestedUnionMemReader( |
|       memStream, std::get<0>(param), std::get<1>(param), std::get<2>(param)); |
| |
|   // ReadIntent_ALL on single_union (id 2) is expected to select ids 0, 2, 3 and 4. |
|   const RowReaderOptions::IdReadIntentMap intents = {{2, ReadIntent_ALL}}; |
|   const std::vector<uint32_t> expectedIds = {0, 2, 3, 4}; |
|   verifySelection(reader, intents, expectedIds); |
| } |
| |
| TEST_P(TestReadIntentFromNestedUnion, testUnionOffsets) { |
| const auto& params = GetParam(); |
| const std::vector<uint32_t>& stripesToPrefetch = std::get<0>(params); |
| const std::list<uint64_t>& columnsToPrefetch = std::get<1>(params); |
| bool prefetchTwice = std::get<2>(params); |
| |
| MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE); |
| std::unique_ptr<Reader> reader = |
| createNestedUnionMemReader(memStream, stripesToPrefetch, columnsToPrefetch, prefetchTwice); |
| |
| // select only the offsets of single_union. |
| verifySelection(reader, {{2, ReadIntent_OFFSETS}}, {0, 2}); |
| |
| // select only the offsets of single_union and the outermost offsets of nested_union. |
| verifySelection(reader, {{2, ReadIntent_OFFSETS}, {5, ReadIntent_OFFSETS}}, {0, 2, 5}); |
| |
| // select only the offsets of single_union and the innermost offsets of nested_union. |
| verifySelection(reader, {{2, ReadIntent_OFFSETS}, {8, ReadIntent_OFFSETS}}, |
| {0, 2, 5, 6, 7, 8, 11}); |
| } |
| |
| TEST_P(TestReadIntentFromNestedUnion, testUnionAllAndOffsets) { |
| const auto& params = GetParam(); |
| const std::vector<uint32_t>& stripesToPrefetch = std::get<0>(params); |
| const std::list<uint64_t>& columnsToPrefetch = std::get<1>(params); |
| bool prefetchTwice = std::get<2>(params); |
| |
| MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE); |
| std::unique_ptr<Reader> reader = |
| createNestedUnionMemReader(memStream, stripesToPrefetch, columnsToPrefetch, prefetchTwice); |
| |
| // select all of single_union and only the outermost offsets of nested_union. |
| verifySelection(reader, {{2, ReadIntent_ALL}, {5, ReadIntent_OFFSETS}}, {0, 2, 3, 4, 5}); |
| } |
| |
| TEST_P(TestReadIntentFromNestedUnion, testUnionConflictingIntent) { |
| const auto& params = GetParam(); |
| const std::vector<uint32_t>& stripesToPrefetch = std::get<0>(params); |
| const std::list<uint64_t>& columnsToPrefetch = std::get<1>(params); |
| bool prefetchTwice = std::get<2>(params); |
| |
| MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE); |
| std::unique_ptr<Reader> reader = |
| createNestedUnionMemReader(memStream, stripesToPrefetch, columnsToPrefetch, prefetchTwice); |
| |
| // test conflicting ReadIntent on nested_union. |
| verifySelection(reader, {{5, ReadIntent_OFFSETS}, {8, ReadIntent_ALL}}, |
| {0, 5, 6, 7, 8, 9, 10, 11}); |
| verifySelection(reader, {{5, ReadIntent_ALL}, {8, ReadIntent_OFFSETS}}, |
| {0, 5, 6, 7, 8, 9, 10, 11}); |
| verifySelection(reader, {{5, ReadIntent_OFFSETS}, {6, ReadIntent_ALL}, {8, ReadIntent_OFFSETS}}, |
| {0, 5, 6, 7, 8, 9, 10, 11}); |
| } |
| |
| TEST_P(TestReadIntentFromNestedUnion, testUnionRowBatchContent) { |
| const auto& params = GetParam(); |
| const std::vector<uint32_t>& stripesToPrefetch = std::get<0>(params); |
| const std::list<uint64_t>& columnsToPrefetch = std::get<1>(params); |
| bool prefetchTwice = std::get<2>(params); |
| |
| MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE); |
| std::unique_ptr<Reader> reader = |
| createNestedUnionMemReader(memStream, stripesToPrefetch, columnsToPrefetch, prefetchTwice); |
| |
| // select all of single_union and only the offsets of nested_union. |
| RowReaderOptions::IdReadIntentMap idReadIntentMap = {{2, ReadIntent_ALL}, |
| {5, ReadIntent_OFFSETS}}; |
| RowReaderOptions rowReaderOpts; |
| rowReaderOpts.includeTypesWithIntents(idReadIntentMap); |
| std::unique_ptr<RowReader> rowReader = reader->createRowReader(rowReaderOpts); |
| |
| // Read a row batch. |
| std::unique_ptr<ColumnVectorBatch> batch = rowReader->createRowBatch(1024); |
| EXPECT_TRUE(rowReader->next(*batch)); |
| EXPECT_EQ(2, batch->numElements); |
| auto& structBatch = dynamic_cast<StructVectorBatch&>(*batch); |
| |
| // verify content of single_union selection. |
| auto& unionBatch = dynamic_cast<UnionVectorBatch&>(*structBatch.fields[0]); |
| EXPECT_EQ(2, unionBatch.numElements); |
| EXPECT_EQ(2, unionBatch.children.size()); |
| auto& longBatch = dynamic_cast<LongVectorBatch&>(*unionBatch.children[0]); |
| auto& stringBatch = dynamic_cast<StringVectorBatch&>(*unionBatch.children[1]); |
| EXPECT_EQ(1, longBatch.numElements); |
| EXPECT_EQ(1, stringBatch.numElements); |
| EXPECT_NE(nullptr, unionBatch.tags.data()); |
| EXPECT_NE(nullptr, unionBatch.offsets.data()); |
| EXPECT_NE(nullptr, longBatch.data.data()); |
| EXPECT_NE(nullptr, stringBatch.length.data()); |
| // verify content of the first row. |
| EXPECT_EQ(0, unionBatch.tags.data()[0]); |
| EXPECT_EQ(0, unionBatch.offsets.data()[0]); |
| EXPECT_EQ(0, longBatch.data.data()[0]); |
| // verify content of the second row. |
| EXPECT_EQ(1, unionBatch.tags.data()[1]); |
| EXPECT_EQ(0, unionBatch.offsets.data()[1]); |
| EXPECT_EQ(2, stringBatch.length.data()[0]); |
| EXPECT_EQ(0, strncmp("s1", stringBatch.data.data()[0], 2)); |
| |
| // verify content of nested_union selection. |
| auto& nestedUnionBatch = dynamic_cast<UnionVectorBatch&>(*structBatch.fields[1]); |
| EXPECT_EQ(2, nestedUnionBatch.numElements); |
| EXPECT_EQ(0, nestedUnionBatch.children.size()); |
| EXPECT_NE(nullptr, nestedUnionBatch.tags.data()); |
| EXPECT_NE(nullptr, nestedUnionBatch.offsets.data()); |
| // verify that tags and offsets are still read. |
| EXPECT_EQ(0, nestedUnionBatch.tags.data()[0]); |
| EXPECT_EQ(0, nestedUnionBatch.tags.data()[1]); |
| EXPECT_EQ(0, nestedUnionBatch.offsets.data()[0]); |
| EXPECT_EQ(1, nestedUnionBatch.offsets.data()[1]); |
| } |
| |
| INSTANTIATE_TEST_SUITE_P( |
| TestReadIntentFromNestedUnionInstance, TestReadIntentFromNestedUnion, |
| ::testing::Values( |
| std::make_tuple(std::vector<uint32_t>{}, std::list<uint64_t>{}, true), |
| std::make_tuple(std::vector<uint32_t>{}, std::list<uint64_t>{}, false), |
| std::make_tuple(std::vector<uint32_t>{}, std::list<uint64_t>{1, 2}, true), |
| std::make_tuple(std::vector<uint32_t>{}, std::list<uint64_t>{1, 2}, false), |
| std::make_tuple(std::vector<uint32_t>{0}, std::list<uint64_t>{}, true), |
| std::make_tuple(std::vector<uint32_t>{0}, std::list<uint64_t>{}, false), |
| std::make_tuple(std::vector<uint32_t>{0}, std::list<uint64_t>{1, 2}, true), |
| std::make_tuple(std::vector<uint32_t>{0}, std::list<uint64_t>{1, 2}, false), |
| std::make_tuple(std::vector<uint32_t>{1000}, std::list<uint64_t>{1000}, true), |
| std::make_tuple(std::vector<uint32_t>{1000}, std::list<uint64_t>{1000}, false))); |
| |
| TEST(TestReadIntent, testSeekOverEmptyPresentStream) { |
| MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE); |
| MemoryPool* pool = getDefaultPool(); |
| uint64_t rowCount = 5000; |
| |
| { |
| auto type = std::unique_ptr<Type>( |
| Type::buildTypeFromString("struct<col1:struct<col2:int>,col3:struct<col4:int>," |
| "col5:array<int>,col6:map<int,int>>")); |
| WriterOptions options; |
| options.setStripeSize(1024 * 1024) |
| .setCompressionBlockSize(1024) |
| .setMemoryBlockSize(64) |
| .setCompression(CompressionKind_NONE) |
| .setMemoryPool(pool) |
| .setRowIndexStride(1000); |
| |
| // the child columns of the col3,col5,col6 have the empty present stream |
| auto writer = createWriter(*type, &memStream, options); |
| auto batch = writer->createRowBatch(rowCount); |
| auto& structBatch = dynamic_cast<StructVectorBatch&>(*batch); |
| auto& structBatch1 = dynamic_cast<StructVectorBatch&>(*structBatch.fields[0]); |
| auto& structBatch2 = dynamic_cast<StructVectorBatch&>(*structBatch.fields[1]); |
| auto& listBatch = dynamic_cast<ListVectorBatch&>(*structBatch.fields[2]); |
| auto& mapBatch = dynamic_cast<MapVectorBatch&>(*structBatch.fields[3]); |
| |
| auto& longBatch1 = dynamic_cast<LongVectorBatch&>(*structBatch1.fields[0]); |
| auto& longBatch2 = dynamic_cast<LongVectorBatch&>(*structBatch2.fields[0]); |
| auto& longBatch3 = dynamic_cast<LongVectorBatch&>(*listBatch.elements); |
| auto& longKeyBatch = dynamic_cast<LongVectorBatch&>(*mapBatch.keys); |
| auto& longValueBatch = dynamic_cast<LongVectorBatch&>(*mapBatch.elements); |
| |
| structBatch.numElements = rowCount; |
| structBatch1.numElements = rowCount; |
| structBatch2.numElements = rowCount; |
| listBatch.numElements = rowCount; |
| mapBatch.numElements = rowCount; |
| longBatch1.numElements = rowCount; |
| longBatch2.numElements = rowCount; |
| longBatch3.numElements = rowCount; |
| longKeyBatch.numElements = rowCount; |
| longValueBatch.numElements = rowCount; |
| |
| structBatch1.hasNulls = false; |
| structBatch2.hasNulls = true; |
| listBatch.hasNulls = true; |
| mapBatch.hasNulls = true; |
| longBatch1.hasNulls = false; |
| longBatch2.hasNulls = true; |
| longBatch3.hasNulls = true; |
| longKeyBatch.hasNulls = true; |
| longValueBatch.hasNulls = true; |
| for (uint64_t i = 0; i < rowCount; ++i) { |
| longBatch1.data[i] = static_cast<int64_t>(i); |
| longBatch1.notNull[i] = 1; |
| |
| structBatch2.notNull[i] = 0; |
| listBatch.notNull[i] = 0; |
| listBatch.offsets[i] = 0; |
| mapBatch.notNull[i] = 0; |
| longBatch2.notNull[i] = 0; |
| longBatch3.notNull[i] = 0; |
| longKeyBatch.notNull[i] = 0; |
| longValueBatch.notNull[i] = 0; |
| } |
| writer->add(*batch); |
| writer->close(); |
| } |
| { |
| auto inStream = |
| std::make_unique<MemoryInputStream>(memStream.getData(), memStream.getLength()); |
| ReaderOptions readerOptions; |
| readerOptions.setMemoryPool(*pool); |
| std::unique_ptr<Reader> reader = createReader(std::move(inStream), readerOptions); |
| EXPECT_EQ(rowCount, reader->getNumberOfRows()); |
| std::unique_ptr<RowReader> rowReader = reader->createRowReader(RowReaderOptions()); |
| auto batch = rowReader->createRowBatch(1000); |
| // seek over the empty present stream |
| rowReader->seekToRow(2000); |
| EXPECT_TRUE(rowReader->next(*batch)); |
| EXPECT_EQ(1000, batch->numElements); |
| auto& structBatch = dynamic_cast<StructVectorBatch&>(*batch); |
| auto& structBatch1 = dynamic_cast<StructVectorBatch&>(*structBatch.fields[0]); |
| auto& structBatch2 = dynamic_cast<StructVectorBatch&>(*structBatch.fields[1]); |
| auto& listBatch = dynamic_cast<ListVectorBatch&>(*structBatch.fields[2]); |
| auto& mapBatch = dynamic_cast<MapVectorBatch&>(*structBatch.fields[3]); |
| |
| auto& longBatch1 = dynamic_cast<LongVectorBatch&>(*structBatch1.fields[0]); |
| for (uint64_t i = 0; i < 1000; ++i) { |
| EXPECT_EQ(longBatch1.data[i], static_cast<int64_t>(i + 2000)); |
| EXPECT_TRUE(longBatch1.notNull[i]); |
| EXPECT_FALSE(structBatch2.notNull[i]); |
| EXPECT_FALSE(listBatch.notNull[i]); |
| EXPECT_FALSE(mapBatch.notNull[i]); |
| } |
| } |
| } |
| |
| namespace { |
| |
| uint64_t writeSampleData(MemoryOutputStream& memStream, uint64_t stripeSize = 1024, |
| uint64_t rowsPerStripe = 1000) { |
| auto type = Type::buildTypeFromString("struct<id:int,name:string,value:double>"); |
| WriterOptions options; |
| options.setStripeSize(stripeSize) |
| .setCompressionBlockSize(1024) |
| .setMemoryBlockSize(64) |
| .setCompression(CompressionKind_NONE) |
| .setRowIndexStride(100); |
| auto writer = createWriter(*type, &memStream, options); |
| |
| uint64_t totalRows = rowsPerStripe * 3; |
| uint64_t batchSize = 100; |
| std::vector<std::string> names(totalRows, ""); |
| |
| for (uint64_t startRow = 0; startRow < totalRows; startRow += batchSize) { |
| uint64_t currentBatchSize = std::min(batchSize, totalRows - startRow); |
| auto batch = writer->createRowBatch(currentBatchSize); |
| auto& structBatch = static_cast<StructVectorBatch&>(*batch); |
| auto& idBatch = static_cast<LongVectorBatch&>(*structBatch.fields[0]); |
| auto& nameBatch = static_cast<StringVectorBatch&>(*structBatch.fields[1]); |
| auto& valueBatch = static_cast<DoubleVectorBatch&>(*structBatch.fields[2]); |
| |
| for (uint64_t i = 0; i < currentBatchSize; ++i) { |
| idBatch.data[i] = static_cast<int64_t>(startRow + i); |
| names[startRow + i] = "name_" + std::to_string(startRow + i); |
| nameBatch.data[i] = const_cast<char*>(names[startRow + i].c_str()); |
| nameBatch.length[i] = static_cast<int64_t>(names[startRow + i].length()); |
| valueBatch.data[i] = static_cast<double>(startRow + i) * 1.5; |
| } |
| |
| structBatch.numElements = currentBatchSize; |
| writer->add(*batch); |
| } |
| |
| writer->close(); |
| return totalRows; |
| } |
| |
| uint64_t readAllRows(RowReader& rowReader, uint64_t batchSize = 1000) { |
| auto batch = rowReader.createRowBatch(batchSize); |
| uint64_t totalRows = 0; |
| while (rowReader.next(*batch)) { |
| totalRows += batch->numElements; |
| } |
| return totalRows; |
| } |
| |
| } // namespace |
| |
| TEST(TestAsyncPrefetch, testAsyncPrefetchCorrectnessWithMultipleStripes) { |
| MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE); |
| uint64_t totalRows = writeSampleData(memStream); |
| |
| auto reader = createReader( |
| std::make_unique<MemoryInputStream>(memStream.getData(), memStream.getLength()), {}); |
| ASSERT_GE(reader->getNumberOfStripes(), 2UL); |
| |
| auto rowReader = reader->createRowReader(RowReaderOptions{}.setEnableAsyncPrefetch(true)); |
| EXPECT_EQ(readAllRows(*rowReader), totalRows); |
| } |
| |
| TEST(TestAsyncPrefetch, testAsyncPrefetchWithVariousConfigurations) { |
| MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE); |
| std::ignore = writeSampleData(memStream); |
| auto reader = createReader( |
| std::make_unique<MemoryInputStream>(memStream.getData(), memStream.getLength()), {}); |
| ASSERT_GE(reader->getNumberOfStripes(), 2UL); |
| |
| struct TestCase { |
| std::string name; |
| std::function<void(RowReaderOptions&)> configureOptions; |
| std::function<void(RowReader&, uint64_t)> validateReader; |
| }; |
| |
| std::vector<TestCase> testCases = { |
| {"WithColumnSelection", |
| [](RowReaderOptions& opts) { |
| opts.include({0, 2}); // Select only id and value columns |
| }, |
| [](RowReader& reader, uint64_t expectedRows) { |
| auto batch = reader.createRowBatch(1000); |
| uint64_t totalRows = 0; |
| while (reader.next(*batch)) { |
| totalRows += batch->numElements; |
| // Verify that only selected columns are present |
| auto& structBatch = dynamic_cast<StructVectorBatch&>(*batch); |
| EXPECT_EQ(2, structBatch.fields.size()); |
| } |
| EXPECT_EQ(totalRows, expectedRows); |
| }}, |
| {"WithSeek", [](RowReaderOptions& /*opts*/) { /* no additional config */ }, |
| [](RowReader& reader, uint64_t expectedRows) { |
| auto batch = reader.createRowBatch(500); |
| |
| // Read some data first |
| EXPECT_TRUE(reader.next(*batch)); |
| EXPECT_GT(batch->numElements, 0UL); |
| |
| // Seek to middle and continue reading |
| uint64_t seekPosition = expectedRows / 2; |
| reader.seekToRow(seekPosition); |
| |
| EXPECT_TRUE(reader.next(*batch)); |
| EXPECT_GT(batch->numElements, 0UL); |
| EXPECT_GE(reader.getRowNumber(), seekPosition); |
| }}, |
| {"WithRange", |
| [](RowReaderOptions& /*opts*/) { |
| // Read only middle portion - will be set dynamically in test |
| }, |
| [](RowReader& reader, uint64_t expectedRows) { |
| uint64_t totalRows = readAllRows(reader); |
| // Should read approximately the specified range (allowing tolerance for stripe |
| // boundaries) |
| EXPECT_LE(totalRows, expectedRows / 2 + 1000); |
| }}}; |
| |
| for (const auto& testCase : testCases) { |
| SCOPED_TRACE("Testing async prefetch " + testCase.name); |
| |
| RowReaderOptions options; |
| options.setEnableAsyncPrefetch(true); |
| |
| if (testCase.name == "WithRange") { |
| uint64_t startRow = reader->getNumberOfRows() / 4; |
| uint64_t endRow = 3 * reader->getNumberOfRows() / 4; |
| options.range(startRow, endRow - startRow); |
| } else { |
| testCase.configureOptions(options); |
| } |
| |
| auto rowReader = reader->createRowReader(options); |
| testCase.validateReader(*rowReader, reader->getNumberOfRows()); |
| } |
| } |
| |
| TEST(TestAsyncPrefetch, testAsyncPrefetchEdgeCases) { |
| // Test with single stripe |
| { |
| MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE); |
| uint64_t totalRows = writeSampleData(memStream, 1024 * 1024, 100); |
| |
| auto reader = createReader( |
| std::make_unique<MemoryInputStream>(memStream.getData(), memStream.getLength()), {}); |
| ASSERT_EQ(1UL, reader->getNumberOfStripes()); |
| |
| auto rowReader = reader->createRowReader(RowReaderOptions{}.setEnableAsyncPrefetch(true)); |
| ASSERT_EQ(readAllRows(*rowReader), totalRows); |
| } |
| |
| // Test basic functionality with reader metrics (ensure no crashes) |
| { |
| MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE); |
| uint64_t totalRows = writeSampleData(memStream); |
| |
| auto reader = createReader( |
| std::make_unique<MemoryInputStream>(memStream.getData(), memStream.getLength()), {}); |
| ASSERT_GE(reader->getNumberOfStripes(), 2UL); |
| |
| auto rowReader = reader->createRowReader(RowReaderOptions{}.setEnableAsyncPrefetch(true)); |
| ASSERT_EQ(readAllRows(*rowReader), totalRows); |
| } |
| } |
| |
| class IOCountingInputStream : public InputStream { |
| private: |
| std::unique_ptr<InputStream> wrapped_; |
| mutable std::atomic<uint64_t> readCount_; |
| |
| public: |
| IOCountingInputStream(std::unique_ptr<InputStream> wrapped) |
| : wrapped_(std::move(wrapped)), readCount_(0) {} |
| |
| uint64_t getLength() const override { |
| return wrapped_->getLength(); |
| } |
| |
| uint64_t getNaturalReadSize() const override { |
| return wrapped_->getNaturalReadSize(); |
| } |
| |
| void read(void* buf, uint64_t length, uint64_t offset) override { |
| readCount_.fetch_add(1, std::memory_order_relaxed); |
| wrapped_->read(buf, length, offset); |
| } |
| |
| const std::string& getName() const override { |
| return wrapped_->getName(); |
| } |
| |
| uint64_t getReadCount() const { |
| return readCount_.load(std::memory_order_relaxed); |
| } |
| |
| void resetReadCount() { |
| readCount_.store(0, std::memory_order_relaxed); |
| } |
| }; |
| |
| TEST(TestSmallStripeLookAhead, testIOCountWithDifferentLimits) { |
| MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE); |
| uint64_t totalRows = writeSampleData(memStream, /*stripeSize*/ 1024, /*rowsPerStripe*/ 200); |
| EXPECT_GT(totalRows, 0UL); |
| |
| uint64_t noPrefetchIOCount = 0; |
| uint64_t smallLimitIOCount = 0; |
| uint64_t largeLimitIOCount = 0; |
| uint64_t zeroLimitIOCount = 0; |
| |
| // Test 1: No async prefetch - should have most I/O operations |
| { |
| auto countingStream = std::make_unique<IOCountingInputStream>( |
| std::make_unique<MemoryInputStream>(memStream.getData(), memStream.getLength())); |
| auto* countingPtr = countingStream.get(); |
| |
| ReaderOptions readerOptions; |
| auto reader = createReader(std::move(countingStream), readerOptions); |
| |
| RowReaderOptions rowReaderOptions; |
| rowReaderOptions.setEnableAsyncPrefetch(false); |
| auto rowReader = reader->createRowReader(rowReaderOptions); |
| |
| countingPtr->resetReadCount(); |
| uint64_t readRows = readAllRows(*rowReader); |
| |
| noPrefetchIOCount = countingPtr->getReadCount(); |
| EXPECT_EQ(readRows, totalRows); |
| EXPECT_GT(noPrefetchIOCount, 0UL); |
| } |
| |
| // Test 2: Async prefetch with small look ahead limit |
| { |
| auto countingStream = std::make_unique<IOCountingInputStream>( |
| std::make_unique<MemoryInputStream>(memStream.getData(), memStream.getLength())); |
| auto* countingPtr = countingStream.get(); |
| |
| ReaderOptions readerOptions; |
| auto reader = createReader(std::move(countingStream), readerOptions); |
| |
| RowReaderOptions rowReaderOptions; |
| rowReaderOptions.setEnableAsyncPrefetch(true); |
| rowReaderOptions.setSmallStripeLookAheadLimit(1); // Small limit (only 1 stripe ahead) |
| auto rowReader = reader->createRowReader(rowReaderOptions); |
| |
| countingPtr->resetReadCount(); |
| uint64_t readRows = readAllRows(*rowReader); |
| |
| smallLimitIOCount = countingPtr->getReadCount(); |
| EXPECT_EQ(readRows, totalRows); |
| EXPECT_GT(smallLimitIOCount, 0UL); |
| } |
| |
| // Test 3: Async prefetch with large look ahead limit |
| { |
| auto countingStream = std::make_unique<IOCountingInputStream>( |
| std::make_unique<MemoryInputStream>(memStream.getData(), memStream.getLength())); |
| auto* countingPtr = countingStream.get(); |
| |
| ReaderOptions readerOptions; |
| auto reader = createReader(std::move(countingStream), readerOptions); |
| |
| RowReaderOptions rowReaderOptions; |
| rowReaderOptions.setEnableAsyncPrefetch(true); |
| rowReaderOptions.setSmallStripeLookAheadLimit(3); // Large limit (all remaining stripes) |
| auto rowReader = reader->createRowReader(rowReaderOptions); |
| |
| countingPtr->resetReadCount(); |
| uint64_t readRows = readAllRows(*rowReader); |
| |
| largeLimitIOCount = countingPtr->getReadCount(); |
| EXPECT_EQ(readRows, totalRows); |
| EXPECT_GT(largeLimitIOCount, 0UL); |
| } |
| |
| // Test 4: Async prefetch with zero look ahead limit |
| { |
| auto countingStream = std::make_unique<IOCountingInputStream>( |
| std::make_unique<MemoryInputStream>(memStream.getData(), memStream.getLength())); |
| auto* countingPtr = countingStream.get(); |
| |
| ReaderOptions readerOptions; |
| auto reader = createReader(std::move(countingStream), readerOptions); |
| |
| RowReaderOptions rowReaderOptions; |
| rowReaderOptions.setEnableAsyncPrefetch(true); |
| rowReaderOptions.setSmallStripeLookAheadLimit(0); // Zero limit |
| auto rowReader = reader->createRowReader(rowReaderOptions); |
| |
| countingPtr->resetReadCount(); |
| uint64_t readRows = readAllRows(*rowReader); |
| |
| zeroLimitIOCount = countingPtr->getReadCount(); |
| EXPECT_EQ(readRows, totalRows); |
| EXPECT_GT(zeroLimitIOCount, 0UL); |
| } |
| |
| EXPECT_LT(zeroLimitIOCount, noPrefetchIOCount); |
| EXPECT_LT(smallLimitIOCount, zeroLimitIOCount); |
| EXPECT_LT(largeLimitIOCount, smallLimitIOCount); |
| } |
| |
| } // namespace orc |