| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include "orc/ColumnPrinter.hh" |
| #include "orc/OrcFile.hh" |
| |
| #include "MemoryInputStream.hh" |
| #include "MemoryOutputStream.hh" |
| #include "Reader.hh" |
| |
| #include "wrap/gmock.h" |
| #include "wrap/gtest-wrapper.h" |
| |
| #include <cmath> |
| #include <ctime> |
| #include <sstream> |
| |
| #ifdef __clang__ |
| DIAGNOSTIC_IGNORE("-Wmissing-variable-declarations") |
| #endif |
| |
| namespace orc { |
| |
| using ::testing::TestWithParam; |
| using ::testing::Values; |
| |
| const int DEFAULT_MEM_STREAM_SIZE = 100 * 1024 * 1024; // 100M |
| |
| std::unique_ptr<Writer> createWriter(uint64_t stripeSize, uint64_t compresionblockSize, |
| CompressionKind compression, const Type& type, |
| MemoryPool* memoryPool, OutputStream* stream, |
| FileVersion version, uint64_t stride = 0, |
| const std::string& timezone = "GMT", |
| bool useTightNumericVector = false) { |
| WriterOptions options; |
| options.setStripeSize(stripeSize); |
| options.setCompressionBlockSize(compresionblockSize); |
| options.setCompression(compression); |
| options.setMemoryPool(memoryPool); |
| options.setRowIndexStride(stride); |
| options.setFileVersion(version); |
| options.setTimezoneName(timezone); |
| options.setUseTightNumericVector(useTightNumericVector); |
| return createWriter(type, stream, options); |
| } |
| |
| std::unique_ptr<Reader> createReader(MemoryPool* memoryPool, |
| std::unique_ptr<InputStream> stream) { |
| ReaderOptions options; |
| options.setMemoryPool(*memoryPool); |
| return createReader(std::move(stream), options); |
| } |
| |
| std::unique_ptr<RowReader> createRowReader(Reader* reader, const std::string& timezone = "GMT", |
| bool useTightNumericVector = false) { |
| RowReaderOptions rowReaderOpts; |
| rowReaderOpts.setTimezoneName(timezone); |
| rowReaderOpts.setUseTightNumericVector(useTightNumericVector); |
| return reader->createRowReader(rowReaderOpts); |
| } |
| |
| class WriterTest : public TestWithParam<FileVersion> { |
| // You can implement all the usual fixture class members here. |
| // To access the test parameter, call GetParam() from class |
| // TestWithParam<T>. |
| void SetUp() override; |
| |
| protected: |
| FileVersion fileVersion; |
| |
| public: |
| WriterTest() : fileVersion(FileVersion::v_0_11()) {} |
| }; |
| |
| void WriterTest::SetUp() { |
| fileVersion = GetParam(); |
| } |
| |
| TEST_P(WriterTest, writeEmptyFile) { |
| MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE); |
| MemoryPool* pool = getDefaultPool(); |
| std::unique_ptr<Type> type(Type::buildTypeFromString("struct<col1:int>")); |
| |
| uint64_t stripeSize = 16 * 1024; // 16K |
| uint64_t compressionBlockSize = 1024; // 1k |
| |
| std::unique_ptr<Writer> writer = |
| createWriter(stripeSize, compressionBlockSize, CompressionKind_ZLIB, *type, pool, |
| &memStream, fileVersion); |
| writer->close(); |
| |
| auto inStream = std::make_unique<MemoryInputStream>(memStream.getData(), memStream.getLength()); |
| std::unique_ptr<Reader> reader = createReader(pool, std::move(inStream)); |
| std::unique_ptr<RowReader> rowReader = createRowReader(reader.get()); |
| EXPECT_EQ(fileVersion, reader->getFormatVersion()); |
| EXPECT_EQ(WriterVersion_ORC_135, reader->getWriterVersion()); |
| EXPECT_EQ(0, reader->getNumberOfRows()); |
| |
| WriterId writerId = WriterId::ORC_CPP_WRITER; |
| EXPECT_EQ(writerId, reader->getWriterId()); |
| EXPECT_EQ(1, reader->getWriterIdValue()); |
| |
| std::unique_ptr<ColumnVectorBatch> batch = rowReader->createRowBatch(1024); |
| EXPECT_FALSE(rowReader->next(*batch)); |
| } |
| |
| TEST_P(WriterTest, writeIntFileOneStripe) { |
| MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE); |
| MemoryPool* pool = getDefaultPool(); |
| std::unique_ptr<Type> type(Type::buildTypeFromString("struct<col1:int>")); |
| |
| uint64_t stripeSize = 16 * 1024; // 16K |
| uint64_t compressionBlockSize = 1024; // 1k |
| |
| std::unique_ptr<Writer> writer = |
| createWriter(stripeSize, compressionBlockSize, CompressionKind_ZLIB, *type, pool, |
| &memStream, fileVersion); |
| std::unique_ptr<ColumnVectorBatch> batch = writer->createRowBatch(1024); |
| StructVectorBatch* structBatch = dynamic_cast<StructVectorBatch*>(batch.get()); |
| LongVectorBatch* longBatch = dynamic_cast<LongVectorBatch*>(structBatch->fields[0]); |
| |
| for (uint64_t i = 0; i < 1024; ++i) { |
| longBatch->data[i] = static_cast<int64_t>(i); |
| } |
| structBatch->numElements = 1024; |
| longBatch->numElements = 1024; |
| |
| writer->add(*batch); |
| |
| for (uint64_t i = 1024; i < 2000; ++i) { |
| longBatch->data[i - 1024] = static_cast<int64_t>(i); |
| } |
| structBatch->numElements = 2000 - 1024; |
| longBatch->numElements = 2000 - 1024; |
| |
| writer->add(*batch); |
| writer->addUserMetadata("name0", "value0"); |
| writer->addUserMetadata("name1", "value1"); |
| writer->close(); |
| |
| auto inStream = std::make_unique<MemoryInputStream>(memStream.getData(), memStream.getLength()); |
| std::unique_ptr<Reader> reader = createReader(pool, std::move(inStream)); |
| std::unique_ptr<RowReader> rowReader = createRowReader(reader.get()); |
| EXPECT_EQ(2000, reader->getNumberOfRows()); |
| |
| batch = rowReader->createRowBatch(2048); |
| EXPECT_TRUE(rowReader->next(*batch)); |
| EXPECT_EQ(2000, batch->numElements); |
| EXPECT_FALSE(rowReader->next(*batch)); |
| |
| std::list<std::string> keys = reader->getMetadataKeys(); |
| EXPECT_EQ(keys.size(), 2); |
| std::list<std::string>::const_iterator itr = keys.begin(); |
| EXPECT_EQ(*itr, "name0"); |
| EXPECT_EQ(reader->getMetadataValue(*itr), "value0"); |
| itr++; |
| EXPECT_EQ(*itr, "name1"); |
| EXPECT_EQ(reader->getMetadataValue(*itr), "value1"); |
| |
| for (uint64_t i = 0; i < 2000; ++i) { |
| structBatch = dynamic_cast<StructVectorBatch*>(batch.get()); |
| longBatch = dynamic_cast<LongVectorBatch*>(structBatch->fields[0]); |
| EXPECT_EQ(i, longBatch->data[i]); |
| } |
| } |
| |
| TEST_P(WriterTest, writeIntFileMultipleStripes) { |
| MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE); |
| MemoryPool* pool = getDefaultPool(); |
| std::unique_ptr<Type> type(Type::buildTypeFromString("struct<col1:int>")); |
| |
| uint64_t stripeSize = 1024; // 1K |
| uint64_t compressionBlockSize = 1024; // 1k |
| |
| std::unique_ptr<Writer> writer = |
| createWriter(stripeSize, compressionBlockSize, CompressionKind_ZLIB, *type, pool, |
| &memStream, fileVersion); |
| std::unique_ptr<ColumnVectorBatch> batch = writer->createRowBatch(65535); |
| StructVectorBatch* structBatch = dynamic_cast<StructVectorBatch*>(batch.get()); |
| LongVectorBatch* longBatch = dynamic_cast<LongVectorBatch*>(structBatch->fields[0]); |
| |
| for (uint64_t j = 0; j < 10; ++j) { |
| for (uint64_t i = 0; i < 65535; ++i) { |
| longBatch->data[i] = static_cast<int64_t>(i); |
| } |
| structBatch->numElements = 65535; |
| longBatch->numElements = 65535; |
| |
| writer->add(*batch); |
| } |
| |
| writer->close(); |
| |
| auto inStream = std::make_unique<MemoryInputStream>(memStream.getData(), memStream.getLength()); |
| std::unique_ptr<Reader> reader = createReader(pool, std::move(inStream)); |
| std::unique_ptr<RowReader> rowReader = createRowReader(reader.get()); |
| EXPECT_EQ(655350, reader->getNumberOfRows()); |
| |
| batch = rowReader->createRowBatch(65535); |
| for (uint64_t j = 0; j < 10; ++j) { |
| EXPECT_TRUE(rowReader->next(*batch)); |
| EXPECT_EQ(65535, batch->numElements); |
| |
| for (uint64_t i = 0; i < 65535; ++i) { |
| structBatch = dynamic_cast<StructVectorBatch*>(batch.get()); |
| longBatch = dynamic_cast<LongVectorBatch*>(structBatch->fields[0]); |
| EXPECT_EQ(i, longBatch->data[i]); |
| } |
| } |
| EXPECT_FALSE(rowReader->next(*batch)); |
| } |
| |
| TEST_P(WriterTest, writeStringAndBinaryColumn) { |
| MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE); |
| MemoryPool* pool = getDefaultPool(); |
| std::unique_ptr<Type> type(Type::buildTypeFromString("struct<col1:string,col2:binary>")); |
| |
| uint64_t stripeSize = 1024; // 1K |
| uint64_t compressionBlockSize = 1024; // 1k |
| |
| char dataBuffer[327675]; |
| uint64_t offset = 0; |
| |
| std::unique_ptr<Writer> writer = |
| createWriter(stripeSize, compressionBlockSize, CompressionKind_ZLIB, *type, pool, |
| &memStream, fileVersion); |
| std::unique_ptr<ColumnVectorBatch> batch = writer->createRowBatch(65535); |
| StructVectorBatch* structBatch = dynamic_cast<StructVectorBatch*>(batch.get()); |
| StringVectorBatch* strBatch = dynamic_cast<StringVectorBatch*>(structBatch->fields[0]); |
| StringVectorBatch* binBatch = dynamic_cast<StringVectorBatch*>(structBatch->fields[1]); |
| |
| for (uint64_t i = 0; i < 65535; ++i) { |
| std::ostringstream os; |
| os << i; |
| strBatch->data[i] = dataBuffer + offset; |
| strBatch->length[i] = static_cast<int64_t>(os.str().size()); |
| binBatch->data[i] = dataBuffer + offset; |
| binBatch->length[i] = static_cast<int64_t>(os.str().size()); |
| memcpy(dataBuffer + offset, os.str().c_str(), os.str().size()); |
| offset += os.str().size(); |
| } |
| |
| structBatch->numElements = 65535; |
| strBatch->numElements = 65535; |
| binBatch->numElements = 65535; |
| |
| writer->add(*batch); |
| writer->close(); |
| |
| auto inStream = std::make_unique<MemoryInputStream>(memStream.getData(), memStream.getLength()); |
| std::unique_ptr<Reader> reader = createReader(pool, std::move(inStream)); |
| std::unique_ptr<RowReader> rowReader = createRowReader(reader.get()); |
| EXPECT_EQ(65535, reader->getNumberOfRows()); |
| |
| batch = rowReader->createRowBatch(65535); |
| EXPECT_EQ(true, rowReader->next(*batch)); |
| EXPECT_EQ(65535, batch->numElements); |
| |
| for (uint64_t i = 0; i < 65535; ++i) { |
| structBatch = dynamic_cast<StructVectorBatch*>(batch.get()); |
| strBatch = dynamic_cast<StringVectorBatch*>(structBatch->fields[0]); |
| binBatch = dynamic_cast<StringVectorBatch*>(structBatch->fields[1]); |
| std::string str(strBatch->data[i], static_cast<size_t>(strBatch->length[i])); |
| std::string bin(binBatch->data[i], static_cast<size_t>(binBatch->length[i])); |
| EXPECT_EQ(i, static_cast<uint64_t>(atoi(str.c_str()))); |
| EXPECT_EQ(i, static_cast<uint64_t>(atoi(bin.c_str()))); |
| } |
| |
| EXPECT_FALSE(rowReader->next(*batch)); |
| } |
| |
| TEST_P(WriterTest, writeFloatAndDoubleColumn) { |
| MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE); |
| MemoryPool* pool = getDefaultPool(); |
| std::unique_ptr<Type> type(Type::buildTypeFromString("struct<col1:double,col2:float>")); |
| |
| uint64_t stripeSize = 16 * 1024; |
| uint64_t compressionBlockSize = 1024; |
| uint64_t rowCount = 655350; |
| |
| std::vector<double> data(rowCount); |
| for (uint64_t i = 0; i < rowCount; ++i) { |
| data[i] = 100000 * (std::rand() * 1.0 / RAND_MAX); |
| } |
| |
| std::unique_ptr<Writer> writer = |
| createWriter(stripeSize, compressionBlockSize, CompressionKind_ZLIB, *type, pool, |
| &memStream, fileVersion); |
| std::unique_ptr<ColumnVectorBatch> batch = writer->createRowBatch(rowCount); |
| StructVectorBatch* structBatch = dynamic_cast<StructVectorBatch*>(batch.get()); |
| DoubleVectorBatch* doubleBatch = dynamic_cast<DoubleVectorBatch*>(structBatch->fields[0]); |
| DoubleVectorBatch* floatBatch = dynamic_cast<DoubleVectorBatch*>(structBatch->fields[1]); |
| |
| for (uint64_t i = 0; i < rowCount; ++i) { |
| doubleBatch->data[i] = data[i]; |
| floatBatch->data[i] = data[i]; |
| } |
| |
| structBatch->numElements = rowCount; |
| doubleBatch->numElements = rowCount; |
| floatBatch->numElements = rowCount; |
| |
| writer->add(*batch); |
| writer->close(); |
| |
| auto inStream = std::make_unique<MemoryInputStream>(memStream.getData(), memStream.getLength()); |
| std::unique_ptr<Reader> reader = createReader(pool, std::move(inStream)); |
| std::unique_ptr<RowReader> rowReader = createRowReader(reader.get()); |
| EXPECT_EQ(rowCount, reader->getNumberOfRows()); |
| |
| batch = rowReader->createRowBatch(rowCount); |
| EXPECT_EQ(true, rowReader->next(*batch)); |
| EXPECT_EQ(rowCount, batch->numElements); |
| |
| structBatch = dynamic_cast<StructVectorBatch*>(batch.get()); |
| doubleBatch = dynamic_cast<DoubleVectorBatch*>(structBatch->fields[0]); |
| floatBatch = dynamic_cast<DoubleVectorBatch*>(structBatch->fields[1]); |
| for (uint64_t i = 0; i < rowCount; ++i) { |
| EXPECT_TRUE(std::abs(data[i] - doubleBatch->data[i]) < 0.000001); |
| EXPECT_TRUE(std::abs(static_cast<float>(data[i]) - static_cast<float>(floatBatch->data[i])) < |
| 0.000001f); |
| } |
| EXPECT_FALSE(rowReader->next(*batch)); |
| } |
| |
| TEST_P(WriterTest, writeShortIntLong) { |
| MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE); |
| MemoryPool* pool = getDefaultPool(); |
| std::unique_ptr<Type> type( |
| Type::buildTypeFromString("struct<col1:smallint,col2:int,col3:bigint>")); |
| |
| uint64_t stripeSize = 16 * 1024; |
| uint64_t compressionBlockSize = 1024; |
| uint64_t rowCount = 65535; |
| |
| std::unique_ptr<Writer> writer = |
| createWriter(stripeSize, compressionBlockSize, CompressionKind_ZLIB, *type, pool, |
| &memStream, fileVersion); |
| std::unique_ptr<ColumnVectorBatch> batch = writer->createRowBatch(rowCount); |
| StructVectorBatch* structBatch = dynamic_cast<StructVectorBatch*>(batch.get()); |
| LongVectorBatch* smallIntBatch = dynamic_cast<LongVectorBatch*>(structBatch->fields[0]); |
| LongVectorBatch* intBatch = dynamic_cast<LongVectorBatch*>(structBatch->fields[1]); |
| LongVectorBatch* bigIntBatch = dynamic_cast<LongVectorBatch*>(structBatch->fields[2]); |
| |
| for (uint64_t i = 0; i < rowCount; ++i) { |
| smallIntBatch->data[i] = static_cast<int16_t>(i); |
| intBatch->data[i] = static_cast<int32_t>(i); |
| bigIntBatch->data[i] = static_cast<int64_t>(i); |
| } |
| structBatch->numElements = rowCount; |
| smallIntBatch->numElements = rowCount; |
| intBatch->numElements = rowCount; |
| bigIntBatch->numElements = rowCount; |
| |
| writer->add(*batch); |
| writer->close(); |
| |
| auto inStream = std::make_unique<MemoryInputStream>(memStream.getData(), memStream.getLength()); |
| std::unique_ptr<Reader> reader = createReader(pool, std::move(inStream)); |
| std::unique_ptr<RowReader> rowReader = createRowReader(reader.get()); |
| EXPECT_EQ(rowCount, reader->getNumberOfRows()); |
| |
| batch = rowReader->createRowBatch(rowCount); |
| EXPECT_EQ(true, rowReader->next(*batch)); |
| |
| structBatch = dynamic_cast<StructVectorBatch*>(batch.get()); |
| smallIntBatch = dynamic_cast<LongVectorBatch*>(structBatch->fields[0]); |
| intBatch = dynamic_cast<LongVectorBatch*>(structBatch->fields[1]); |
| bigIntBatch = dynamic_cast<LongVectorBatch*>(structBatch->fields[2]); |
| for (uint64_t i = 0; i < rowCount; ++i) { |
| EXPECT_EQ(static_cast<int16_t>(i), smallIntBatch->data[i]); |
| EXPECT_EQ(static_cast<int32_t>(i), intBatch->data[i]); |
| EXPECT_EQ(static_cast<int64_t>(i), bigIntBatch->data[i]); |
| } |
| } |
| |
| TEST_P(WriterTest, writeTinyint) { |
| MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE); |
| MemoryPool* pool = getDefaultPool(); |
| std::unique_ptr<Type> type(Type::buildTypeFromString("struct<col1:tinyint>")); |
| |
| uint64_t stripeSize = 16 * 1024; |
| uint64_t compressionBlockSize = 1024; |
| uint64_t rowCount = 65535; |
| |
| std::unique_ptr<Writer> writer = |
| createWriter(stripeSize, compressionBlockSize, CompressionKind_ZLIB, *type, pool, |
| &memStream, fileVersion); |
| std::unique_ptr<ColumnVectorBatch> batch = writer->createRowBatch(rowCount); |
| StructVectorBatch* structBatch = dynamic_cast<StructVectorBatch*>(batch.get()); |
| LongVectorBatch* byteBatch = dynamic_cast<LongVectorBatch*>(structBatch->fields[0]); |
| |
| for (uint64_t i = 0; i < rowCount; ++i) { |
| byteBatch->data[i] = static_cast<int8_t>(i); |
| } |
| structBatch->numElements = rowCount; |
| byteBatch->numElements = rowCount; |
| |
| writer->add(*batch); |
| writer->close(); |
| |
| auto inStream = std::make_unique<MemoryInputStream>(memStream.getData(), memStream.getLength()); |
| std::unique_ptr<Reader> reader = createReader(pool, std::move(inStream)); |
| std::unique_ptr<RowReader> rowReader = createRowReader(reader.get()); |
| EXPECT_EQ(rowCount, reader->getNumberOfRows()); |
| |
| batch = rowReader->createRowBatch(rowCount); |
| EXPECT_EQ(true, rowReader->next(*batch)); |
| |
| structBatch = dynamic_cast<StructVectorBatch*>(batch.get()); |
| byteBatch = dynamic_cast<LongVectorBatch*>(structBatch->fields[0]); |
| for (uint64_t i = 0; i < rowCount; ++i) { |
| EXPECT_EQ(static_cast<int8_t>(i), static_cast<int8_t>(byteBatch->data[i])); |
| } |
| } |
| |
| TEST_P(WriterTest, writeBooleanColumn) { |
| MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE); |
| MemoryPool* pool = getDefaultPool(); |
| std::unique_ptr<Type> type(Type::buildTypeFromString("struct<col1:boolean>")); |
| |
| uint64_t stripeSize = 16 * 1024; |
| uint64_t compressionBlockSize = 1024; |
| uint64_t rowCount = 65535; |
| |
| std::unique_ptr<Writer> writer = |
| createWriter(stripeSize, compressionBlockSize, CompressionKind_ZLIB, *type, pool, |
| &memStream, fileVersion); |
| std::unique_ptr<ColumnVectorBatch> batch = writer->createRowBatch(rowCount); |
| StructVectorBatch* structBatch = dynamic_cast<StructVectorBatch*>(batch.get()); |
| LongVectorBatch* byteBatch = dynamic_cast<LongVectorBatch*>(structBatch->fields[0]); |
| |
| for (uint64_t i = 0; i < rowCount; ++i) { |
| byteBatch->data[i] = (i % 3) == 0 ? 1 : 0; |
| } |
| structBatch->numElements = rowCount; |
| byteBatch->numElements = rowCount; |
| |
| writer->add(*batch); |
| writer->close(); |
| |
| auto inStream = std::make_unique<MemoryInputStream>(memStream.getData(), memStream.getLength()); |
| std::unique_ptr<Reader> reader = createReader(pool, std::move(inStream)); |
| std::unique_ptr<RowReader> rowReader = createRowReader(reader.get()); |
| EXPECT_EQ(rowCount, reader->getNumberOfRows()); |
| |
| batch = rowReader->createRowBatch(rowCount); |
| EXPECT_EQ(true, rowReader->next(*batch)); |
| |
| structBatch = dynamic_cast<StructVectorBatch*>(batch.get()); |
| byteBatch = dynamic_cast<LongVectorBatch*>(structBatch->fields[0]); |
| for (uint64_t i = 0; i < rowCount; ++i) { |
| EXPECT_EQ((i % 3) == 0 ? 1 : 0, byteBatch->data[i]); |
| } |
| } |
| |
| TEST_P(WriterTest, writeDate) { |
| MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE); |
| MemoryPool* pool = getDefaultPool(); |
| std::unique_ptr<Type> type(Type::buildTypeFromString("struct<col1:date>")); |
| |
| uint64_t stripeSize = 16 * 1024; |
| uint64_t compressionBlockSize = 1024; |
| uint64_t rowCount = 1024; |
| |
| std::unique_ptr<Writer> writer = |
| createWriter(stripeSize, compressionBlockSize, CompressionKind_ZLIB, *type, pool, |
| &memStream, fileVersion); |
| |
| std::unique_ptr<ColumnVectorBatch> batch = writer->createRowBatch(rowCount); |
| StructVectorBatch* structBatch = dynamic_cast<StructVectorBatch*>(batch.get()); |
| LongVectorBatch* longBatch = dynamic_cast<LongVectorBatch*>(structBatch->fields[0]); |
| |
| for (uint64_t i = 0; i < rowCount; ++i) { |
| longBatch->data[i] = static_cast<int32_t>(i); |
| } |
| structBatch->numElements = rowCount; |
| longBatch->numElements = rowCount; |
| |
| writer->add(*batch); |
| writer->close(); |
| |
| auto inStream = std::make_unique<MemoryInputStream>(memStream.getData(), memStream.getLength()); |
| std::unique_ptr<Reader> reader = createReader(pool, std::move(inStream)); |
| std::unique_ptr<RowReader> rowReader = createRowReader(reader.get()); |
| EXPECT_EQ(rowCount, reader->getNumberOfRows()); |
| |
| batch = rowReader->createRowBatch(rowCount); |
| EXPECT_EQ(true, rowReader->next(*batch)); |
| |
| structBatch = dynamic_cast<StructVectorBatch*>(batch.get()); |
| longBatch = dynamic_cast<LongVectorBatch*>(structBatch->fields[0]); |
| for (uint64_t i = 0; i < rowCount; ++i) { |
| EXPECT_EQ(static_cast<int32_t>(i), longBatch->data[i]); |
| } |
| } |
| |
| TEST_P(WriterTest, writeTimestamp) { |
| MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE); |
| MemoryPool* pool = getDefaultPool(); |
| std::unique_ptr<Type> type(Type::buildTypeFromString("struct<col1:timestamp>")); |
| |
| uint64_t stripeSize = 16 * 1024; |
| uint64_t compressionBlockSize = 1024; |
| uint64_t rowCount = 102400; |
| |
| std::unique_ptr<Writer> writer = |
| createWriter(stripeSize, compressionBlockSize, CompressionKind_ZLIB, *type, pool, |
| &memStream, fileVersion); |
| std::unique_ptr<ColumnVectorBatch> batch = writer->createRowBatch(rowCount); |
| StructVectorBatch* structBatch = dynamic_cast<StructVectorBatch*>(batch.get()); |
| TimestampVectorBatch* tsBatch = dynamic_cast<TimestampVectorBatch*>(structBatch->fields[0]); |
| |
| std::vector<std::time_t> times(rowCount); |
| for (uint64_t i = 0; i < rowCount; ++i) { |
| time_t currTime = -14210715; // 1969-07-20 12:34:45 |
| times[i] = static_cast<int64_t>(currTime) + static_cast<int64_t>(i * 3660); |
| tsBatch->data[i] = times[i]; |
| tsBatch->nanoseconds[i] = static_cast<int64_t>(i * 1000); |
| } |
| structBatch->numElements = rowCount; |
| tsBatch->numElements = rowCount; |
| |
| writer->add(*batch); |
| writer->close(); |
| |
| auto inStream = std::make_unique<MemoryInputStream>(memStream.getData(), memStream.getLength()); |
| std::unique_ptr<Reader> reader = createReader(pool, std::move(inStream)); |
| std::unique_ptr<RowReader> rowReader = createRowReader(reader.get()); |
| EXPECT_EQ(rowCount, reader->getNumberOfRows()); |
| |
| batch = rowReader->createRowBatch(rowCount); |
| EXPECT_EQ(true, rowReader->next(*batch)); |
| |
| structBatch = dynamic_cast<StructVectorBatch*>(batch.get()); |
| tsBatch = dynamic_cast<TimestampVectorBatch*>(structBatch->fields[0]); |
| for (uint64_t i = 0; i < rowCount; ++i) { |
| EXPECT_EQ(times[i], tsBatch->data[i]); |
| EXPECT_EQ(i * 1000, tsBatch->nanoseconds[i]); |
| } |
| } |
| |
| TEST_P(WriterTest, writeNegativeTimestamp) { |
| MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE); |
| MemoryPool* pool = getDefaultPool(); |
| std::unique_ptr<Type> type(Type::buildTypeFromString("struct<a:timestamp>")); |
| auto writer = createWriter(16 * 1024 * 1024, 64 * 1024, CompressionKind_ZLIB, *type, pool, |
| &memStream, fileVersion); |
| uint64_t batchCount = 5; |
| auto batch = writer->createRowBatch(batchCount * 2); |
| auto structBatch = dynamic_cast<StructVectorBatch*>(batch.get()); |
| auto tsBatch = dynamic_cast<TimestampVectorBatch*>(structBatch->fields[0]); |
| structBatch->numElements = batchCount; |
| tsBatch->numElements = batchCount; |
| const int64_t seconds[] = {-2, -1, 0, 1, 2}; |
| |
| // write 1st batch with nanosecond <= 999999 |
| for (uint64_t i = 0; i < batchCount; ++i) { |
| tsBatch->data[i] = seconds[i]; |
| tsBatch->nanoseconds[i] = 999999; |
| } |
| writer->add(*batch); |
| |
| // write 2nd batch with nanosecond > 999999 |
| for (uint64_t i = 0; i < batchCount; ++i) { |
| tsBatch->data[i] = seconds[i]; |
| tsBatch->nanoseconds[i] = 1000000; |
| } |
| writer->add(*batch); |
| writer->close(); |
| |
| auto inStream = std::make_unique<MemoryInputStream>(memStream.getData(), memStream.getLength()); |
| auto reader = createReader(pool, std::move(inStream)); |
| auto rowReader = createRowReader(reader.get()); |
| batch = rowReader->createRowBatch(batchCount); |
| structBatch = dynamic_cast<StructVectorBatch*>(batch.get()); |
| tsBatch = dynamic_cast<TimestampVectorBatch*>(structBatch->fields[0]); |
| |
| // read 1st batch with nanosecond <= 999999 |
| EXPECT_EQ(true, rowReader->next(*batch)); |
| for (uint64_t i = 0; i < batchCount; ++i) { |
| EXPECT_EQ(seconds[i], tsBatch->data[i]); |
| EXPECT_EQ(999999, tsBatch->nanoseconds[i]); |
| } |
| |
| // read 2nd batch with nanosecond > 999999 |
| EXPECT_EQ(true, rowReader->next(*batch)); |
| for (uint64_t i = 0; i < batchCount; ++i) { |
| if (seconds[i] == -1) { |
| // reproduce the JDK bug of java.sql.Timestamp. |
| // make sure the C++ ORC writer has consistent effect. |
| EXPECT_EQ(0, tsBatch->data[i]); |
| } else { |
| EXPECT_EQ(seconds[i], tsBatch->data[i]); |
| } |
| EXPECT_EQ(1000000, tsBatch->nanoseconds[i]); |
| } |
| } |
| |
| // TODO: Disable the test below for Windows for following reasons: |
| // First, the timezone name provided by Windows cannot be used as |
| // a parameter to the getTimezoneByName function. Secondly, the |
| // function of setting timezone in Windows is different from Linux. |
| #ifndef _MSC_VER |
| void testWriteTimestampWithTimezone(FileVersion fileVersion, const char* writerTimezone, |
| const char* readerTimezone, const std::string& tsStr, |
| int isDst = 0) { |
| char* tzBk = getenv("TZ"); // backup TZ env |
| |
| MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE); |
| MemoryPool* pool = getDefaultPool(); |
| std::unique_ptr<Type> type(Type::buildTypeFromString("struct<col1:timestamp>")); |
| |
| uint64_t stripeSize = 16 * 1024; |
| uint64_t compressionBlockSize = 1024; |
| uint64_t rowCount = 1; |
| |
| std::unique_ptr<Writer> writer = |
| createWriter(stripeSize, compressionBlockSize, CompressionKind_ZLIB, *type, pool, |
| &memStream, fileVersion, 0, writerTimezone); |
| auto batch = writer->createRowBatch(rowCount); |
| auto& structBatch = dynamic_cast<StructVectorBatch&>(*batch); |
| auto& tsBatch = dynamic_cast<TimestampVectorBatch&>(*structBatch.fields[0]); |
| |
| // write timestamp in the writer timezone |
| setenv("TZ", writerTimezone, 1); |
| tzset(); |
| struct tm tm; |
| memset(&tm, 0, sizeof(struct tm)); |
| strptime(tsStr.c_str(), "%Y-%m-%d %H:%M:%S", &tm); |
| // mktime() does depend on external hint for daylight saving time |
| tm.tm_isdst = isDst; |
| tsBatch.data[0] = mktime(&tm); |
| tsBatch.nanoseconds[0] = 0; |
| structBatch.numElements = rowCount; |
| tsBatch.numElements = rowCount; |
| writer->add(*batch); |
| writer->close(); |
| |
| // read timestamp from the reader timezone |
| auto inStream = std::make_unique<MemoryInputStream>(memStream.getData(), memStream.getLength()); |
| std::unique_ptr<Reader> reader = createReader(pool, std::move(inStream)); |
| std::unique_ptr<RowReader> rowReader = createRowReader(reader.get(), readerTimezone); |
| EXPECT_EQ(true, rowReader->next(*batch)); |
| |
| // verify we get same wall clock in reader timezone |
| setenv("TZ", readerTimezone, 1); |
| tzset(); |
| memset(&tm, 0, sizeof(struct tm)); |
| time_t ttime = tsBatch.data[0]; |
| localtime_r(&ttime, &tm); |
| char buf[20]; |
| strftime(buf, sizeof(buf), "%Y-%m-%d %H:%M:%S", &tm); |
| EXPECT_TRUE(strncmp(buf, tsStr.c_str(), tsStr.size()) == 0); |
| |
| // restore TZ env |
| if (tzBk) { |
| setenv("TZ", tzBk, 1); |
| tzset(); |
| } else { |
| unsetenv("TZ"); |
| tzset(); |
| } |
| } |
| |
| TEST_P(WriterTest, writeTimestampWithTimezone) { |
| const int IS_DST = 1, NOT_DST = 0; |
| testWriteTimestampWithTimezone(fileVersion, "GMT", "GMT", "2001-11-12 18:31:01"); |
| // behavior for Apache Orc (writer & reader timezone can change) |
| testWriteTimestampWithTimezone(fileVersion, "America/Los_Angeles", "America/Los_Angeles", |
| "2001-11-12 18:31:01"); |
| testWriteTimestampWithTimezone(fileVersion, "Asia/Shanghai", "Asia/Shanghai", |
| "2001-11-12 18:31:01"); |
| testWriteTimestampWithTimezone(fileVersion, "America/Los_Angeles", "Asia/Shanghai", |
| "2001-11-12 18:31:01"); |
| testWriteTimestampWithTimezone(fileVersion, "Asia/Shanghai", "America/Los_Angeles", |
| "2001-11-12 18:31:01"); |
| testWriteTimestampWithTimezone(fileVersion, "GMT", "Asia/Shanghai", "2001-11-12 18:31:01"); |
| testWriteTimestampWithTimezone(fileVersion, "Asia/Shanghai", "GMT", "2001-11-12 18:31:01"); |
| testWriteTimestampWithTimezone(fileVersion, "Asia/Shanghai", "America/Los_Angeles", |
| "2018-01-01 23:59:59"); |
| // daylight saving started at 2012-03-11 02:00:00 in Los Angeles |
| testWriteTimestampWithTimezone(fileVersion, "America/Los_Angeles", "Asia/Shanghai", |
| "2012-03-11 01:59:59", NOT_DST); |
| testWriteTimestampWithTimezone(fileVersion, "America/Los_Angeles", "Asia/Shanghai", |
| "2012-03-11 03:00:00", IS_DST); |
| testWriteTimestampWithTimezone(fileVersion, "America/Los_Angeles", "Asia/Shanghai", |
| "2012-03-11 03:00:01", IS_DST); |
| // daylight saving ended at 2012-11-04 02:00:00 in Los Angeles |
| testWriteTimestampWithTimezone(fileVersion, "America/Los_Angeles", "Asia/Shanghai", |
| "2012-11-04 01:59:59", IS_DST); |
| testWriteTimestampWithTimezone(fileVersion, "America/Los_Angeles", "Asia/Shanghai", |
| "2012-11-04 02:00:00", NOT_DST); |
| testWriteTimestampWithTimezone(fileVersion, "America/Los_Angeles", "Asia/Shanghai", |
| "2012-11-04 02:00:01", NOT_DST); |
| // other daylight saving time |
| testWriteTimestampWithTimezone(fileVersion, "America/Los_Angeles", "Asia/Shanghai", |
| "2014-06-06 12:34:56", IS_DST); |
| testWriteTimestampWithTimezone(fileVersion, "America/Los_Angeles", "America/Los_Angeles", |
| "2014-06-06 12:34:56", IS_DST); |
| } |
| #endif |
| |
| TEST_P(WriterTest, writeTimestampInstant) { |
| MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE); |
| MemoryPool* pool = getDefaultPool(); |
| std::unique_ptr<Type> type( |
| Type::buildTypeFromString("struct<col1:timestamp with local time zone>")); |
| |
| uint64_t stripeSize = 16 * 1024; |
| uint64_t compressionBlockSize = 1024; |
| uint64_t rowCount = 102400; |
| |
| std::unique_ptr<Writer> writer = |
| createWriter(stripeSize, compressionBlockSize, CompressionKind_ZLIB, *type, pool, |
| &memStream, fileVersion); |
| std::unique_ptr<ColumnVectorBatch> batch = writer->createRowBatch(rowCount); |
| StructVectorBatch* structBatch = dynamic_cast<StructVectorBatch*>(batch.get()); |
| TimestampVectorBatch* tsBatch = dynamic_cast<TimestampVectorBatch*>(structBatch->fields[0]); |
| |
| std::vector<std::time_t> times(rowCount); |
| for (uint64_t i = 0; i < rowCount; ++i) { |
| time_t currTime = -14210715; // 1969-07-20 12:34:45 |
| times[i] = static_cast<int64_t>(currTime) + static_cast<int64_t>(i * 3660); |
| tsBatch->data[i] = times[i]; |
| tsBatch->nanoseconds[i] = static_cast<int64_t>(i * 1000); |
| } |
| structBatch->numElements = rowCount; |
| tsBatch->numElements = rowCount; |
| |
| writer->add(*batch); |
| writer->close(); |
| |
| auto inStream = std::make_unique<MemoryInputStream>(memStream.getData(), memStream.getLength()); |
| std::unique_ptr<Reader> reader = createReader(pool, std::move(inStream)); |
| std::unique_ptr<RowReader> rowReader = createRowReader(reader.get()); |
| EXPECT_EQ(rowCount, reader->getNumberOfRows()); |
| |
| batch = rowReader->createRowBatch(rowCount); |
| EXPECT_EQ(true, rowReader->next(*batch)); |
| |
| structBatch = dynamic_cast<StructVectorBatch*>(batch.get()); |
| tsBatch = dynamic_cast<TimestampVectorBatch*>(structBatch->fields[0]); |
| for (uint64_t i = 0; i < rowCount; ++i) { |
| EXPECT_EQ(times[i], tsBatch->data[i]); |
| EXPECT_EQ(i * 1000, tsBatch->nanoseconds[i]); |
| } |
| } |
| |
| TEST_P(WriterTest, writeCharAndVarcharColumn) { |
| MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE); |
| MemoryPool* pool = getDefaultPool(); |
| std::unique_ptr<Type> type(Type::buildTypeFromString("struct<col1:char(3),col2:varchar(4)>")); |
| |
| uint64_t stripeSize = 1024; |
| uint64_t compressionBlockSize = 1024; |
| uint64_t rowCount = 65535; |
| |
| char dataBuffer[327675]; |
| uint64_t offset = 0; |
| |
| std::unique_ptr<Writer> writer = |
| createWriter(stripeSize, compressionBlockSize, CompressionKind_ZLIB, *type, pool, |
| &memStream, fileVersion); |
| |
| std::unique_ptr<ColumnVectorBatch> batch = writer->createRowBatch(rowCount); |
| StructVectorBatch* structBatch = dynamic_cast<StructVectorBatch*>(batch.get()); |
| StringVectorBatch* charBatch = dynamic_cast<StringVectorBatch*>(structBatch->fields[0]); |
| StringVectorBatch* varcharBatch = dynamic_cast<StringVectorBatch*>(structBatch->fields[1]); |
| |
| for (uint64_t i = 0; i < rowCount; ++i) { |
| std::ostringstream os; |
| os << i; |
| charBatch->data[i] = dataBuffer + offset; |
| charBatch->length[i] = static_cast<int64_t>(os.str().size()); |
| |
| varcharBatch->data[i] = charBatch->data[i]; |
| varcharBatch->length[i] = charBatch->length[i]; |
| |
| memcpy(dataBuffer + offset, os.str().c_str(), os.str().size()); |
| offset += os.str().size(); |
| } |
| |
| structBatch->numElements = rowCount; |
| charBatch->numElements = rowCount; |
| varcharBatch->numElements = rowCount; |
| |
| writer->add(*batch); |
| writer->close(); |
| |
| auto inStream = std::make_unique<MemoryInputStream>(memStream.getData(), memStream.getLength()); |
| std::unique_ptr<Reader> reader = createReader(pool, std::move(inStream)); |
| std::unique_ptr<RowReader> rowReader = createRowReader(reader.get()); |
| EXPECT_EQ(rowCount, reader->getNumberOfRows()); |
| |
| batch = rowReader->createRowBatch(rowCount); |
| EXPECT_EQ(true, rowReader->next(*batch)); |
| EXPECT_EQ(rowCount, batch->numElements); |
| |
| for (uint64_t i = 0; i < rowCount; ++i) { |
| structBatch = dynamic_cast<StructVectorBatch*>(batch.get()); |
| charBatch = dynamic_cast<StringVectorBatch*>(structBatch->fields[0]); |
| varcharBatch = dynamic_cast<StringVectorBatch*>(structBatch->fields[1]); |
| |
| EXPECT_EQ(3, charBatch->length[i]); |
| EXPECT_FALSE(varcharBatch->length[i] > 4); |
| |
| // test char data |
| std::string charsRead(charBatch->data[i], static_cast<size_t>(charBatch->length[i])); |
| |
| std::ostringstream os; |
| os << i; |
| std::string charsExpected = os.str().substr(0, 3); |
| while (charsExpected.length() < 3) { |
| charsExpected += ' '; |
| } |
| EXPECT_EQ(charsExpected, charsRead); |
| |
| // test varchar data |
| std::string varcharRead(varcharBatch->data[i], static_cast<size_t>(varcharBatch->length[i])); |
| std::string varcharExpected = os.str().substr(0, 4); |
| EXPECT_EQ(varcharRead, varcharExpected); |
| } |
| |
| EXPECT_FALSE(rowReader->next(*batch)); |
| } |
| |
| TEST_P(WriterTest, writeDecimal64Column) { |
| const uint64_t maxPrecision = 18; |
| MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE); |
| MemoryPool* pool = getDefaultPool(); |
| std::unique_ptr<Type> type(Type::buildTypeFromString("struct<col1:decimal(18,5)>")); |
| |
| uint64_t stripeSize = 16 * 1024; // 16K |
| uint64_t compressionBlockSize = 1024; // 1k |
| uint64_t rowCount = 1024; |
| |
| std::unique_ptr<Writer> writer = |
| createWriter(stripeSize, compressionBlockSize, CompressionKind_ZLIB, *type, pool, |
| &memStream, fileVersion); |
| std::unique_ptr<ColumnVectorBatch> batch = writer->createRowBatch(rowCount); |
| StructVectorBatch* structBatch = dynamic_cast<StructVectorBatch*>(batch.get()); |
| Decimal64VectorBatch* decBatch = dynamic_cast<Decimal64VectorBatch*>(structBatch->fields[0]); |
| |
| // write positive decimals |
| for (uint64_t i = 0; i < rowCount; ++i) { |
| decBatch->values[i] = static_cast<int64_t>(i + 10000); |
| } |
| structBatch->numElements = decBatch->numElements = rowCount; |
| writer->add(*batch); |
| |
| // write negative decimals |
| for (uint64_t i = 0; i < rowCount; ++i) { |
| decBatch->values[i] = static_cast<int64_t>(i - 10000); |
| } |
| structBatch->numElements = decBatch->numElements = rowCount; |
| writer->add(*batch); |
| |
| // write all precision decimals |
| int64_t dec; |
| for (uint64_t i = dec = 0; i < maxPrecision; ++i) { |
| dec = dec * 10 + 9; |
| decBatch->values[i] = dec; |
| decBatch->values[i + maxPrecision] = -dec; |
| } |
| structBatch->numElements = decBatch->numElements = 2 * maxPrecision; |
| writer->add(*batch); |
| |
| writer->close(); |
| |
| auto inStream = std::make_unique<MemoryInputStream>(memStream.getData(), memStream.getLength()); |
| std::unique_ptr<Reader> reader = createReader(pool, std::move(inStream)); |
| std::unique_ptr<RowReader> rowReader = createRowReader(reader.get()); |
| EXPECT_EQ((rowCount + maxPrecision) * 2, reader->getNumberOfRows()); |
| |
| // test reading positive decimals |
| batch = rowReader->createRowBatch(rowCount); |
| EXPECT_EQ(true, rowReader->next(*batch)); |
| structBatch = dynamic_cast<StructVectorBatch*>(batch.get()); |
| decBatch = dynamic_cast<Decimal64VectorBatch*>(structBatch->fields[0]); |
| for (uint64_t i = 0; i < rowCount; ++i) { |
| EXPECT_EQ(static_cast<int64_t>(i + 10000), decBatch->values[i]); |
| } |
| |
| // test reading negative decimals |
| EXPECT_EQ(true, rowReader->next(*batch)); |
| structBatch = dynamic_cast<StructVectorBatch*>(batch.get()); |
| decBatch = dynamic_cast<Decimal64VectorBatch*>(structBatch->fields[0]); |
| for (uint64_t i = 0; i < rowCount; ++i) { |
| EXPECT_EQ(static_cast<int64_t>(i - 10000), decBatch->values[i]); |
| } |
| |
| // test reading all precision decimals |
| EXPECT_EQ(true, rowReader->next(*batch)); |
| structBatch = dynamic_cast<StructVectorBatch*>(batch.get()); |
| decBatch = dynamic_cast<Decimal64VectorBatch*>(structBatch->fields[0]); |
| for (uint64_t i = dec = 0; i < maxPrecision; ++i) { |
| dec = dec * 10 + 9; |
| EXPECT_EQ(dec, decBatch->values[i]); |
| EXPECT_EQ(-dec, decBatch->values[i + maxPrecision]); |
| } |
| } |
| |
| TEST_P(WriterTest, writeDecimal128Column) { |
| const uint64_t maxPrecision = 38; |
| MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE); |
| MemoryPool* pool = getDefaultPool(); |
| std::unique_ptr<Type> type(Type::buildTypeFromString("struct<col1:decimal(38,10)>")); |
| |
| uint64_t stripeSize = 16 * 1024; |
| uint64_t compressionBlockSize = 1024; |
| uint64_t rowCount = 1024; |
| |
| std::unique_ptr<Writer> writer = |
| createWriter(stripeSize, compressionBlockSize, CompressionKind_ZLIB, *type, pool, |
| &memStream, fileVersion); |
| std::unique_ptr<ColumnVectorBatch> batch = writer->createRowBatch(rowCount); |
| StructVectorBatch* structBatch = dynamic_cast<StructVectorBatch*>(batch.get()); |
| Decimal128VectorBatch* decBatch = dynamic_cast<Decimal128VectorBatch*>(structBatch->fields[0]); |
| |
| // write positive decimals |
| std::string base = "1" + std::string(1, '0'); |
| for (uint64_t i = 0; i < rowCount; ++i) { |
| std::ostringstream os; |
| os << i; |
| decBatch->values[i] = Int128(base + os.str()); |
| } |
| structBatch->numElements = decBatch->numElements = rowCount; |
| writer->add(*batch); |
| |
| // write negative decimals |
| std::string nbase = "-" + base; |
| for (uint64_t i = 0; i < rowCount; ++i) { |
| std::ostringstream os; |
| os << i; |
| decBatch->values[i] = Int128(nbase + os.str()); |
| } |
| structBatch->numElements = rowCount; |
| decBatch->numElements = rowCount; |
| writer->add(*batch); |
| |
| // write all precision decimals |
| for (uint64_t i = 0; i < maxPrecision; ++i) { |
| std::string expected = std::string(i + 1, '9'); |
| decBatch->values[i] = Int128(expected); |
| decBatch->values[i + maxPrecision] = Int128("-" + expected); |
| } |
| structBatch->numElements = decBatch->numElements = 2 * maxPrecision; |
| writer->add(*batch); |
| |
| writer->close(); |
| |
| auto inStream = std::make_unique<MemoryInputStream>(memStream.getData(), memStream.getLength()); |
| std::unique_ptr<Reader> reader = createReader(pool, std::move(inStream)); |
| std::unique_ptr<RowReader> rowReader = createRowReader(reader.get()); |
| EXPECT_EQ((rowCount + maxPrecision) * 2, reader->getNumberOfRows()); |
| |
| // test reading positive decimals |
| batch = rowReader->createRowBatch(rowCount); |
| EXPECT_EQ(true, rowReader->next(*batch)); |
| structBatch = dynamic_cast<StructVectorBatch*>(batch.get()); |
| decBatch = dynamic_cast<Decimal128VectorBatch*>(structBatch->fields[0]); |
| for (uint64_t i = 0; i < rowCount; ++i) { |
| std::ostringstream os; |
| os << i; |
| EXPECT_EQ(base + os.str(), decBatch->values[i].toString()); |
| } |
| |
| // test reading negative decimals |
| EXPECT_EQ(true, rowReader->next(*batch)); |
| structBatch = dynamic_cast<StructVectorBatch*>(batch.get()); |
| decBatch = dynamic_cast<Decimal128VectorBatch*>(structBatch->fields[0]); |
| for (uint64_t i = 0; i < rowCount; ++i) { |
| std::ostringstream os; |
| os << i; |
| EXPECT_EQ(nbase + os.str(), decBatch->values[i].toString()); |
| } |
| |
| // test reading all precision decimals |
| EXPECT_EQ(true, rowReader->next(*batch)); |
| structBatch = dynamic_cast<StructVectorBatch*>(batch.get()); |
| decBatch = dynamic_cast<Decimal128VectorBatch*>(structBatch->fields[0]); |
| for (uint64_t i = 0; i < maxPrecision; ++i) { |
| std::string expected = std::string(i + 1, '9'); |
| EXPECT_EQ(expected, decBatch->values[i].toString()); |
| EXPECT_EQ("-" + expected, decBatch->values[i + maxPrecision].toString()); |
| } |
| } |
| |
| TEST_P(WriterTest, writeListColumn) { |
| MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE); |
| MemoryPool* pool = getDefaultPool(); |
| |
| std::unique_ptr<Type> type(Type::buildTypeFromString("struct<col1:array<int>>")); |
| |
| uint64_t stripeSize = 1024 * 1024; |
| uint64_t compressionBlockSize = 64 * 1024; |
| uint64_t rowCount = 1024; |
| uint64_t maxListLength = 10; |
| uint64_t offset = 0; |
| |
| std::unique_ptr<Writer> writer = |
| createWriter(stripeSize, compressionBlockSize, CompressionKind_ZLIB, *type, pool, |
| &memStream, fileVersion); |
| std::unique_ptr<ColumnVectorBatch> batch = writer->createRowBatch(rowCount * maxListLength); |
| |
| StructVectorBatch* structBatch = dynamic_cast<StructVectorBatch*>(batch.get()); |
| ListVectorBatch* listBatch = dynamic_cast<ListVectorBatch*>(structBatch->fields[0]); |
| LongVectorBatch* intBatch = dynamic_cast<LongVectorBatch*>(listBatch->elements.get()); |
| int64_t* data = intBatch->data.data(); |
| int64_t* offsets = listBatch->offsets.data(); |
| |
| for (uint64_t i = 0; i < rowCount; ++i) { |
| offsets[i] = static_cast<int64_t>(offset); |
| for (uint64_t length = i % maxListLength + 1; length != 0; --length) { |
| data[offset++] = static_cast<int64_t>(i); |
| } |
| } |
| offsets[rowCount] = static_cast<int64_t>(offset); |
| |
| structBatch->numElements = rowCount; |
| listBatch->numElements = rowCount; |
| |
| writer->add(*batch); |
| writer->close(); |
| |
| auto inStream = std::make_unique<MemoryInputStream>(memStream.getData(), memStream.getLength()); |
| std::unique_ptr<Reader> reader = createReader(pool, std::move(inStream)); |
| std::unique_ptr<RowReader> rowReader = createRowReader(reader.get()); |
| EXPECT_EQ(rowCount, reader->getNumberOfRows()); |
| |
| batch = rowReader->createRowBatch(rowCount * maxListLength); |
| EXPECT_EQ(true, rowReader->next(*batch)); |
| |
| structBatch = dynamic_cast<StructVectorBatch*>(batch.get()); |
| listBatch = dynamic_cast<ListVectorBatch*>(structBatch->fields[0]); |
| intBatch = dynamic_cast<LongVectorBatch*>(listBatch->elements.get()); |
| data = intBatch->data.data(); |
| offsets = listBatch->offsets.data(); |
| |
| EXPECT_EQ(rowCount, listBatch->numElements); |
| EXPECT_EQ(offset, intBatch->numElements); |
| |
| for (uint64_t i = 0; i < rowCount; ++i) { |
| uint64_t length = i % maxListLength + 1; |
| for (int64_t j = 0; j != length; ++j) { |
| EXPECT_EQ(static_cast<int64_t>(i), data[offsets[i] + j]); |
| } |
| } |
| } |
| |
| TEST_P(WriterTest, writeMapColumn) { |
| MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE); |
| MemoryPool* pool = getDefaultPool(); |
| std::unique_ptr<Type> type(Type::buildTypeFromString("struct<col1:map<string,int>>")); |
| |
| uint64_t stripeSize = 16 * 1024; |
| uint64_t compressionBlockSize = 1024; |
| uint64_t rowCount = 1024, maxListLength = 10, offset = 0; |
| |
| std::unique_ptr<Writer> writer = |
| createWriter(stripeSize, compressionBlockSize, CompressionKind_ZLIB, *type, pool, |
| &memStream, fileVersion); |
| std::unique_ptr<ColumnVectorBatch> batch = writer->createRowBatch(rowCount * maxListLength); |
| StructVectorBatch* structBatch = dynamic_cast<StructVectorBatch*>(batch.get()); |
| MapVectorBatch* mapBatch = dynamic_cast<MapVectorBatch*>(structBatch->fields[0]); |
| StringVectorBatch* keyBatch = dynamic_cast<StringVectorBatch*>(mapBatch->keys.get()); |
| LongVectorBatch* elemBatch = dynamic_cast<LongVectorBatch*>(mapBatch->elements.get()); |
| |
| char dataBuffer[327675]; // 300k |
| uint64_t strOffset = 0; |
| |
| int64_t* offsets = mapBatch->offsets.data(); |
| char** keyData = keyBatch->data.data(); |
| int64_t* keyLength = keyBatch->length.data(); |
| int64_t* elemData = elemBatch->data.data(); |
| |
| for (uint64_t i = 0; i < rowCount; ++i) { |
| offsets[i] = static_cast<int64_t>(offset); |
| for (uint64_t j = 0; j != i % maxListLength + 1; ++j) { |
| std::ostringstream os; |
| os << (i + j); |
| memcpy(dataBuffer + strOffset, os.str().c_str(), os.str().size()); |
| keyData[offset] = dataBuffer + strOffset; |
| |
| keyLength[offset] = static_cast<int64_t>(os.str().size()); |
| elemData[offset++] = static_cast<int64_t>(i); |
| |
| strOffset += os.str().size(); |
| } |
| } |
| offsets[rowCount] = static_cast<int64_t>(offset); |
| |
| structBatch->numElements = rowCount; |
| mapBatch->numElements = rowCount; |
| |
| writer->add(*batch); |
| writer->close(); |
| |
| auto inStream = std::make_unique<MemoryInputStream>(memStream.getData(), memStream.getLength()); |
| std::unique_ptr<Reader> reader = createReader(pool, std::move(inStream)); |
| std::unique_ptr<RowReader> rowReader = createRowReader(reader.get()); |
| EXPECT_EQ(rowCount, reader->getNumberOfRows()); |
| |
| batch = rowReader->createRowBatch(rowCount * maxListLength); |
| EXPECT_EQ(true, rowReader->next(*batch)); |
| |
| structBatch = dynamic_cast<StructVectorBatch*>(batch.get()); |
| mapBatch = dynamic_cast<MapVectorBatch*>(structBatch->fields[0]); |
| keyBatch = dynamic_cast<StringVectorBatch*>(mapBatch->keys.get()); |
| elemBatch = dynamic_cast<LongVectorBatch*>(mapBatch->elements.get()); |
| offsets = mapBatch->offsets.data(); |
| keyData = keyBatch->data.data(); |
| keyLength = keyBatch->length.data(); |
| elemData = elemBatch->data.data(); |
| |
| EXPECT_EQ(rowCount, mapBatch->numElements); |
| EXPECT_EQ(offset, keyBatch->numElements); |
| EXPECT_EQ(offset, elemBatch->numElements); |
| |
| for (uint64_t i = 0; i != rowCount; ++i) { |
| for (int64_t j = 0; j != i % maxListLength + 1; ++j) { |
| std::ostringstream os; |
| os << i + static_cast<uint64_t>(j); |
| uint64_t lenRead = static_cast<uint64_t>(keyLength[offsets[i] + j]); |
| EXPECT_EQ(os.str(), std::string(keyData[offsets[i] + j], lenRead)); |
| EXPECT_EQ(static_cast<int64_t>(i), elemData[offsets[i] + j]); |
| } |
| } |
| } |
| |
| TEST_P(WriterTest, writeUnionColumn) { |
| MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE); |
| MemoryPool* pool = getDefaultPool(); |
| std::unique_ptr<Type> type( |
| Type::buildTypeFromString("struct<col1:uniontype<int,double,boolean>>")); |
| |
| uint64_t stripeSize = 16 * 1024; |
| uint64_t compressionBlockSize = 1024; |
| uint64_t rowCount = 3333; |
| |
| std::unique_ptr<Writer> writer = |
| createWriter(stripeSize, compressionBlockSize, CompressionKind_ZLIB, *type, pool, |
| &memStream, fileVersion); |
| std::unique_ptr<ColumnVectorBatch> batch = writer->createRowBatch(rowCount); |
| StructVectorBatch* structBatch = dynamic_cast<StructVectorBatch*>(batch.get()); |
| UnionVectorBatch* unionBatch = dynamic_cast<UnionVectorBatch*>(structBatch->fields[0]); |
| unsigned char* tags = unionBatch->tags.data(); |
| uint64_t* offsets = unionBatch->offsets.data(); |
| |
| LongVectorBatch* intBatch = dynamic_cast<LongVectorBatch*>(unionBatch->children[0]); |
| DoubleVectorBatch* doubleBatch = dynamic_cast<DoubleVectorBatch*>(unionBatch->children[1]); |
| LongVectorBatch* boolBatch = dynamic_cast<LongVectorBatch*>(unionBatch->children[2]); |
| int64_t* intData = intBatch->data.data(); |
| double* doubleData = doubleBatch->data.data(); |
| int64_t* boolData = boolBatch->data.data(); |
| |
| uint64_t intOffset = 0, doubleOffset = 0, boolOffset = 0, tag = 0; |
| for (uint64_t i = 0; i < rowCount; ++i) { |
| tags[i] = static_cast<unsigned char>(tag); |
| switch (tag) { |
| case 0: |
| offsets[i] = intOffset; |
| intData[intOffset++] = static_cast<int64_t>(i); |
| break; |
| case 1: |
| offsets[i] = doubleOffset; |
| doubleData[doubleOffset++] = static_cast<double>(i) + 0.5; |
| break; |
| case 2: |
| offsets[i] = boolOffset; |
| boolData[boolOffset++] = (i % 2 == 0) ? 1 : 0; |
| break; |
| } |
| tag = (tag + 1) % 3; |
| } |
| |
| structBatch->numElements = rowCount; |
| unionBatch->numElements = rowCount; |
| |
| writer->add(*batch); |
| writer->close(); |
| |
| auto inStream = std::make_unique<MemoryInputStream>(memStream.getData(), memStream.getLength()); |
| std::unique_ptr<Reader> reader = createReader(pool, std::move(inStream)); |
| std::unique_ptr<RowReader> rowReader = createRowReader(reader.get()); |
| EXPECT_EQ(rowCount, reader->getNumberOfRows()); |
| |
| batch = rowReader->createRowBatch(rowCount); |
| EXPECT_EQ(true, rowReader->next(*batch)); |
| |
| structBatch = dynamic_cast<StructVectorBatch*>(batch.get()); |
| unionBatch = dynamic_cast<UnionVectorBatch*>(structBatch->fields[0]); |
| tags = unionBatch->tags.data(); |
| offsets = unionBatch->offsets.data(); |
| |
| intBatch = dynamic_cast<LongVectorBatch*>(unionBatch->children[0]); |
| doubleBatch = dynamic_cast<DoubleVectorBatch*>(unionBatch->children[1]); |
| boolBatch = dynamic_cast<LongVectorBatch*>(unionBatch->children[2]); |
| intData = intBatch->data.data(); |
| doubleData = doubleBatch->data.data(); |
| boolData = boolBatch->data.data(); |
| |
| EXPECT_EQ(rowCount, unionBatch->numElements); |
| EXPECT_EQ(rowCount / 3, intBatch->numElements); |
| EXPECT_EQ(rowCount / 3, doubleBatch->numElements); |
| EXPECT_EQ(rowCount / 3, boolBatch->numElements); |
| |
| uint64_t offset; |
| for (uint64_t i = 0; i < rowCount; ++i) { |
| tag = tags[i]; |
| offset = offsets[i]; |
| |
| switch (tag) { |
| case 0: |
| EXPECT_EQ(i, intData[offset]); |
| break; |
| case 1: |
| EXPECT_TRUE(std::abs(static_cast<double>(i) + 0.5 - doubleData[offset]) < 0.000001); |
| break; |
| case 2: |
| EXPECT_EQ(i % 2 == 0 ? 1 : 0, boolData[offset]); |
| break; |
| } |
| } |
| } |
| |
| TEST_P(WriterTest, writeUTF8CharAndVarcharColumn) { |
| MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE); |
| MemoryPool* pool = getDefaultPool(); |
| std::unique_ptr<Type> type(Type::buildTypeFromString("struct<col1:char(2),col2:varchar(2)>")); |
| |
| uint64_t stripeSize = 1024; |
| uint64_t compressionBlockSize = 1024; |
| uint64_t rowCount = 3; |
| std::unique_ptr<Writer> writer = |
| createWriter(stripeSize, compressionBlockSize, CompressionKind_ZLIB, *type, pool, |
| &memStream, fileVersion); |
| std::unique_ptr<ColumnVectorBatch> batch = writer->createRowBatch(rowCount); |
| StructVectorBatch* structBatch = dynamic_cast<StructVectorBatch*>(batch.get()); |
| StringVectorBatch* charBatch = dynamic_cast<StringVectorBatch*>(structBatch->fields[0]); |
| StringVectorBatch* varcharBatch = dynamic_cast<StringVectorBatch*>(structBatch->fields[1]); |
| std::vector<std::vector<char>> strs; |
| |
| // input character is 'Ã ' (0xC3, 0xA0) |
| // in total 3 rows, each has 1, 2, and 3 'Ã ' respectively |
| std::vector<char> vec; |
| for (uint64_t i = 0; i != rowCount; ++i) { |
| vec.push_back('\xC3'); |
| vec.push_back('\xA0'); |
| strs.push_back(vec); |
| charBatch->data[i] = varcharBatch->data[i] = strs.back().data(); |
| charBatch->length[i] = varcharBatch->length[i] = static_cast<int64_t>(strs.back().size()); |
| } |
| |
| structBatch->numElements = rowCount; |
| charBatch->numElements = rowCount; |
| varcharBatch->numElements = rowCount; |
| |
| writer->add(*batch); |
| writer->close(); |
| |
| // read and verify data |
| auto inStream = std::make_unique<MemoryInputStream>(memStream.getData(), memStream.getLength()); |
| std::unique_ptr<Reader> reader = createReader(pool, std::move(inStream)); |
| std::unique_ptr<RowReader> rowReader = createRowReader(reader.get()); |
| EXPECT_EQ(rowCount, reader->getNumberOfRows()); |
| |
| batch = rowReader->createRowBatch(rowCount); |
| structBatch = dynamic_cast<StructVectorBatch*>(batch.get()); |
| charBatch = dynamic_cast<StringVectorBatch*>(structBatch->fields[0]); |
| varcharBatch = dynamic_cast<StringVectorBatch*>(structBatch->fields[1]); |
| |
| EXPECT_EQ(true, rowReader->next(*batch)); |
| EXPECT_EQ(rowCount, batch->numElements); |
| |
| char expectedPadded[3] = {'\xC3', '\xA0', ' '}; |
| char expectedOneChar[2] = {'\xC3', '\xA0'}; |
| char expectedTwoChars[4] = {'\xC3', '\xA0', '\xC3', '\xA0'}; |
| |
| EXPECT_EQ(3, charBatch->length[0]); |
| EXPECT_EQ(4, charBatch->length[1]); |
| EXPECT_EQ(4, charBatch->length[2]); |
| EXPECT_TRUE(memcmp(charBatch->data[0], expectedPadded, 3) == 0); |
| EXPECT_TRUE(memcmp(charBatch->data[1], expectedTwoChars, 4) == 0); |
| EXPECT_TRUE(memcmp(charBatch->data[2], expectedTwoChars, 4) == 0); |
| |
| EXPECT_EQ(2, varcharBatch->length[0]); |
| EXPECT_EQ(4, varcharBatch->length[1]); |
| EXPECT_EQ(4, varcharBatch->length[2]); |
| EXPECT_TRUE(memcmp(varcharBatch->data[0], expectedOneChar, 2) == 0); |
| EXPECT_TRUE(memcmp(varcharBatch->data[1], expectedTwoChars, 4) == 0); |
| EXPECT_TRUE(memcmp(varcharBatch->data[2], expectedTwoChars, 4) == 0); |
| |
| EXPECT_FALSE(rowReader->next(*batch)); |
| } |
| |
| TEST_P(WriterTest, testWriteListColumnWithNull) { |
| MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE); |
| MemoryPool* pool = getDefaultPool(); |
| std::unique_ptr<Type> type(Type::buildTypeFromString("struct<col1:array<tinyint>>")); |
| |
| uint64_t stripeSize = 1024; |
| uint64_t compressionBlockSize = 1024; |
| |
| std::unique_ptr<Writer> writer = |
| createWriter(stripeSize, compressionBlockSize, CompressionKind_ZLIB, *type, pool, |
| &memStream, fileVersion); |
| |
| std::unique_ptr<ColumnVectorBatch> batch = writer->createRowBatch(4); |
| StructVectorBatch* structBatch = dynamic_cast<StructVectorBatch*>(batch.get()); |
| ListVectorBatch* listBatch = dynamic_cast<ListVectorBatch*>(structBatch->fields[0]); |
| LongVectorBatch* intBatch = dynamic_cast<LongVectorBatch*>(listBatch->elements.get()); |
| |
| // test data looks like below - |
| // {[1, 2]} |
| // null |
| // {[3, 4]} |
| // {[5, 6]} |
| int64_t* offsets = listBatch->offsets.data(); |
| offsets[0] = 0; |
| offsets[1] = 2; |
| offsets[2] = 2; |
| offsets[3] = 4; |
| offsets[4] = 6; |
| |
| intBatch->resize(6); |
| for (uint64_t i = 0; i < 6; ++i) { |
| intBatch->notNull[i] = 1; |
| } |
| |
| int64_t* data = intBatch->data.data(); |
| for (int8_t i = 1; i < 7; ++i) { |
| data[i - 1] = i; |
| } |
| |
| structBatch->numElements = 4; |
| |
| listBatch->notNull[1] = 0; |
| listBatch->hasNulls = true; |
| listBatch->numElements = 4; |
| |
| writer->add(*batch); |
| writer->close(); |
| |
| auto inStream = std::make_unique<MemoryInputStream>(memStream.getData(), memStream.getLength()); |
| std::unique_ptr<Reader> reader = createReader(pool, std::move(inStream)); |
| std::unique_ptr<RowReader> rowReader = createRowReader(reader.get()); |
| EXPECT_EQ(4, reader->getNumberOfRows()); |
| |
| batch = rowReader->createRowBatch(4 * 2); |
| EXPECT_EQ(true, rowReader->next(*batch)); |
| |
| structBatch = dynamic_cast<StructVectorBatch*>(batch.get()); |
| listBatch = dynamic_cast<ListVectorBatch*>(structBatch->fields[0]); |
| intBatch = dynamic_cast<LongVectorBatch*>(listBatch->elements.get()); |
| |
| data = intBatch->data.data(); |
| offsets = listBatch->offsets.data(); |
| |
| EXPECT_EQ(4, structBatch->numElements); |
| EXPECT_EQ(4, listBatch->numElements); |
| EXPECT_EQ(1, listBatch->notNull[0]); |
| EXPECT_EQ(0, listBatch->notNull[1]); |
| EXPECT_EQ(1, listBatch->notNull[2]); |
| EXPECT_EQ(1, listBatch->notNull[3]); |
| |
| EXPECT_EQ(0, offsets[0]); |
| EXPECT_EQ(2, offsets[1]); |
| EXPECT_EQ(2, offsets[2]); |
| EXPECT_EQ(4, offsets[3]); |
| EXPECT_EQ(6, offsets[4]); |
| |
| for (int8_t i = 1; i < 7; ++i) { |
| EXPECT_EQ(i, data[i - 1]); |
| } |
| } |
| |
| TEST_P(WriterTest, testWriteNestedStructWithNull) { |
| MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE); |
| MemoryPool* pool = getDefaultPool(); |
| std::unique_ptr<Type> type(Type::buildTypeFromString("struct<col0:struct<col1:bigint>>")); |
| |
| uint64_t stripeSize = 1024; |
| uint64_t compressionBlockSize = 1024; |
| |
| std::unique_ptr<Writer> writer = |
| createWriter(stripeSize, compressionBlockSize, CompressionKind_ZLIB, *type, pool, |
| &memStream, fileVersion); |
| |
| // test data looks like below - |
| // {0} |
| // null |
| // {1} |
| // {2} |
| // {3} |
| std::unique_ptr<ColumnVectorBatch> batch = writer->createRowBatch(5); |
| StructVectorBatch* structBatch = dynamic_cast<StructVectorBatch*>(batch.get()); |
| StructVectorBatch* structBatch2 = dynamic_cast<StructVectorBatch*>(structBatch->fields[0]); |
| LongVectorBatch* intBatch = dynamic_cast<LongVectorBatch*>(structBatch2->fields[0]); |
| |
| structBatch->numElements = 5; |
| structBatch2->numElements = 5; |
| structBatch2->hasNulls = true; |
| structBatch2->notNull[1] = 0; |
| |
| intBatch->resize(5); |
| for (int64_t i = 0; i < 5; ++i) { |
| intBatch->data.data()[i] = i; |
| } |
| intBatch->notNull[1] = 0; |
| intBatch->hasNulls = true; |
| |
| writer->add(*batch); |
| writer->close(); |
| |
| auto inStream = std::make_unique<MemoryInputStream>(memStream.getData(), memStream.getLength()); |
| std::unique_ptr<Reader> reader = createReader(pool, std::move(inStream)); |
| std::unique_ptr<RowReader> rowReader = createRowReader(reader.get()); |
| EXPECT_EQ(5, reader->getNumberOfRows()); |
| |
| batch = rowReader->createRowBatch(5); |
| EXPECT_EQ(true, rowReader->next(*batch)); |
| |
| structBatch = dynamic_cast<StructVectorBatch*>(batch.get()); |
| structBatch2 = dynamic_cast<StructVectorBatch*>(structBatch->fields[0]); |
| intBatch = dynamic_cast<LongVectorBatch*>(structBatch2->fields[0]); |
| |
| for (uint64_t i = 0; i < 5; ++i) { |
| EXPECT_EQ(1, structBatch->notNull[i]); |
| } |
| |
| for (uint64_t i = 0; i < 5; ++i) { |
| if (i == 1) { |
| EXPECT_EQ(0, structBatch2->notNull[i]); |
| } else { |
| EXPECT_EQ(1, structBatch2->notNull[i]); |
| } |
| } |
| |
| for (uint64_t i = 0; i < 5; ++i) { |
| if (i == 1) { |
| EXPECT_EQ(0, intBatch->notNull[i]); |
| } else { |
| EXPECT_EQ(1, intBatch->notNull[i]); |
| } |
| } |
| |
| int64_t* data = intBatch->data.data(); |
| for (int8_t i = 0; i < 5; ++i) { |
| if (i != 1) { |
| EXPECT_EQ(i, data[i]); |
| } |
| } |
| } |
| |
| TEST_P(WriterTest, testWriteNestedStructWithNullIndex) { |
| MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE); |
| MemoryPool* pool = getDefaultPool(); |
| std::unique_ptr<Type> type(Type::buildTypeFromString("struct<col0:struct<col1:bigint>>")); |
| |
| uint64_t stripeSize = 1024; |
| uint64_t compressionBlockSize = 1024; |
| |
| // 10000 rows with every 1000 row as an RG |
| // Each RG has 100 null rows except that the 5th RG is all null |
| std::unique_ptr<Writer> writer = |
| createWriter(stripeSize, compressionBlockSize, CompressionKind_ZLIB, *type, pool, |
| &memStream, fileVersion, 1000); |
| |
| std::unique_ptr<ColumnVectorBatch> batch = writer->createRowBatch(10000); |
| StructVectorBatch* structBatch = dynamic_cast<StructVectorBatch*>(batch.get()); |
| StructVectorBatch* structBatch2 = dynamic_cast<StructVectorBatch*>(structBatch->fields[0]); |
| LongVectorBatch* intBatch = dynamic_cast<LongVectorBatch*>(structBatch2->fields[0]); |
| |
| structBatch->numElements = 10000; |
| structBatch2->numElements = 10000; |
| structBatch2->hasNulls = true; |
| |
| intBatch->resize(10000); |
| intBatch->hasNulls = true; |
| for (uint64_t i = 0; i < 10; ++i) { |
| for (uint64_t j = i * 1000 + 100 * i; j < i * 1000 + 100 * i + 100; ++j) { |
| structBatch2->notNull[j] = 0; |
| intBatch->notNull[j] = 0; |
| } |
| } |
| |
| for (uint64_t i = 5000; i < 6000; ++i) { |
| structBatch2->notNull[i] = 0; |
| intBatch->notNull[i] = 0; |
| } |
| |
| for (int64_t i = 0; i < 10000; ++i) { |
| intBatch->data.data()[i] = i; |
| } |
| |
| writer->add(*batch); |
| writer->close(); |
| |
| auto inStream = std::make_unique<MemoryInputStream>(memStream.getData(), memStream.getLength()); |
| std::unique_ptr<Reader> reader = createReader(pool, std::move(inStream)); |
| std::unique_ptr<RowReader> rowReader = createRowReader(reader.get()); |
| EXPECT_EQ(10000, reader->getNumberOfRows()); |
| |
| batch = rowReader->createRowBatch(1000); |
| structBatch = dynamic_cast<StructVectorBatch*>(batch.get()); |
| structBatch2 = dynamic_cast<StructVectorBatch*>(structBatch->fields[0]); |
| intBatch = dynamic_cast<LongVectorBatch*>(structBatch2->fields[0]); |
| |
| // Read rows 0 - 1000 |
| EXPECT_EQ(true, rowReader->next(*batch)); |
| for (uint64_t i = 0; i < 1000; ++i) { |
| EXPECT_EQ(1, structBatch->notNull[i]); |
| } |
| |
| for (uint64_t i = 0; i < 1000; ++i) { |
| if (i < 100) { |
| EXPECT_EQ(0, structBatch2->notNull[i]); |
| } else { |
| EXPECT_EQ(1, structBatch2->notNull[i]); |
| } |
| } |
| |
| for (uint64_t i = 0; i < 1000; ++i) { |
| if (i < 100) { |
| EXPECT_EQ(0, intBatch->notNull[i]); |
| } else { |
| EXPECT_EQ(1, intBatch->notNull[i]); |
| EXPECT_EQ(i, intBatch->data.data()[i]); |
| } |
| } |
| |
| // Read rows 1800 - 2800, in which 2200 - 2300 are nulls |
| rowReader->seekToRow(1800); |
| EXPECT_EQ(true, rowReader->next(*batch)); |
| for (uint64_t i = 0; i < 1000; ++i) { |
| EXPECT_EQ(1, structBatch->notNull[i]); |
| } |
| |
| for (uint64_t i = 0; i < 1000; ++i) { |
| if (i >= 400 && i < 500) { |
| EXPECT_EQ(0, structBatch2->notNull[i]); |
| } else { |
| EXPECT_EQ(1, structBatch2->notNull[i]); |
| } |
| } |
| |
| for (uint64_t i = 0; i < 1000; ++i) { |
| if (i >= 400 && i < 500) { |
| EXPECT_EQ(0, intBatch->notNull[i]); |
| } else { |
| EXPECT_EQ(1, intBatch->notNull[i]); |
| EXPECT_EQ(i + 1800, intBatch->data.data()[i]); |
| } |
| } |
| |
| // Read rows 5000 - 6000, all nulls |
| rowReader->seekToRow(5000); |
| EXPECT_EQ(true, rowReader->next(*batch)); |
| for (uint64_t i = 0; i < 1000; ++i) { |
| EXPECT_EQ(1, structBatch->notNull[i]); |
| } |
| |
| for (uint64_t i = 0; i < 1000; ++i) { |
| EXPECT_EQ(0, structBatch2->notNull[i]); |
| } |
| |
| for (uint64_t i = 0; i < 1000; ++i) { |
| EXPECT_EQ(0, intBatch->notNull[i]); |
| } |
| |
| // Read rows 7200 - 8200, in which 7700 - 7800 are null |
| rowReader->seekToRow(7200); |
| EXPECT_EQ(true, rowReader->next(*batch)); |
| for (uint64_t i = 0; i < 1000; ++i) { |
| EXPECT_EQ(1, structBatch->notNull[i]); |
| } |
| |
| for (uint64_t i = 0; i < 1000; ++i) { |
| if (i >= 500 && i < 600) { |
| EXPECT_EQ(0, structBatch2->notNull[i]); |
| } else { |
| EXPECT_EQ(1, structBatch2->notNull[i]); |
| } |
| } |
| |
| for (uint64_t i = 0; i < 1000; ++i) { |
| if (i >= 500 && i < 600) { |
| EXPECT_EQ(0, intBatch->notNull[i]); |
| } else { |
| EXPECT_EQ(1, intBatch->notNull[i]); |
| EXPECT_EQ(i + 7200, intBatch->data.data()[i]); |
| } |
| } |
| } |
| |
| TEST_P(WriterTest, testBloomFilter) { |
| WriterOptions options; |
| options.setStripeSize(1024) |
| .setCompressionBlockSize(64) |
| .setCompression(CompressionKind_ZSTD) |
| .setMemoryPool(getDefaultPool()) |
| .setRowIndexStride(10000) |
| .setFileVersion(fileVersion) |
| .setColumnsUseBloomFilter({1, 2, 3}); |
| |
| // write 65535 rows of data |
| MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE); |
| MemoryPool* pool = getDefaultPool(); |
| std::unique_ptr<Type> type(Type::buildTypeFromString("struct<c1:bigint,c2:string,c3:binary>")); |
| |
| char dataBuffer[327675]; // 300k |
| uint64_t offset = 0; |
| uint64_t rowCount = 65535; |
| |
| std::unique_ptr<Writer> writer = createWriter(*type, &memStream, options); |
| std::unique_ptr<ColumnVectorBatch> batch = writer->createRowBatch(rowCount); |
| StructVectorBatch& structBatch = dynamic_cast<StructVectorBatch&>(*batch); |
| LongVectorBatch& longBatch = dynamic_cast<LongVectorBatch&>(*structBatch.fields[0]); |
| StringVectorBatch& strBatch = dynamic_cast<StringVectorBatch&>(*structBatch.fields[1]); |
| StringVectorBatch& binBatch = dynamic_cast<StringVectorBatch&>(*structBatch.fields[2]); |
| |
| for (uint64_t i = 0; i < rowCount; ++i) { |
| // each row group has a unique value |
| uint64_t data = (i / options.getRowIndexStride()); |
| |
| // c1 |
| longBatch.data[i] = static_cast<int64_t>(data); |
| |
| // c2 |
| std::ostringstream os; |
| os << data; |
| strBatch.data[i] = dataBuffer + offset; |
| strBatch.length[i] = static_cast<int64_t>(os.str().size()); |
| memcpy(dataBuffer + offset, os.str().c_str(), os.str().size()); |
| |
| // c3 |
| binBatch.data[i] = dataBuffer + offset; |
| binBatch.length[i] = static_cast<int64_t>(os.str().size()); |
| memcpy(dataBuffer + offset, os.str().c_str(), os.str().size()); |
| offset += os.str().size(); |
| } |
| |
| structBatch.numElements = rowCount; |
| longBatch.numElements = rowCount; |
| strBatch.numElements = rowCount; |
| binBatch.numElements = rowCount; |
| writer->add(*batch); |
| writer->close(); |
| |
| // verify bloomfilters |
| auto inStream = std::make_unique<MemoryInputStream>(memStream.getData(), memStream.getLength()); |
| std::unique_ptr<Reader> reader = createReader(pool, std::move(inStream)); |
| EXPECT_EQ(rowCount, reader->getNumberOfRows()); |
| |
| EXPECT_EQ(3, reader->getBloomFilters(0, {}).size()); |
| EXPECT_EQ(1, reader->getBloomFilters(0, {1}).size()); |
| EXPECT_EQ(1, reader->getBloomFilters(0, {2}).size()); |
| EXPECT_EQ(1, reader->getBloomFilters(0, {3}).size()); |
| |
| std::map<uint32_t, BloomFilterIndex> bfs = reader->getBloomFilters(0, {1, 2, 3}); |
| EXPECT_EQ(3, bfs.size()); |
| EXPECT_EQ(7, bfs[1].entries.size()); |
| EXPECT_EQ(7, bfs[2].entries.size()); |
| EXPECT_EQ(7, bfs[3].entries.size()); |
| |
| // test bloomfilters |
| for (uint64_t rg = 0; rg <= rowCount / options.getRowIndexStride(); ++rg) { |
| for (uint64_t value = 0; value <= 100; ++value) { |
| std::string str = to_string(static_cast<int64_t>(value)); |
| if (value == rg) { |
| EXPECT_TRUE(bfs[1].entries[rg]->testLong(static_cast<int64_t>(value))); |
| EXPECT_TRUE(bfs[2].entries[rg]->testBytes(str.c_str(), static_cast<int64_t>(str.size()))); |
| EXPECT_TRUE(bfs[3].entries[rg]->testBytes(str.c_str(), static_cast<int64_t>(str.size()))); |
| } else { |
| EXPECT_FALSE(bfs[1].entries[rg]->testLong(static_cast<int64_t>(value))); |
| EXPECT_FALSE( |
| bfs[2].entries[rg]->testBytes(str.c_str(), static_cast<int64_t>(str.size()))); |
| EXPECT_FALSE( |
| bfs[3].entries[rg]->testBytes(str.c_str(), static_cast<int64_t>(str.size()))); |
| } |
| } |
| } |
| } |
| |
| TEST(WriterTest, testSuppressPresentStream) { |
| MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE); |
| MemoryPool* pool = getDefaultPool(); |
| size_t rowCount = 2000; |
| { |
| auto type = std::unique_ptr<Type>(Type::buildTypeFromString("struct<col1:int,col2:int>")); |
| WriterOptions options; |
| options.setStripeSize(1024 * 1024) |
| .setCompressionBlockSize(1024) |
| .setCompression(CompressionKind_NONE) |
| .setMemoryPool(pool) |
| .setRowIndexStride(1000); |
| |
| auto writer = createWriter(*type, &memStream, options); |
| auto batch = writer->createRowBatch(rowCount); |
| auto& structBatch = dynamic_cast<StructVectorBatch&>(*batch); |
| auto& longBatch1 = dynamic_cast<LongVectorBatch&>(*structBatch.fields[0]); |
| auto& longBatch2 = dynamic_cast<LongVectorBatch&>(*structBatch.fields[1]); |
| structBatch.numElements = rowCount; |
| longBatch1.numElements = rowCount; |
| longBatch2.numElements = rowCount; |
| longBatch1.hasNulls = true; |
| for (size_t i = 0; i < rowCount; ++i) { |
| if (i % 2 == 0) { |
| longBatch1.notNull[i] = 0; |
| } else { |
| longBatch1.notNull[i] = 1; |
| longBatch1.data[i] = static_cast<int64_t>(i * 100); |
| } |
| longBatch2.data[i] = static_cast<int64_t>(i * 300); |
| } |
| writer->add(*batch); |
| writer->close(); |
| } |
| // read file & check the present stream |
| { |
| auto inStream = |
| std::make_unique<MemoryInputStream>(memStream.getData(), memStream.getLength()); |
| ReaderOptions readerOptions; |
| readerOptions.setMemoryPool(*pool); |
| std::unique_ptr<Reader> reader = createReader(std::move(inStream), readerOptions); |
| EXPECT_EQ(rowCount, reader->getNumberOfRows()); |
| std::unique_ptr<RowReader> rowReader = createRowReader(reader.get()); |
| auto batch = rowReader->createRowBatch(1000); |
| EXPECT_TRUE(rowReader->next(*batch)); |
| EXPECT_EQ(1000, batch->numElements); |
| auto& structBatch = dynamic_cast<StructVectorBatch&>(*batch); |
| auto& longBatch1 = dynamic_cast<LongVectorBatch&>(*structBatch.fields[0]); |
| auto& longBatch2 = dynamic_cast<LongVectorBatch&>(*structBatch.fields[1]); |
| for (size_t i = 0; i < 1000; ++i) { |
| if (i % 2 == 0) { |
| EXPECT_FALSE(longBatch1.notNull[i]); |
| } else { |
| EXPECT_TRUE(longBatch1.notNull[i]); |
| EXPECT_EQ(longBatch1.data[i], static_cast<int64_t>(i * 100)); |
| } |
| EXPECT_EQ(longBatch2.data[i], static_cast<int64_t>(i * 300)); |
| } |
| // Read rows 1500 - 2000 |
| rowReader->seekToRow(1500); |
| EXPECT_TRUE(rowReader->next(*batch)); |
| EXPECT_EQ(500, batch->numElements); |
| for (size_t i = 0; i < 500; ++i) { |
| if (i % 2 == 0) { |
| EXPECT_FALSE(longBatch1.notNull[i]); |
| } else { |
| EXPECT_TRUE(longBatch1.notNull[i]); |
| EXPECT_EQ(longBatch1.data[i], static_cast<int64_t>((i + 1500) * 100)); |
| } |
| EXPECT_EQ(longBatch2.data[i], static_cast<int64_t>((i + 1500) * 300)); |
| } |
| // fetch StripeFooter from pb stream |
| std::unique_ptr<StripeInformation> stripeInfo = reader->getStripe(0); |
| ReaderImpl* readerImpl = dynamic_cast<ReaderImpl*>(reader.get()); |
| auto pbStream = std::make_unique<SeekableFileInputStream>( |
| readerImpl->getStream(), |
| stripeInfo->getOffset() + stripeInfo->getIndexLength() + stripeInfo->getDataLength(), |
| stripeInfo->getFooterLength(), *pool); |
| proto::StripeFooter stripeFooter; |
| if (!stripeFooter.ParseFromZeroCopyStream(pbStream.get())) { |
| throw ParseError("Parse stripe footer from pb stream failed"); |
| } |
| for (int i = 0; i < stripeFooter.streams_size(); ++i) { |
| const proto::Stream& stream = stripeFooter.streams(i); |
| if (stream.has_kind() && stream.kind() == proto::Stream_Kind_PRESENT) { |
| EXPECT_EQ(stream.column(), 1UL); |
| } |
| } |
| } |
| } |
| |
| // Before the fix of ORC-1288, this case will trigger the bug about |
| // invalid memory freeing with zlib compression when writing a orc file |
| // that contains multiple stripes, and each stripe contains multiple columns |
| // with no null values. |
| void testSuppressPresentStream(orc::CompressionKind kind) { |
| MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE); |
| MemoryPool* pool = getDefaultPool(); |
| uint64_t rowCount = 5000000; |
| auto type = std::unique_ptr<Type>(Type::buildTypeFromString("struct<c0:int>")); |
| WriterOptions options; |
| options.setStripeSize(1024).setCompressionBlockSize(1024).setCompression(kind).setMemoryPool( |
| pool); |
| |
| auto writer = createWriter(*type, &memStream, options); |
| auto batch = writer->createRowBatch(rowCount); |
| auto& structBatch = dynamic_cast<StructVectorBatch&>(*batch); |
| auto& longBatch = dynamic_cast<LongVectorBatch&>(*structBatch.fields[0]); |
| uint64_t rows = 0; |
| uint64_t batchSize = 10000; |
| for (uint64_t i = 0; i < rowCount; ++i) { |
| longBatch.data[i] = static_cast<int64_t>(i); |
| ++rows; |
| if (rows == batchSize) { |
| structBatch.numElements = rows; |
| longBatch.numElements = rows; |
| writer->add(*batch); |
| rows = 0; |
| } |
| } |
| if (rows != 0) { |
| structBatch.numElements = rows; |
| longBatch.numElements = rows; |
| writer->add(*batch); |
| rows = 0; |
| } |
| writer->close(); |
| } |
| |
| TEST(WriterTest, suppressPresentStreamWithCompressionKinds) { |
| testSuppressPresentStream(CompressionKind_ZLIB); |
| testSuppressPresentStream(CompressionKind_ZSTD); |
| testSuppressPresentStream(CompressionKind_LZ4); |
| testSuppressPresentStream(CompressionKind_SNAPPY); |
| } |
| |
| void testSetOutputBufferCapacity(uint64_t capacity) { |
| MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE); |
| MemoryPool* pool = getDefaultPool(); |
| size_t rowCount = 1000; |
| { |
| auto type = std::unique_ptr<Type>(Type::buildTypeFromString("struct<col1:int,col2:int>")); |
| WriterOptions options; |
| options.setStripeSize(1024 * 1024) |
| .setCompressionBlockSize(64 * 1024) |
| .setCompression(CompressionKind_NONE) |
| .setMemoryPool(pool) |
| .setRowIndexStride(1000) |
| .setOutputBufferCapacity(capacity); |
| |
| auto writer = createWriter(*type, &memStream, options); |
| auto batch = writer->createRowBatch(rowCount); |
| auto& structBatch = dynamic_cast<StructVectorBatch&>(*batch); |
| auto& longBatch1 = dynamic_cast<LongVectorBatch&>(*structBatch.fields[0]); |
| auto& longBatch2 = dynamic_cast<LongVectorBatch&>(*structBatch.fields[1]); |
| structBatch.numElements = rowCount; |
| longBatch1.numElements = rowCount; |
| longBatch2.numElements = rowCount; |
| for (size_t i = 0; i < rowCount; ++i) { |
| longBatch1.data[i] = static_cast<int64_t>(i * 100); |
| longBatch2.data[i] = static_cast<int64_t>(i * 300); |
| } |
| writer->add(*batch); |
| writer->close(); |
| } |
| // read orc file & check the data |
| { |
| std::unique_ptr<InputStream> inStream( |
| new MemoryInputStream(memStream.getData(), memStream.getLength())); |
| ReaderOptions readerOptions; |
| readerOptions.setMemoryPool(*pool); |
| std::unique_ptr<Reader> reader = createReader(std::move(inStream), readerOptions); |
| std::unique_ptr<RowReader> rowReader = createRowReader(reader.get()); |
| auto batch = rowReader->createRowBatch(rowCount); |
| EXPECT_TRUE(rowReader->next(*batch)); |
| EXPECT_EQ(rowCount, batch->numElements); |
| auto& structBatch = dynamic_cast<StructVectorBatch&>(*batch); |
| auto& longBatch1 = dynamic_cast<LongVectorBatch&>(*structBatch.fields[0]); |
| auto& longBatch2 = dynamic_cast<LongVectorBatch&>(*structBatch.fields[1]); |
| for (size_t i = 0; i < rowCount; ++i) { |
| EXPECT_EQ(longBatch1.data[i], static_cast<int64_t>(i * 100)); |
| EXPECT_EQ(longBatch2.data[i], static_cast<int64_t>(i * 300)); |
| } |
| } |
| } |
| |
| TEST(WriterTest, setOutputBufferCapacity) { |
| // compression block size > output buffer capacity |
| testSetOutputBufferCapacity(1024); |
| // compression block size = output buffer capacity |
| testSetOutputBufferCapacity(64 * 1024); |
| // compression block size < output buffer capacity |
| testSetOutputBufferCapacity(1024 * 1024); |
| } |
| |
| TEST_P(WriterTest, testWriteFixedWidthNumericVectorBatch) { |
| MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE); |
| MemoryPool* pool = getDefaultPool(); |
| std::unique_ptr<Type> type( |
| Type::buildTypeFromString("struct<col1:double,col2:float,col3:int,col4:smallint,col5:" |
| "tinyint,col6:bigint,col7:boolean>")); |
| |
| uint64_t stripeSize = 16 * 1024; |
| uint64_t compressionBlockSize = 1024; |
| uint64_t rowCount = 65530; |
| |
| std::vector<double> data(rowCount); |
| for (uint64_t i = 0; i < rowCount; ++i) { |
| data[i] = 100000 * (std::rand() * 1.0 / RAND_MAX); |
| } |
| |
| std::unique_ptr<Writer> writer = |
| createWriter(stripeSize, compressionBlockSize, CompressionKind_ZLIB, *type, pool, |
| &memStream, fileVersion, 0, "GMT", true); |
| // start from here/ |
| std::unique_ptr<ColumnVectorBatch> batch = writer->createRowBatch(rowCount / 2); |
| StructVectorBatch* structBatch = dynamic_cast<StructVectorBatch*>(batch.get()); |
| DoubleVectorBatch* doubleBatch = dynamic_cast<DoubleVectorBatch*>(structBatch->fields[0]); |
| FloatVectorBatch* floatBatch = dynamic_cast<FloatVectorBatch*>(structBatch->fields[1]); |
| IntVectorBatch* intBatch = dynamic_cast<IntVectorBatch*>(structBatch->fields[2]); |
| ShortVectorBatch* shortBatch = dynamic_cast<ShortVectorBatch*>(structBatch->fields[3]); |
| ByteVectorBatch* byteBatch = dynamic_cast<ByteVectorBatch*>(structBatch->fields[4]); |
| LongVectorBatch* longBatch = dynamic_cast<LongVectorBatch*>(structBatch->fields[5]); |
| ByteVectorBatch* boolBatch = dynamic_cast<ByteVectorBatch*>(structBatch->fields[6]); |
| structBatch->resize(rowCount); |
| doubleBatch->resize(rowCount); |
| floatBatch->resize(rowCount); |
| intBatch->resize(rowCount); |
| shortBatch->resize(rowCount); |
| byteBatch->resize(rowCount); |
| longBatch->resize(rowCount); |
| boolBatch->resize(rowCount); |
| |
| for (uint64_t i = 0; i < rowCount; ++i) { |
| structBatch->notNull[i] = 1; |
| doubleBatch->notNull[i] = 1; |
| floatBatch->notNull[i] = 1; |
| intBatch->notNull[i] = 1; |
| shortBatch->notNull[i] = 1; |
| byteBatch->notNull[i] = 1; |
| longBatch->notNull[i] = 1; |
| boolBatch->notNull[i] = 1; |
| |
| doubleBatch->data[i] = data[i]; |
| floatBatch->data[i] = static_cast<float>(data[i]); |
| intBatch->data[i] = static_cast<int32_t>(i); |
| shortBatch->data[i] = static_cast<int16_t>(i); |
| byteBatch->data[i] = static_cast<int8_t>(i); |
| longBatch->data[i] = static_cast<int64_t>(i); |
| boolBatch->data[i] = static_cast<bool>((i % 17) % 2); |
| } |
| |
| structBatch->numElements = rowCount; |
| doubleBatch->numElements = rowCount; |
| floatBatch->numElements = rowCount; |
| intBatch->numElements = rowCount; |
| shortBatch->numElements = rowCount; |
| byteBatch->numElements = rowCount; |
| longBatch->numElements = rowCount; |
| boolBatch->numElements = rowCount; |
| |
| writer->add(*batch); |
| writer->close(); |
| |
| auto inStream = std::make_unique<MemoryInputStream>(memStream.getData(), memStream.getLength()); |
| std::unique_ptr<Reader> reader = createReader(pool, std::move(inStream)); |
| std::unique_ptr<RowReader> rowReader = createRowReader(reader.get(), "GMT", true); |
| |
| EXPECT_EQ(rowCount, reader->getNumberOfRows()); |
| |
| batch = rowReader->createRowBatch(rowCount); |
| EXPECT_EQ(true, rowReader->next(*batch)); |
| EXPECT_EQ(rowCount, batch->numElements); |
| |
| structBatch = dynamic_cast<StructVectorBatch*>(batch.get()); |
| doubleBatch = dynamic_cast<DoubleVectorBatch*>(structBatch->fields[0]); |
| floatBatch = dynamic_cast<FloatVectorBatch*>(structBatch->fields[1]); |
| intBatch = dynamic_cast<IntVectorBatch*>(structBatch->fields[2]); |
| shortBatch = dynamic_cast<ShortVectorBatch*>(structBatch->fields[3]); |
| byteBatch = dynamic_cast<ByteVectorBatch*>(structBatch->fields[4]); |
| longBatch = dynamic_cast<LongVectorBatch*>(structBatch->fields[5]); |
| boolBatch = dynamic_cast<ByteVectorBatch*>(structBatch->fields[6]); |
| for (uint64_t i = 0; i < rowCount; ++i) { |
| EXPECT_TRUE(std::abs(data[i] - doubleBatch->data[i]) < 0.000001); |
| EXPECT_TRUE(std::abs(static_cast<float>(data[i]) - static_cast<float>(floatBatch->data[i])) < |
| 0.000001f); |
| EXPECT_EQ(intBatch->data[i], static_cast<int32_t>(i)); |
| EXPECT_EQ(shortBatch->data[i], static_cast<int16_t>(i)); |
| EXPECT_EQ(byteBatch->data[i], static_cast<int8_t>(i)); |
| EXPECT_EQ(longBatch->data[i], static_cast<int64_t>(i)); |
| EXPECT_EQ(boolBatch->data[i], static_cast<bool>((i % 17) % 2)); |
| } |
| EXPECT_FALSE(rowReader->next(*batch)); |
| } |
| |
| // first stripe has no null value and second stripe has null value. |
| // make sure stripes do not have dirty data in the present streams. |
| TEST_P(WriterTest, testSuppressPresentStreamInPreStripe) { |
| MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE); |
| MemoryPool* pool = getDefaultPool(); |
| |
| // [1-998000): notNull, value is equal to index |
| // [998000-999000): null |
| // [999000-1000000]: notNoll, value is equal to index |
| size_t rowCount = 1000000; |
| size_t nullBeginCount = 998000; |
| size_t nullEndCount = 999000; |
| size_t batchSize = 5; |
| { |
| auto type = std::unique_ptr<Type>(Type::buildTypeFromString("struct<col1:int>")); |
| WriterOptions options; |
| options.setStripeSize(16 * 1024) |
| .setCompressionBlockSize(1024) |
| .setCompression(CompressionKind_NONE) |
| .setMemoryPool(pool) |
| .setRowIndexStride(1000); |
| |
| auto writer = createWriter(*type, &memStream, options); |
| |
| uint64_t batchCount = rowCount / batchSize; |
| size_t rowsWrite = 0; |
| for (uint64_t batchIdx = 0; batchIdx < batchCount; batchIdx++) { |
| auto batch = writer->createRowBatch(batchSize); |
| auto& structBatch = dynamic_cast<StructVectorBatch&>(*batch); |
| auto& longBatch = dynamic_cast<LongVectorBatch&>(*structBatch.fields[0]); |
| structBatch.numElements = batchSize; |
| longBatch.numElements = batchSize; |
| longBatch.hasNulls = false; |
| for (uint64_t row = 0; row < batchSize; ++row) { |
| size_t rowIndex = rowsWrite + row + 1; |
| if (rowIndex < nullBeginCount || rowIndex >= nullEndCount) { |
| longBatch.data[row] = static_cast<int64_t>(rowIndex); |
| } else { |
| longBatch.notNull[row] = 0; |
| longBatch.hasNulls = true; |
| } |
| } |
| |
| writer->add(*batch); |
| rowsWrite += batch->numElements; |
| } |
| writer->close(); |
| } |
| // read file & check the column value correct |
| { |
| std::unique_ptr<MemoryInputStream> inStream(new MemoryInputStream( |
| memStream.getData(), memStream.getLength())); |
| ReaderOptions readerOptions; |
| readerOptions.setMemoryPool(*pool); |
| std::unique_ptr<Reader> reader = createReader(std::move(inStream), readerOptions); |
| EXPECT_EQ(reader->getNumberOfStripes(), 2); |
| EXPECT_EQ(rowCount, reader->getNumberOfRows()); |
| std::unique_ptr<RowReader> rowReader = createRowReader(reader.get()); |
| size_t rowsRead = 0; |
| while (rowsRead < rowCount) { |
| auto batch = rowReader->createRowBatch(1000); |
| EXPECT_TRUE(rowReader->next(*batch)); |
| auto& structBatch = dynamic_cast<StructVectorBatch&>(*batch); |
| auto& longBatch = dynamic_cast<LongVectorBatch&>(*structBatch.fields[0]); |
| for (size_t i = 0; i < batch->numElements; ++i) { |
| size_t rowIndex = rowsRead + i + 1; |
| if (rowIndex < nullBeginCount || rowIndex >= nullEndCount) { |
| EXPECT_TRUE(longBatch.notNull[i]); |
| EXPECT_EQ(longBatch.data[i], static_cast<int64_t>(rowIndex)); |
| } else { |
| EXPECT_FALSE(longBatch.notNull[i]); |
| } |
| } |
| rowsRead += batch->numElements; |
| } |
| } |
| } |
| |
| INSTANTIATE_TEST_SUITE_P(OrcTest, WriterTest, |
| Values(FileVersion::v_0_11(), FileVersion::v_0_12(), |
| FileVersion::UNSTABLE_PRE_2_0())); |
| } // namespace orc |