blob: 87514d87db636e1409bd7df9d33a281f9ba9397e [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <algorithm>
#include <cstdint>
#include <cstdlib>
#include <limits>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "arrow/array/array_binary.h"
#include "arrow/util/macros.h"
#include "parquet/column_page.h"
#include "parquet/column_reader.h"
#include "parquet/schema.h"
#include "parquet/test_util.h"
#include "parquet/types.h"
namespace parquet {
using ParquetType = parquet::Type;
using internal::BinaryRecordReader;
using internal::LevelInfo;
using schema::GroupNode;
using schema::NodePtr;
using schema::PrimitiveNode;
using testing::ElementsAre;
namespace test {
// Compares a dense vector (`left`) against a spaced vector (`right`), using the
// definition levels to decide which slots of `right` hold real values.
//
// For every entry of `def_levels`:
//  - level == max_def_levels: the next element of `left` must equal the next
//    element of `right`; both cursors advance.
//  - level == max_def_levels - 1: a null on the lowest nesting level; only the
//    `right` cursor advances.
//  - level <  max_def_levels - 1: a null on a higher nesting level; the `right`
//    cursor advances only when max_rep_levels == 0 (non-repeating data).
//
// Returns true when every compared pair matches. Returns false (after logging to
// std::cerr) on the first mismatch, or when the levels call for more elements
// than `left` or `right` actually contain.
template <typename T>
static inline bool vector_equal_with_def_levels(const std::vector<T>& left,
                                                const std::vector<int16_t>& def_levels,
                                                int16_t max_def_levels,
                                                int16_t max_rep_levels,
                                                const std::vector<T>& right) {
  size_t i_left = 0;
  size_t i_right = 0;
  for (size_t i = 0; i < def_levels.size(); i++) {
    if (def_levels[i] == max_def_levels) {
      // Guard the element accesses below: the levels may describe more values
      // than the vectors hold.
      if (i_left >= left.size() || i_right >= right.size()) {
        std::cerr << "index " << i << " needs left[" << i_left << "] and right["
                  << i_right << "] but sizes are " << left.size() << " and "
                  << right.size() << std::endl;
        return false;
      }
      // Compare
      if (left[i_left] != right[i_right]) {
        // Report the element that was actually compared (right[i_right]).
        std::cerr << "index " << i << " left was " << left[i_left] << " right was "
                  << right[i_right] << std::endl;
        return false;
      }
      i_left++;
      i_right++;
    } else if (def_levels[i] == (max_def_levels - 1)) {
      // Null entry on the lowest nested level
      i_right++;
    } else if (def_levels[i] < (max_def_levels - 1)) {
      // Null entry on a higher nesting level, only supported for non-repeating data
      if (max_rep_levels == 0) {
        i_right++;
      }
    }
  }
  return true;
}
class TestPrimitiveReader : public ::testing::Test {
public:
void InitReader(const ColumnDescriptor* d) {
auto pager = std::make_unique<MockPageReader>(pages_);
reader_ = ColumnReader::Make(d, std::move(pager));
}
void CheckResults() {
std::vector<int32_t> vresult(num_values_, -1);
std::vector<int16_t> dresult(num_levels_, -1);
std::vector<int16_t> rresult(num_levels_, -1);
int64_t values_read = 0;
int total_values_read = 0;
int batch_actual = 0;
Int32Reader* reader = static_cast<Int32Reader*>(reader_.get());
int32_t batch_size = 8;
int batch = 0;
// This will cover both the cases
// 1) batch_size < page_size (multiple ReadBatch from a single page)
// 2) batch_size > page_size (BatchRead limits to a single page)
do {
batch = static_cast<int>(reader->ReadBatch(
batch_size, &dresult[0] + batch_actual, &rresult[0] + batch_actual,
&vresult[0] + total_values_read, &values_read));
total_values_read += static_cast<int>(values_read);
batch_actual += batch;
batch_size = std::min(1 << 24, std::max(batch_size * 2, 4096));
} while (batch > 0);
ASSERT_EQ(num_levels_, batch_actual);
ASSERT_EQ(num_values_, total_values_read);
ASSERT_TRUE(vector_equal(values_, vresult));
if (max_def_level_ > 0) {
ASSERT_TRUE(vector_equal(def_levels_, dresult));
}
if (max_rep_level_ > 0) {
ASSERT_TRUE(vector_equal(rep_levels_, rresult));
}
// catch improper writes at EOS
batch_actual =
static_cast<int>(reader->ReadBatch(5, nullptr, nullptr, nullptr, &values_read));
ASSERT_EQ(0, batch_actual);
ASSERT_EQ(0, values_read);
}
void Clear() {
values_.clear();
def_levels_.clear();
rep_levels_.clear();
pages_.clear();
reader_.reset();
}
void ExecutePlain(int num_pages, int levels_per_page, const ColumnDescriptor* d) {
num_values_ =
MakePages<Int32Type>(d, num_pages, levels_per_page, def_levels_, rep_levels_,
values_, data_buffer_, pages_, Encoding::PLAIN);
num_levels_ = num_pages * levels_per_page;
InitReader(d);
CheckResults();
Clear();
}
void ExecuteDict(int num_pages, int levels_per_page, const ColumnDescriptor* d) {
num_values_ =
MakePages<Int32Type>(d, num_pages, levels_per_page, def_levels_, rep_levels_,
values_, data_buffer_, pages_, Encoding::RLE_DICTIONARY);
num_levels_ = num_pages * levels_per_page;
InitReader(d);
CheckResults();
Clear();
}
protected:
int num_levels_;
int num_values_;
int16_t max_def_level_;
int16_t max_rep_level_;
std::vector<std::shared_ptr<Page>> pages_;
std::shared_ptr<ColumnReader> reader_;
std::vector<int32_t> values_;
std::vector<int16_t> def_levels_;
std::vector<int16_t> rep_levels_;
std::vector<uint8_t> data_buffer_; // For BA and FLBA
};
// Round-trips a REQUIRED int32 column (no def/rep levels) through both PLAIN
// and dictionary encoded pages.
TEST_F(TestPrimitiveReader, TestInt32FlatRequired) {
  constexpr int kNumPages = 50;
  constexpr int kLevelsPerPage = 100;
  max_def_level_ = 0;
  max_rep_level_ = 0;
  const NodePtr node = schema::Int32("a", Repetition::REQUIRED);
  const ColumnDescriptor descr(node, max_def_level_, max_rep_level_);
  ASSERT_NO_FATAL_FAILURE(ExecutePlain(kNumPages, kLevelsPerPage, &descr));
  ASSERT_NO_FATAL_FAILURE(ExecuteDict(kNumPages, kLevelsPerPage, &descr));
}
// Round-trips an OPTIONAL int32 column (max def level 4, no repetition) through
// both PLAIN and dictionary encoded pages.
TEST_F(TestPrimitiveReader, TestInt32FlatOptional) {
  constexpr int kNumPages = 50;
  constexpr int kLevelsPerPage = 100;
  max_def_level_ = 4;
  max_rep_level_ = 0;
  const NodePtr node = schema::Int32("b", Repetition::OPTIONAL);
  const ColumnDescriptor descr(node, max_def_level_, max_rep_level_);
  ASSERT_NO_FATAL_FAILURE(ExecutePlain(kNumPages, kLevelsPerPage, &descr));
  ASSERT_NO_FATAL_FAILURE(ExecuteDict(kNumPages, kLevelsPerPage, &descr));
}
// Round-trips a REPEATED int32 column (max def level 4, max rep level 2)
// through both PLAIN and dictionary encoded pages.
TEST_F(TestPrimitiveReader, TestInt32FlatRepeated) {
  constexpr int kNumPages = 50;
  constexpr int kLevelsPerPage = 100;
  max_def_level_ = 4;
  max_rep_level_ = 2;
  const NodePtr node = schema::Int32("c", Repetition::REPEATED);
  const ColumnDescriptor descr(node, max_def_level_, max_rep_level_);
  ASSERT_NO_FATAL_FAILURE(ExecutePlain(kNumPages, kLevelsPerPage, &descr));
  ASSERT_NO_FATAL_FAILURE(ExecuteDict(kNumPages, kLevelsPerPage, &descr));
}
// Tests skipping around page boundaries.
//
// Seven PLAIN pages of 100 required int32 values are generated; the test then
// alternates Skip() and half-page ReadBatch() calls and checks that each read
// returns the slice of values_ that follows the skipped region.
TEST_F(TestPrimitiveReader, TestSkipAroundPageBoundaries) {
  int levels_per_page = 100;
  int num_pages = 7;
  max_def_level_ = 0;
  max_rep_level_ = 0;
  NodePtr type = schema::Int32("b", Repetition::REQUIRED);
  const ColumnDescriptor descr(type, max_def_level_, max_rep_level_);
  MakePages<Int32Type>(&descr, num_pages, levels_per_page, def_levels_, rep_levels_,
                       values_, data_buffer_, pages_, Encoding::PLAIN);
  InitReader(&descr);

  // All offsets are computed with integer arithmetic (no double round trips).
  const int half_page = levels_per_page / 2;
  std::vector<int32_t> vresult(half_page, -1);
  std::vector<int16_t> dresult(half_page, -1);
  std::vector<int16_t> rresult(half_page, -1);
  Int32Reader* reader = static_cast<Int32Reader*>(reader_.get());
  int64_t values_read = 0;

  // Copy of values_[first, last), i.e. what a read over that range must yield.
  auto expected_slice = [&](int first, int last) {
    return std::vector<int32_t>(values_.begin() + first, values_.begin() + last);
  };

  // 1) skip_size > page_size (multiple pages skipped)
  // Skip first 2 pages
  int64_t levels_skipped = reader->Skip(2 * levels_per_page);
  ASSERT_EQ(2 * levels_per_page, levels_skipped);
  // Read half a page
  reader->ReadBatch(half_page, dresult.data(), rresult.data(), vresult.data(),
                    &values_read);
  ASSERT_TRUE(vector_equal(
      expected_slice(2 * levels_per_page, 2 * levels_per_page + half_page), vresult));

  // 2) skip_size == page_size (skip across two pages from page 2.5 to 3.5)
  levels_skipped = reader->Skip(levels_per_page);
  ASSERT_EQ(levels_per_page, levels_skipped);
  // Read half a page (page 3.5 to 4)
  reader->ReadBatch(half_page, dresult.data(), rresult.data(), vresult.data(),
                    &values_read);
  ASSERT_TRUE(vector_equal(
      expected_slice(3 * levels_per_page + half_page, 4 * levels_per_page), vresult));

  // 3) skip_size == page_size (skip page 4 from start of the page to the end)
  levels_skipped = reader->Skip(levels_per_page);
  ASSERT_EQ(levels_per_page, levels_skipped);
  // Read half a page (page 5 to 5.5)
  reader->ReadBatch(half_page, dresult.data(), rresult.data(), vresult.data(),
                    &values_read);
  ASSERT_TRUE(vector_equal(
      expected_slice(5 * levels_per_page, 5 * levels_per_page + half_page), vresult));

  // 4) skip_size < page_size (skip limited to a single page)
  // Skip half a page (page 5.5 to 6)
  levels_skipped = reader->Skip(half_page);
  ASSERT_EQ(half_page, levels_skipped);
  // Read half a page (6 to 6.5)
  reader->ReadBatch(half_page, dresult.data(), rresult.data(), vresult.data(),
                    &values_read);
  ASSERT_TRUE(vector_equal(
      expected_slice(6 * levels_per_page, 6 * levels_per_page + half_page), vresult));

  // 5) skip_size = 0
  levels_skipped = reader->Skip(0);
  ASSERT_EQ(0, levels_skipped);

  // 6) Skip past the end page.
  levels_skipped = reader->Skip(half_page + 10);
  ASSERT_EQ(half_page, levels_skipped);

  // Same cleanup the fixture's Clear() performs.
  Clear();
}
// Skip on a repeated field: Skip() counts levels (values), not whole records,
// so skipping two levels lands in the middle of the second record.
TEST_F(TestPrimitiveReader, TestSkipRepeatedField) {
  // Schema used here: message M { repeated int32 b = 1 }
  max_def_level_ = 1;
  max_rep_level_ = 1;
  const NodePtr node = schema::Int32("b", Repetition::REPEATED);
  const ColumnDescriptor descr(node, max_def_level_, max_rep_level_);

  // Rows encoded in the page: {}, {[10, 10]}, {[20, 20, 20]}
  const std::vector<int32_t> page_values = {10, 10, 20, 20, 20};
  const std::vector<int16_t> page_defs = {0, 1, 1, 1, 1, 1};
  const std::vector<int16_t> page_reps = {0, 0, 1, 0, 1, 1};
  num_values_ = static_cast<int>(page_defs.size());
  pages_.push_back(MakeDataPage<Int32Type>(
      &descr, page_values, num_values_, Encoding::PLAIN, /*indices=*/{},
      /*indices_size=*/0, page_defs, max_def_level_, page_reps, max_rep_level_));
  InitReader(&descr);
  auto* int32_reader = static_cast<Int32Reader*>(reader_.get());

  // Output buffers for values, definition levels and repetition levels.
  std::vector<int32_t> out_values(4, -1);
  std::vector<int16_t> out_defs(4, -1);
  std::vector<int16_t> out_reps(4, -1);

  // Skip two levels.
  int64_t skipped = int32_reader->Skip(2);
  ASSERT_EQ(2, skipped);

  // Read everything that is left.
  int64_t num_read_values = 0;
  int32_reader->ReadBatch(10, out_defs.data(), out_reps.data(), out_values.data(),
                          &num_read_values);
  ASSERT_EQ(num_read_values, 4);
  // The read resumes inside the record {[10, 10]}.
  ASSERT_TRUE(vector_equal({10, 20, 20, 20}, out_values));
  ASSERT_TRUE(vector_equal({1, 1, 1, 1}, out_defs));
  ASSERT_TRUE(vector_equal({1, 0, 1, 1}, out_reps));

  // The data page is exhausted: nothing more to skip or read.
  skipped = int32_reader->Skip(2);
  ASSERT_EQ(0, skipped);
  int32_reader->ReadBatch(10, out_defs.data(), out_reps.data(), out_values.data(),
                          &num_read_values);
  ASSERT_EQ(num_read_values, 0);
}
// Page claims to have two values but only 1 is present.
// The first ReadBatch of one value succeeds; the second must throw.
TEST_F(TestPrimitiveReader, TestReadValuesMissing) {
  max_def_level_ = 1;
  max_rep_level_ = 0;
  constexpr int batch_size = 1;
  std::vector<bool> values(1, false);
  std::vector<int16_t> input_def_levels(1, 1);
  NodePtr type = schema::Boolean("a", Repetition::OPTIONAL);
  const ColumnDescriptor descr(type, max_def_level_, max_rep_level_);

  // PLAIN page built from one value and one def level, with num_values = 2.
  std::shared_ptr<DataPageV1> data_page = MakeDataPage<BooleanType>(
      &descr, values, /*num_values=*/2, Encoding::PLAIN, /*indices=*/{},
      /*indices_size=*/0, /*def_levels=*/input_def_levels, max_def_level_,
      /*rep_levels=*/{},
      /*max_rep_level=*/max_rep_level_);
  pages_.push_back(data_page);
  InitReader(&descr);
  auto reader = static_cast<BoolReader*>(reader_.get());
  ASSERT_TRUE(reader->HasNext());

  std::vector<int16_t> def_levels(batch_size, 0);
  std::vector<int16_t> rep_levels(batch_size, 0);
  bool values_out[batch_size];
  int64_t values_read = 0;
  EXPECT_EQ(1, reader->ReadBatch(batch_size, def_levels.data(), rep_levels.data(),
                                 values_out, &values_read));
  ASSERT_THROW(reader->ReadBatch(batch_size, def_levels.data(), rep_levels.data(),
                                 values_out, &values_read),
               ParquetException);
}
// GH-41321: When max_def_level > 0 or max_rep_level > 0, and
// Page has more or less levels than the `num_values` in
// PageHeader. We should detect and throw exception.
TEST_F(TestPrimitiveReader, DefRepLevelNotExpected) {
  // Builds a PLAIN boolean page from the given levels and `num_values` false
  // values, appends it to pages_, and expects the first ReadBatch to throw the
  // level-count mismatch error.
  // NOTE(review): pages_ is not cleared between calls, so each later scenario
  // builds its reader with the earlier scenarios' pages still queued in front
  // of its own page -- confirm whether that is intended before changing it.
  auto do_check = [&](const NodePtr& type, const std::vector<int16_t>& input_def_levels,
                      const std::vector<int16_t>& input_rep_levels, int num_values) {
    std::vector<bool> values(num_values, false);
    const ColumnDescriptor descr(type, max_def_level_, max_rep_level_);

    std::shared_ptr<DataPageV1> data_page = MakeDataPage<BooleanType>(
        &descr, values, /*num_values=*/num_values, Encoding::PLAIN, /*indices=*/{},
        /*indices_size=*/0, /*def_levels=*/input_def_levels, max_def_level_,
        /*rep_levels=*/input_rep_levels,
        /*max_rep_level=*/max_rep_level_);
    pages_.push_back(data_page);
    InitReader(&descr);
    auto reader = static_cast<BoolReader*>(reader_.get());
    ASSERT_TRUE(reader->HasNext());

    constexpr int batch_size = 10;
    std::vector<int16_t> def_levels(batch_size, 0);
    std::vector<int16_t> rep_levels(batch_size, 0);
    bool values_out[batch_size];
    int64_t values_read = 0;
    EXPECT_THROW_THAT(
        [&]() {
          reader->ReadBatch(batch_size, def_levels.data(), rep_levels.data(), values_out,
                            &values_read);
        },
        ParquetException,
        ::testing::Property(&ParquetException::what,
                            ::testing::HasSubstr("Number of decoded rep / def levels do "
                                                 "not match num_values in page header")));
  };

  // storing def-levels less than value in page-header
  {
    max_def_level_ = 1;
    max_rep_level_ = 0;
    NodePtr type = schema::Boolean("a", Repetition::OPTIONAL);
    std::vector<int16_t> input_def_levels(1, 1);
    std::vector<int16_t> input_rep_levels{};
    do_check(type, input_def_levels, input_rep_levels, /*num_values=*/3);
  }

  // storing def-levels more than value in page-header
  {
    max_def_level_ = 1;
    max_rep_level_ = 0;
    NodePtr type = schema::Boolean("a", Repetition::OPTIONAL);
    std::vector<int16_t> input_def_levels(2, 1);
    std::vector<int16_t> input_rep_levels{};
    do_check(type, input_def_levels, input_rep_levels, /*num_values=*/1);
  }

  // storing rep-levels less than value in page-header
  {
    max_def_level_ = 0;
    max_rep_level_ = 1;
    NodePtr type = schema::Boolean("a", Repetition::REPEATED);
    std::vector<int16_t> input_def_levels{};
    std::vector<int16_t> input_rep_levels(3, 0);
    do_check(type, input_def_levels, input_rep_levels, /*num_values=*/4);
  }

  // storing rep-levels more than value in page-header
  {
    max_def_level_ = 0;
    max_rep_level_ = 1;
    NodePtr type = schema::Boolean("a", Repetition::REPEATED);
    std::vector<int16_t> input_def_levels{};
    std::vector<int16_t> input_rep_levels(2, 1);
    do_check(type, input_def_levels, input_rep_levels, /*num_values=*/1);
  }
}
// Repetition level byte length reported in Page but Max Repetition level
// is zero for the column.
TEST_F(TestPrimitiveReader, TestRepetitionLvlBytesWithMaxRepetitionZero) {
  constexpr int batch_size = 4;
  max_def_level_ = 1;
  max_rep_level_ = 0;
  NodePtr type = schema::Int32("a", Repetition::OPTIONAL);
  const ColumnDescriptor descr(type, max_def_level_, max_rep_level_);
  // Bytes here came from the example parquet file in ARROW-17453's int32
  // column which was delta bit-packed. The key part is the first three
  // bytes: the page header reports 1 byte for repetition levels even
  // though the max rep level is 0. If that byte isn't skipped then
  // we get def levels of [1, 1, 0, 0] instead of the correct [1, 1, 1, 0].
  const std::vector<uint8_t> page_data{0x3, 0x3, 0x7, 0x80, 0x1, 0x4, 0x3,
                                       0x18, 0x1, 0x2, 0x0, 0x0, 0x0, 0xc,
                                       0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0};

  std::shared_ptr<DataPageV2> data_page =
      std::make_shared<DataPageV2>(Buffer::Wrap(page_data.data(), page_data.size()), 4, 1,
                                   4, Encoding::DELTA_BINARY_PACKED, 2, 1, 21);

  pages_.push_back(data_page);
  InitReader(&descr);
  auto reader = static_cast<Int32Reader*>(reader_.get());
  int16_t def_levels_out[batch_size];
  int32_t values[batch_size];
  int64_t values_read = 0;
  ASSERT_TRUE(reader->HasNext());
  EXPECT_EQ(4, reader->ReadBatch(batch_size, def_levels_out, /*rep_levels=*/nullptr,
                                 values, &values_read));
  EXPECT_EQ(3, values_read);
  // Pin the decoded def levels as well: values_read alone does not show which
  // slot was decoded as the null.
  const int16_t expected_def_levels[batch_size] = {1, 1, 1, 0};
  for (int i = 0; i < batch_size; ++i) {
    EXPECT_EQ(expected_def_levels[i], def_levels_out[i]) << "def level index " << i;
  }
}
// Page claims to have two values but only 1 is present.
// Same scenario as TestReadValuesMissing, but read through
// ReadBatchWithDictionary on a dictionary-encoded INT32 column.
TEST_F(TestPrimitiveReader, TestReadValuesMissingWithDictionary) {
  constexpr int batch_size = 1;
  max_def_level_ = 1;
  max_rep_level_ = 0;
  NodePtr type = schema::Int32("a", Repetition::OPTIONAL);
  const ColumnDescriptor descr(type, max_def_level_, max_rep_level_);
  std::shared_ptr<ResizableBuffer> dummy = AllocateBuffer();
  std::shared_ptr<DictionaryPage> dict_page =
      std::make_shared<DictionaryPage>(dummy, 0, Encoding::PLAIN);
  std::vector<int16_t> input_def_levels(1, 0);
  std::shared_ptr<DataPageV1> data_page = MakeDataPage<Int32Type>(
      &descr, {}, /*num_values=*/2, Encoding::RLE_DICTIONARY, /*indices=*/{},
      /*indices_size=*/0, /*def_levels=*/input_def_levels, max_def_level_,
      /*rep_levels=*/{},
      /*max_rep_level=*/0);
  pages_.push_back(dict_page);
  pages_.push_back(data_page);
  InitReader(&descr);
  // The column is INT32, so the reader must be used as an Int32Reader and the
  // exposed dictionary is an array of int32_t (not ByteArray).
  auto reader = static_cast<Int32Reader*>(reader_.get());
  const int32_t* dict = nullptr;
  int32_t dict_len = 0;
  int64_t indices_read = 0;
  int32_t indices[batch_size];
  int16_t def_levels_out[batch_size];
  ASSERT_TRUE(reader->HasNext());
  EXPECT_EQ(1, reader->ReadBatchWithDictionary(batch_size, def_levels_out,
                                               /*rep_levels=*/nullptr, indices,
                                               &indices_read, &dict, &dict_len));
  ASSERT_THROW(reader->ReadBatchWithDictionary(batch_size, def_levels_out,
                                               /*rep_levels=*/nullptr, indices,
                                               &indices_read, &dict, &dict_len),
               ParquetException);
}
// Walks through combinations of dictionary / data page encodings and checks
// which ones HasNext() accepts and which ones make it throw.
TEST_F(TestPrimitiveReader, TestDictionaryEncodedPages) {
  max_def_level_ = 0;
  max_rep_level_ = 0;
  const NodePtr node = schema::Int32("a", Repetition::REQUIRED);
  const ColumnDescriptor descr(node, max_def_level_, max_rep_level_);
  std::shared_ptr<ResizableBuffer> dummy = AllocateBuffer();

  // Replaces the fixture's page list with `pages` and rebuilds the reader.
  auto load_pages = [&](std::vector<std::shared_ptr<Page>> pages) {
    pages_ = std::move(pages);
    InitReader(&descr);
  };

  // Tests Dict : PLAIN, Data : RLE_DICTIONARY
  load_pages({std::make_shared<DictionaryPage>(dummy, 0, Encoding::PLAIN),
              MakeDataPage<Int32Type>(&descr, {}, 0, Encoding::RLE_DICTIONARY, {}, 0, {},
                                      0, {}, 0)});
  ASSERT_NO_THROW(reader_->HasNext());

  // Tests Dict : PLAIN_DICTIONARY, Data : PLAIN_DICTIONARY
  load_pages({std::make_shared<DictionaryPage>(dummy, 0, Encoding::PLAIN_DICTIONARY),
              MakeDataPage<Int32Type>(&descr, {}, 0, Encoding::PLAIN_DICTIONARY, {}, 0,
                                      {}, 0, {}, 0)});
  ASSERT_NO_THROW(reader_->HasNext());

  // Tests dictionary page must occur before data page
  load_pages({MakeDataPage<Int32Type>(&descr, {}, 0, Encoding::RLE_DICTIONARY, {}, 0, {},
                                      0, {}, 0)});
  ASSERT_THROW(reader_->HasNext(), ParquetException);

  // Tests only RLE_DICTIONARY is supported
  load_pages({std::make_shared<DictionaryPage>(dummy, 0, Encoding::DELTA_BYTE_ARRAY)});
  ASSERT_THROW(reader_->HasNext(), ParquetException);

  // Column cannot have more than one dictionary
  load_pages({std::make_shared<DictionaryPage>(dummy, 0, Encoding::PLAIN_DICTIONARY),
              std::make_shared<DictionaryPage>(dummy, 0, Encoding::PLAIN)});
  ASSERT_THROW(reader_->HasNext(), ParquetException);

  // unsupported encoding
  load_pages({MakeDataPage<Int32Type>(&descr, {}, 0, Encoding::DELTA_BYTE_ARRAY, {}, 0,
                                      {}, 0, {}, 0)});
  ASSERT_THROW(reader_->HasNext(), ParquetException);

  pages_.clear();
}
// Reads fully dictionary-encoded BYTE_ARRAY pages through
// ReadBatchWithDictionary and checks that the returned indices, resolved
// against the exposed dictionary, reproduce the generated values.
TEST_F(TestPrimitiveReader, TestDictionaryEncodedPagesWithExposeEncoding) {
  max_def_level_ = 0;
  max_rep_level_ = 0;
  const int num_pages = 5;
  const int levels_per_page = 100;

  std::vector<int16_t> def_levels;
  std::vector<int16_t> rep_levels;
  std::vector<ByteArray> values;
  std::vector<uint8_t> buffer;
  const NodePtr node = schema::ByteArray("a", Repetition::REQUIRED);
  const ColumnDescriptor descr(node, max_def_level_, max_rep_level_);

  // Fully dictionary encoded
  MakePages<ByteArrayType>(&descr, num_pages, levels_per_page, def_levels, rep_levels,
                           values, buffer, pages_, Encoding::RLE_DICTIONARY);
  InitReader(&descr);
  auto* reader = static_cast<ByteArrayReader*>(reader_.get());

  const int64_t value_size = values.size();
  auto indices = std::make_unique<int32_t[]>(value_size);
  const ByteArray* dict = nullptr;
  int32_t dict_len = 0;
  int64_t total_indices = 0;
  int64_t indices_read = 0;

  while (total_indices < value_size && reader->HasNext()) {
    const ByteArray* batch_dict = nullptr;
    int32_t batch_dict_len = 0;
    EXPECT_NO_THROW(reader->ReadBatchWithDictionary(
        value_size, /*def_levels=*/nullptr,
        /*rep_levels=*/nullptr, indices.get() + total_indices, &indices_read,
        &batch_dict, &batch_dict_len));
    if (batch_dict == nullptr) {
      // Dictionary is not read when there's no data
      EXPECT_EQ(indices_read, 0);
    } else {
      // Dictionary is read along with data
      EXPECT_GT(indices_read, 0);
      dict = batch_dict;
      dict_len = batch_dict_len;
    }
    total_indices += indices_read;
  }

  EXPECT_EQ(total_indices, value_size);
  for (int64_t i = 0; i < total_indices; ++i) {
    EXPECT_LT(indices[i], dict_len);
    const ByteArray& from_dict = dict[indices[i]];
    const ByteArray& expected = values[i];
    EXPECT_EQ(from_dict.len, expected.len);
    EXPECT_EQ(memcmp(from_dict.ptr, expected.ptr, expected.len), 0);
  }
  pages_.clear();
}
// ReadBatchWithDictionary must throw when the data page is PLAIN encoded, i.e.
// the column is not fully dictionary encoded.
TEST_F(TestPrimitiveReader, TestNonDictionaryEncodedPagesWithExposeEncoding) {
  max_def_level_ = 0;
  max_rep_level_ = 0;
  int64_t value_size = 100;
  std::vector<int32_t> values(value_size, 0);
  NodePtr type = schema::Int32("a", Repetition::REQUIRED);
  const ColumnDescriptor descr(type, max_def_level_, max_rep_level_);

  // The data page falls back to plain encoding
  std::shared_ptr<ResizableBuffer> dummy = AllocateBuffer();
  std::shared_ptr<DictionaryPage> dict_page =
      std::make_shared<DictionaryPage>(dummy, 0, Encoding::PLAIN);
  std::shared_ptr<DataPageV1> data_page = MakeDataPage<Int32Type>(
      &descr, values, static_cast<int>(value_size), Encoding::PLAIN, /*indices=*/{},
      /*indices_size=*/0, /*def_levels=*/{}, /*max_def_level=*/0, /*rep_levels=*/{},
      /*max_rep_level=*/0);
  pages_.push_back(dict_page);
  pages_.push_back(data_page);
  InitReader(&descr);

  // The column is INT32, so the reader must be used as an Int32Reader and the
  // dictionary out-parameter is a const int32_t* (not ByteArray).
  auto reader = static_cast<Int32Reader*>(reader_.get());
  const int32_t* dict = nullptr;
  int32_t dict_len = 0;
  int64_t indices_read = 0;
  auto indices = std::make_unique<int32_t[]>(value_size);
  // Dictionary cannot be exposed when it's not fully dictionary encoded
  EXPECT_THROW(reader->ReadBatchWithDictionary(value_size, /*def_levels=*/nullptr,
                                               /*rep_levels=*/nullptr, indices.get(),
                                               &indices_read, &dict, &dict_len),
               ParquetException);
  pages_.clear();
}
namespace {

// Builds the LevelInfo for the leaf column described by `descr`.
//
// def_level and rep_level are the column's maximum definition and repetition
// levels. repeated_ancestor_def_level starts at the maximum definition level
// and is decremented once for every optional node met while walking from the
// leaf towards the root; the walk stops at the first repeated node or when
// there is no parent left.
internal::LevelInfo ComputeLevelInfo(const ColumnDescriptor* descr) {
  const int16_t max_def_level = descr->max_definition_level();

  internal::LevelInfo info;
  info.def_level = max_def_level;
  info.rep_level = descr->max_repetition_level();

  int16_t ancestor_def_level = max_def_level;
  for (const ::parquet::schema::Node* current = descr->schema_node().get();
       current != nullptr && !current->is_repeated(); current = current->parent()) {
    if (current->is_optional()) {
      --ancestor_def_level;
    }
  }
  info.repeated_ancestor_def_level = ancestor_def_level;
  return info;
}

}  // namespace
// Test parameter: passed as the last argument of RecordReader::Make in Init().
using ReadDenseForNullable = bool;

// Parameterized fixture for an int32 RecordReader over a single column placed
// under a required "root" group.
class RecordReaderPrimitiveTypeTest
    : public ::testing::TestWithParam<ReadDenseForNullable> {
 public:
  // Placeholder used in expected-value lists for slots whose content is not
  // checked (see CheckReadValues).
  const int32_t kNullValue = -1;

  // Wraps `column` in a required root group, initializes the schema descriptor
  // and creates record_reader_ for column 0.
  void Init(NodePtr column) {
    NodePtr root = GroupNode::Make("root", Repetition::REQUIRED, {column});
    schema_descriptor_.Init(root);
    descr_ = schema_descriptor_.Column(0);
    record_reader_ = internal::RecordReader::Make(descr_, ComputeLevelInfo(descr_),
                                                  ::arrow::default_memory_pool(),
                                                  /*read_dictionary=*/false, GetParam());
  }

  // Checks the values currently held by record_reader_ against
  // `expected_values` (entries equal to kNullValue are skipped), the def levels
  // against `expected_defs` unless the column node is required, and the rep
  // levels against `expected_reps` when the column node is repeated.
  // The expectations are taken by const reference to avoid copying them.
  void CheckReadValues(const std::vector<int32_t>& expected_values,
                       const std::vector<int16_t>& expected_defs,
                       const std::vector<int16_t>& expected_reps) {
    const auto read_values = reinterpret_cast<const int32_t*>(record_reader_->values());
    std::vector<int32_t> read_vals(read_values,
                                   read_values + record_reader_->values_written());
    ASSERT_EQ(read_vals.size(), expected_values.size());
    for (size_t i = 0; i < expected_values.size(); ++i) {
      if (expected_values[i] != kNullValue) {
        ASSERT_EQ(expected_values[i], read_values[i]);
      }
    }

    if (!descr_->schema_node()->is_required()) {
      std::vector<int16_t> read_defs(
          record_reader_->def_levels(),
          record_reader_->def_levels() + record_reader_->levels_position());
      ASSERT_TRUE(vector_equal(expected_defs, read_defs));
    }

    if (descr_->schema_node()->is_repeated()) {
      std::vector<int16_t> read_reps(
          record_reader_->rep_levels(),
          record_reader_->rep_levels() + record_reader_->levels_position());
      ASSERT_TRUE(vector_equal(expected_reps, read_reps));
    }
  }

  // Asserts the four counters exposed by record_reader_.
  void CheckState(int64_t values_written, int64_t null_count, int64_t levels_written,
                  int64_t levels_position) {
    ASSERT_EQ(record_reader_->values_written(), values_written);
    ASSERT_EQ(record_reader_->null_count(), null_count);
    ASSERT_EQ(record_reader_->levels_written(), levels_written);
    ASSERT_EQ(record_reader_->levels_position(), levels_position);
  }

 protected:
  SchemaDescriptor schema_descriptor_;
  std::shared_ptr<internal::RecordReader> record_reader_;
  // Points into schema_descriptor_; set by Init(). Null until then.
  const ColumnDescriptor* descr_ = nullptr;
};
// Tests reading a required field. The expected results are the same for
// reading dense and spaced.
TEST_P(RecordReaderPrimitiveTypeTest, ReadRequired) {
  Init(schema::Int32("b", Repetition::REQUIRED));

  // Records look like: {10, 20, 20, 30, 30, 30}
  std::vector<std::shared_ptr<Page>> pages;
  std::vector<int32_t> values = {10, 20, 20, 30, 30, 30};
  // A required column carries no definition or repetition levels.
  std::vector<int16_t> def_levels = {};
  std::vector<int16_t> rep_levels = {};

  // The value count comes from `values`: `def_levels` is empty here, so its
  // size cannot describe how many values the page holds.
  std::shared_ptr<DataPageV1> page = MakeDataPage<Int32Type>(
      descr_, values, /*num_values=*/static_cast<int>(values.size()), Encoding::PLAIN,
      /*indices=*/{},
      /*indices_size=*/0, def_levels, descr_->max_definition_level(), rep_levels,
      descr_->max_repetition_level());
  pages.push_back(std::move(page));
  auto pager = std::make_unique<MockPageReader>(pages);
  record_reader_->SetPageReader(std::move(pager));

  // Read [10]
  int64_t records_read = record_reader_->ReadRecords(/*num_records=*/1);
  ASSERT_EQ(records_read, 1);
  CheckState(/*values_written=*/1, /*null_count=*/0, /*levels_written=*/0,
             /*levels_position=*/0);
  CheckReadValues(/*expected_values=*/{10}, /*expected_defs=*/{},
                  /*expected_reps=*/{});
  record_reader_->Reset();
  CheckState(/*values_written=*/0, /*null_count=*/0, /*levels_written=*/0,
             /*levels_position=*/0);

  // Read 20, 20, 30, 30, 30
  records_read = record_reader_->ReadRecords(/*num_records=*/10);
  ASSERT_EQ(records_read, 5);
  CheckState(/*values_written=*/5, /*null_count=*/0, /*levels_written=*/0,
             /*levels_position=*/0);
  CheckReadValues(/*expected_values=*/{20, 20, 30, 30, 30},
                  /*expected_defs=*/{},
                  /*expected_reps=*/{});
  record_reader_->Reset();
  CheckState(/*values_written=*/0, /*null_count=*/0, /*levels_written=*/0,
             /*levels_position=*/0);
}
// Tests reading an optional field.
// The leaf sits under an optional group, so the max definition level is > 1 and
// both "parent missing" and "parent present, leaf null" are exercised.
TEST_P(RecordReaderPrimitiveTypeTest, ReadOptional) {
  Init(GroupNode::Make(
      "a", Repetition::OPTIONAL,
      {PrimitiveNode::Make("element", Repetition::OPTIONAL, ParquetType::INT32)}));
  const bool read_dense = GetParam();

  // Records look like: {10, null, 20, 20, null, 30, 30, 30, null}
  const std::vector<int32_t> page_values = {10, 20, 20, 30, 30, 30};
  const std::vector<int16_t> page_defs = {2, 0, 2, 2, 1, 2, 2, 2, 0};
  std::vector<std::shared_ptr<Page>> pages;
  pages.push_back(MakeDataPage<Int32Type>(
      descr_, page_values, /*num_values=*/static_cast<int>(page_defs.size()),
      Encoding::PLAIN,
      /*indices=*/{},
      /*indices_size=*/0, page_defs, descr_->max_definition_level(), /*rep_levels=*/{},
      descr_->max_repetition_level()));
  record_reader_->SetPageReader(std::make_unique<MockPageReader>(pages));

  // Read 10, null
  int64_t records_read = record_reader_->ReadRecords(/*num_records=*/2);
  ASSERT_EQ(records_read, 2);
  CheckState(/*values_written=*/read_dense ? 1 : 2, /*null_count=*/read_dense ? 0 : 1,
             /*levels_written=*/9, /*levels_position=*/2);
  {
    const std::vector<int32_t> expected = read_dense
                                              ? std::vector<int32_t>{10}
                                              : std::vector<int32_t>{10, kNullValue};
    CheckReadValues(expected, /*expected_defs=*/{2, 0}, /*expected_reps=*/{});
  }
  record_reader_->Reset();
  CheckState(/*values_written=*/0, /*null_count=*/0, /*levels_written=*/7,
             /*levels_position=*/0);

  // Read 20, 20, null (parent present), 30, 30, 30
  records_read = record_reader_->ReadRecords(/*num_records=*/6);
  ASSERT_EQ(records_read, 6);
  CheckState(/*values_written=*/read_dense ? 5 : 6, /*null_count=*/read_dense ? 0 : 1,
             /*levels_written=*/7, /*levels_position=*/6);
  {
    const std::vector<int32_t> expected =
        read_dense ? std::vector<int32_t>{20, 20, 30, 30, 30}
                   : std::vector<int32_t>{20, 20, kNullValue, 30, 30, 30};
    CheckReadValues(expected, /*expected_defs=*/{2, 2, 1, 2, 2, 2},
                    /*expected_reps=*/{});
  }
  record_reader_->Reset();
  CheckState(/*values_written=*/0, /*null_count=*/0, /*levels_written=*/1,
             /*levels_position=*/0);

  // Read the last null value and read past the end.
  records_read = record_reader_->ReadRecords(/*num_records=*/3);
  ASSERT_EQ(records_read, 1);
  CheckState(/*values_written=*/read_dense ? 0 : 1, /*null_count=*/read_dense ? 0 : 1,
             /*levels_written=*/1, /*levels_position=*/1);
  {
    const std::vector<int32_t> expected =
        read_dense ? std::vector<int32_t>{} : std::vector<int32_t>{kNullValue};
    CheckReadValues(expected, /*expected_defs=*/{0}, /*expected_reps=*/{});
  }
  record_reader_->Reset();
  CheckState(/*values_written=*/0, /*null_count=*/0, /*levels_written=*/0,
             /*levels_position=*/0);
}
// Tests reading a required repeated field. Dense and spaced reads are expected
// to give the same results.
TEST_P(RecordReaderPrimitiveTypeTest, ReadRequiredRepeated) {
  Init(GroupNode::Make(
      "p", Repetition::REQUIRED,
      {GroupNode::Make(
          "list", Repetition::REPEATED,
          {PrimitiveNode::Make("element", Repetition::REQUIRED, ParquetType::INT32)})}));

  // Records look like: {[10], [20, 20], [30, 30, 30]}
  const std::vector<int32_t> page_values = {10, 20, 20, 30, 30, 30};
  const std::vector<int16_t> page_defs = {1, 1, 1, 1, 1, 1};
  const std::vector<int16_t> page_reps = {0, 0, 1, 0, 1, 1};
  std::vector<std::shared_ptr<Page>> pages;
  pages.push_back(MakeDataPage<Int32Type>(
      descr_, page_values, /*num_values=*/static_cast<int>(page_defs.size()),
      Encoding::PLAIN,
      /*indices=*/{},
      /*indices_size=*/0, page_defs, descr_->max_definition_level(), page_reps,
      descr_->max_repetition_level()));
  record_reader_->SetPageReader(std::make_unique<MockPageReader>(pages));

  // First record: [10]
  int64_t records_read = record_reader_->ReadRecords(/*num_records=*/1);
  ASSERT_EQ(records_read, 1);
  CheckState(/*values_written=*/1, /*null_count=*/0, /*levels_written=*/6,
             /*levels_position=*/1);
  CheckReadValues(/*expected_values=*/{10}, /*expected_defs=*/{1},
                  /*expected_reps=*/{0});
  record_reader_->Reset();
  CheckState(/*values_written=*/0, /*null_count=*/0, /*levels_written=*/5,
             /*levels_position=*/0);

  // Remaining records: [20, 20], [30, 30, 30] (three requested, two available)
  records_read = record_reader_->ReadRecords(/*num_records=*/3);
  ASSERT_EQ(records_read, 2);
  CheckState(/*values_written=*/5, /*null_count=*/0, /*levels_written=*/5,
             /*levels_position=*/5);
  CheckReadValues(/*expected_values=*/{20, 20, 30, 30, 30},
                  /*expected_defs=*/{1, 1, 1, 1, 1},
                  /*expected_reps=*/{0, 1, 0, 1, 1});
  record_reader_->Reset();
  CheckState(/*values_written=*/0, /*null_count=*/0, /*levels_written=*/0,
             /*levels_position=*/0);
}
// Reads an OPTIONAL group wrapping a REPEATED list of OPTIONAL int32 elements.
// Covers nulls at the top level and at the leaf level, an empty list, and
// zero-record reads.
TEST_P(RecordReaderPrimitiveTypeTest, ReadNullableRepeated) {
  const bool read_dense = GetParam();

  NodePtr element =
      PrimitiveNode::Make("element", Repetition::OPTIONAL, ParquetType::INT32);
  NodePtr list = GroupNode::Make("list", Repetition::REPEATED, {element});
  NodePtr root = GroupNode::Make("p", Repetition::OPTIONAL, {list});
  Init(root);

  // Records look like: {[10], null, [20, 20], [], [30, 30, null, 30]}
  //
  // Expected behavior:
  //  - Spaced: an empty list or a top-level null gets no value slot and is not
  //    counted in null_count. A leaf-level null gets a slot and is counted.
  //  - Dense: null_count is always 0 and no slot is ever left for a null.
  const std::vector<int32_t> leaf_values = {10, 20, 20, 30, 30, 30};
  const std::vector<int16_t> defs = {3, 0, 3, 3, 1, 3, 3, 2, 3};
  const std::vector<int16_t> reps = {0, 0, 0, 1, 0, 0, 1, 1, 1};
  std::vector<std::shared_ptr<Page>> page_list;
  page_list.push_back(MakeDataPage<Int32Type>(
      descr_, leaf_values, /*num_values=*/static_cast<int>(defs.size()), Encoding::PLAIN,
      /*indices=*/{},
      /*indices_size=*/0, defs, descr_->max_definition_level(), reps,
      descr_->max_repetition_level()));
  record_reader_->SetPageReader(std::make_unique<MockPageReader>(page_list));

  // Asking for zero records reads nothing.
  int64_t records_read = record_reader_->ReadRecords(/*num_records=*/0);
  ASSERT_EQ(records_read, 0);

  // Test the descr() accessor.
  ASSERT_EQ(record_reader_->descr()->max_definition_level(), 3);

  // Read [10], null.
  // The top-level null produces no value in either mode, so the dense and
  // spaced expectations are the same.
  records_read = record_reader_->ReadRecords(/*num_records=*/2);
  ASSERT_EQ(records_read, 2);
  CheckState(/*values_written=*/1, /*null_count=*/0, /*levels_written=*/9,
             /*levels_position=*/2);
  CheckReadValues(/*expected_values=*/{10}, /*expected_defs=*/{3, 0},
                  /*expected_reps=*/{0, 0});
  record_reader_->Reset();
  CheckState(/*values_written=*/0, /*null_count=*/0, /*levels_written=*/7,
             /*levels_position=*/0);

  // Read [20, 20], [].
  // The empty list produces no value and is not counted towards null_count,
  // so again both modes agree.
  records_read = record_reader_->ReadRecords(/*num_records=*/2);
  ASSERT_EQ(records_read, 2);
  CheckState(/*values_written=*/2, /*null_count=*/0, /*levels_written=*/7,
             /*levels_position=*/3);
  CheckReadValues(/*expected_values=*/{20, 20},
                  /*expected_defs=*/{3, 3, 1},
                  /*expected_reps=*/{0, 1, 0});
  record_reader_->Reset();
  CheckState(/*values_written=*/0, /*null_count=*/0, /*levels_written=*/4,
             /*levels_position=*/0);

  // A zero-record read in the middle of the column is also a no-op.
  records_read = record_reader_->ReadRecords(/*num_records=*/0);
  ASSERT_EQ(records_read, 0);

  // Read the last record, [30, 30, null, 30]. The leaf-level null is where the
  // two modes differ.
  records_read = record_reader_->ReadRecords(/*num_records=*/1);
  ASSERT_EQ(records_read, 1);
  if (read_dense) {
    CheckState(/*values_written=*/3, /*null_count=*/0, /*levels_written=*/4,
               /*levels_position=*/4);
    CheckReadValues(/*expected_values=*/{30, 30, 30},
                    /*expected_defs=*/{3, 3, 2, 3},
                    /*expected_reps=*/{0, 1, 1, 1});
  } else {
    CheckState(/*values_written=*/4, /*null_count=*/1, /*levels_written=*/4,
               /*levels_position=*/4);
    CheckReadValues(/*expected_values=*/{30, 30, kNullValue, 30},
                    /*expected_defs=*/{3, 3, 2, 3},
                    /*expected_reps=*/{0, 1, 1, 1});
  }
  record_reader_->Reset();
  CheckState(/*values_written=*/0, /*null_count=*/0, /*levels_written=*/0,
             /*levels_position=*/0);
}
// Skips, then reads, a REQUIRED top-level int32 column. The expected state
// never shows any levels written for this column.
TEST_P(RecordReaderPrimitiveTypeTest, SkipRequiredTopLevel) {
  Init(schema::Int32("b", Repetition::REQUIRED));

  const std::vector<int32_t> leaf_values = {10, 20, 20, 30, 30, 30};
  std::vector<std::shared_ptr<Page>> page_list;
  page_list.push_back(MakeDataPage<Int32Type>(
      descr_, leaf_values, /*num_values=*/static_cast<int>(leaf_values.size()),
      Encoding::PLAIN,
      /*indices=*/{},
      /*indices_size=*/0, /*def_levels=*/{}, descr_->max_definition_level(),
      /*rep_levels=*/{}, descr_->max_repetition_level()));
  record_reader_->SetPageReader(std::make_unique<MockPageReader>(page_list));

  // Skip 10, 20, 20. Nothing is written to the output buffers.
  const int64_t skipped = record_reader_->SkipRecords(/*num_records=*/3);
  ASSERT_EQ(skipped, 3);
  CheckState(/*values_written=*/0, /*null_count=*/0, /*levels_written=*/0,
             /*levels_position=*/0);

  // The next two records are 30, 30.
  const int64_t read = record_reader_->ReadRecords(/*num_records=*/2);
  ASSERT_EQ(read, 2);
  CheckState(/*values_written=*/2, /*null_count=*/0, /*levels_written=*/0,
             /*levels_position=*/0);
  CheckReadValues(/*expected_values=*/{30, 30}, /*expected_defs=*/{},
                  /*expected_reps=*/{});

  record_reader_->Reset();
  CheckState(/*values_written=*/0, /*null_count=*/0, /*levels_written=*/0,
             /*levels_position=*/0);
}
// Alternates skips and reads on an OPTIONAL int32 column that deliberately
// contains null records.
TEST_P(RecordReaderPrimitiveTypeTest, SkipOptional) {
  const bool read_dense = GetParam();
  Init(schema::Int32("b", Repetition::OPTIONAL));

  // Per the definition levels below, the records look like
  // {null, 10, 20, null, 30, 40, 50, 60}.
  const std::vector<int32_t> leaf_values = {10, 20, 30, 40, 50, 60};
  const std::vector<int16_t> defs = {0, 1, 1, 0, 1, 1, 1, 1};
  std::vector<std::shared_ptr<Page>> page_list;
  page_list.push_back(MakeDataPage<Int32Type>(
      descr_, leaf_values, /*num_values=*/static_cast<int>(leaf_values.size()),
      Encoding::PLAIN,
      /*indices=*/{},
      /*indices_size=*/0, defs, descr_->max_definition_level(),
      /*rep_levels=*/{}, descr_->max_repetition_level()));
  record_reader_->SetPageReader(std::make_unique<MockPageReader>(page_list));

  {
    // Skip {null, 10}. This also covers starting a column with a skip.
    const int64_t skipped = record_reader_->SkipRecords(/*num_records=*/2);
    ASSERT_EQ(skipped, 2);
    CheckState(/*values_written=*/0, /*null_count=*/0, /*levels_written=*/0,
               /*levels_position=*/0);
  }

  {
    // Read three records: {20, null, 30}.
    // Two of the eight levels were skipped above, so six are left in total and
    // three of them are consumed here.
    const int64_t read = record_reader_->ReadRecords(/*num_records=*/3);
    ASSERT_EQ(read, 3);
    if (read_dense) {
      CheckState(/*values_written=*/2, /*null_count=*/0, /*levels_written=*/6,
                 /*levels_position=*/3);
      CheckReadValues(/*expected_values=*/{20, 30}, /*expected_defs=*/{1, 0, 1},
                      /*expected_reps=*/{});
    } else {
      CheckState(/*values_written=*/3, /*null_count=*/1, /*levels_written=*/6,
                 /*levels_position=*/3);
      CheckReadValues(/*expected_values=*/{20, kNullValue, 30},
                      /*expected_defs=*/{1, 0, 1},
                      /*expected_reps=*/{});
    }
    record_reader_->Reset();
  }

  {
    // Skip {40, 50}.
    const int64_t skipped = record_reader_->SkipRecords(/*num_records=*/2);
    ASSERT_EQ(skipped, 2);
    CheckState(/*values_written=*/0, /*null_count=*/0, /*levels_written=*/1,
               /*levels_position=*/0);
    CheckReadValues(/*expected_values=*/{}, /*expected_defs=*/{},
                    /*expected_reps=*/{});
    // A Reset right after a skip must leave the state unchanged.
    record_reader_->Reset();
    CheckState(/*values_written=*/0, /*null_count=*/0, /*levels_written=*/1,
               /*levels_position=*/0);
  }

  {
    // Read to the end of the column: {60}.
    // This checks that ReadAndThrowAwayValues works; if it did not, the value
    // read here would be wrong.
    const int64_t read = record_reader_->ReadRecords(/*num_records=*/1);
    ASSERT_EQ(read, 1);
    CheckState(/*values_written=*/1, /*null_count=*/0, /*levels_written=*/1,
               /*levels_position=*/1);
    CheckReadValues(/*expected_values=*/{60},
                    /*expected_defs=*/{1},
                    /*expected_reps=*/{});
  }

  // All records are exhausted: further reads and skips return 0.
  ASSERT_EQ(record_reader_->ReadRecords(/*num_records=*/3), 0);
  ASSERT_EQ(record_reader_->SkipRecords(/*num_records=*/3), 0);
}
// Interleaves skips and reads on a REPEATED int32 column. The reader is never
// Reset here, so read values and levels accumulate across the steps.
TEST_P(RecordReaderPrimitiveTypeTest, SkipRepeated) {
  Init(schema::Int32("b", Repetition::REPEATED));

  // Records look like {null, [20, 20, 20], null, [30, 30], [40]}
  const std::vector<int32_t> leaf_values = {20, 20, 20, 30, 30, 40};
  const std::vector<int16_t> defs = {0, 1, 1, 1, 0, 1, 1, 1};
  const std::vector<int16_t> reps = {0, 0, 1, 1, 0, 0, 1, 0};
  std::vector<std::shared_ptr<Page>> page_list;
  page_list.push_back(MakeDataPage<Int32Type>(
      descr_, leaf_values, /*num_values=*/static_cast<int>(leaf_values.size()),
      Encoding::PLAIN,
      /*indices=*/{},
      /*indices_size=*/0, defs, descr_->max_definition_level(), reps,
      descr_->max_repetition_level()));
  record_reader_->SetPageReader(std::make_unique<MockPageReader>(page_list));

  {
    // Skipping zero records is a no-op.
    const int64_t skipped = record_reader_->SkipRecords(/*num_records=*/0);
    ASSERT_EQ(skipped, 0);
  }

  {
    // Skip the leading null record.
    const int64_t skipped = record_reader_->SkipRecords(/*num_records=*/1);
    ASSERT_EQ(skipped, 1);
    ASSERT_EQ(record_reader_->values_written(), 0);
    ASSERT_EQ(record_reader_->null_count(), 0);
    // Finding record boundaries in a repeated field requires reading levels,
    // so some levels are buffered. The skipped level itself must be gone after
    // the skip, which is why levels_position() is 0.
    CheckState(/*values_written=*/0, /*null_count=*/0, /*levels_written=*/7,
               /*levels_position=*/0);
    CheckReadValues(/*expected_values=*/{},
                    /*expected_defs=*/{},
                    /*expected_reps=*/{});
  }

  {
    // Read [20, 20, 20]
    const int64_t read = record_reader_->ReadRecords(/*num_records=*/1);
    ASSERT_EQ(read, 1);
    CheckState(/*values_written=*/3, /*null_count=*/0, /*levels_written=*/7,
               /*levels_position=*/3);
    CheckReadValues(/*expected_values=*/{20, 20, 20},
                    /*expected_defs=*/{1, 1, 1},
                    /*expected_reps=*/{0, 1, 1});
  }

  {
    // Skipping zero records after a read is also a no-op.
    const int64_t skipped = record_reader_->SkipRecords(/*num_records=*/0);
    ASSERT_EQ(skipped, 0);
  }

  {
    // Skip the second null record together with [30, 30].
    const int64_t skipped = record_reader_->SkipRecords(/*num_records=*/2);
    ASSERT_EQ(skipped, 2);
    // The skipped levels are removed from the buffer; what was read before
    // the skip is still there.
    CheckState(/*values_written=*/3, /*null_count=*/0, /*levels_written=*/4,
               /*levels_position=*/3);
    CheckReadValues(/*expected_values=*/{20, 20, 20},
                    /*expected_defs=*/{1, 1, 1},
                    /*expected_reps=*/{0, 1, 1});
  }

  {
    // Read [40]
    const int64_t read = record_reader_->ReadRecords(/*num_records=*/1);
    ASSERT_EQ(read, 1);
    CheckState(/*values_written=*/4, /*null_count=*/0, /*levels_written=*/4,
               /*levels_position=*/4);
    CheckReadValues(/*expected_values=*/{20, 20, 20, 40},
                    /*expected_defs=*/{1, 1, 1, 1},
                    /*expected_reps=*/{0, 1, 1, 0});
  }
}
// For a repeated field, a skip must first consume the levels that are already
// buffered before any further levels are read.
TEST_P(RecordReaderPrimitiveTypeTest, SkipRepeatedConsumeBufferFirst) {
  Init(schema::Int32("b", Repetition::REPEATED));

  // 2048 levels with rep level 0 and value 10, i.e. one record per level.
  const std::vector<int32_t> leaf_values(2048, 10);
  const std::vector<int16_t> defs(2048, 1);
  const std::vector<int16_t> reps(2048, 0);
  std::vector<std::shared_ptr<Page>> page_list;
  page_list.push_back(MakeDataPage<Int32Type>(
      descr_, leaf_values, /*num_values=*/static_cast<int>(leaf_values.size()),
      Encoding::PLAIN,
      /*indices=*/{},
      /*indices_size=*/0, defs, descr_->max_definition_level(), reps,
      descr_->max_repetition_level()));
  record_reader_->SetPageReader(std::make_unique<MockPageReader>(page_list));

  {
    // Read 1000 records. 1024 levels get read, because that is the minimum
    // number of levels to read.
    const int64_t read = record_reader_->ReadRecords(/*num_records=*/1000);
    ASSERT_EQ(read, 1000);
    CheckState(/*values_written=*/1000, /*null_count=*/0, /*levels_written=*/1024,
               /*levels_position=*/1000);
    const std::vector<int32_t> want_values(1000, 10);
    const std::vector<int16_t> want_defs(1000, 1);
    const std::vector<int16_t> want_reps(1000, 0);
    CheckReadValues(want_values, want_defs, want_reps);
    // Reset removes the already consumed values and levels.
    record_reader_->Reset();
  }

  {
    // Skip 12 records. 24 levels are still buffered, so no additional levels
    // should be read; 12 of the buffered ones are consumed.
    const int64_t skipped = record_reader_->SkipRecords(/*num_records=*/12);
    ASSERT_EQ(skipped, 12);
    CheckState(/*values_written=*/0, /*null_count=*/0, /*levels_written=*/12,
               /*levels_position=*/0);
    // Nothing is readable: the reader was reset before this skip.
    CheckReadValues(/*expected_values=*/{}, /*expected_def_levels=*/{},
                    /*expected_rep_levels=*/{});
  }
}
// Reads a repeated field in which a single record stretches over three pages.
TEST_P(RecordReaderPrimitiveTypeTest, ReadPartialRecord) {
  Init(schema::Int32("b", Repetition::REPEATED));

  std::vector<std::shared_ptr<Page>> page_list;
  // Appends one PLAIN page whose value count equals the number of values given.
  auto add_page = [&](const std::vector<int32_t>& page_values,
                      const std::vector<int16_t>& page_defs,
                      const std::vector<int16_t>& page_reps) {
    page_list.push_back(MakeDataPage<Int32Type>(
        descr_, page_values, /*num_values=*/static_cast<int>(page_values.size()),
        Encoding::PLAIN,
        /*indices=*/{},
        /*indices_size=*/0, page_defs, descr_->max_definition_level(), page_reps,
        descr_->max_repetition_level()));
  };

  // Page 1: {[10], [20, 20, 20 ... } continues to next page.
  add_page(/*values=*/{10, 20, 20, 20}, /*def_levels=*/{1, 1, 1, 1},
           /*rep_levels=*/{0, 0, 1, 1});
  // Page 2: {... 20, 20, ...} continues from previous page and to next page.
  add_page(/*values=*/{20, 20}, /*def_levels=*/{1, 1}, /*rep_levels=*/{1, 1});
  // Page 3: { ... 20], [30]} continues from previous page.
  add_page(/*values=*/{20, 30}, /*def_levels=*/{1, 1}, /*rep_levels=*/{1, 0});

  record_reader_->SetPageReader(std::make_unique<MockPageReader>(page_list));

  {
    // Read [10]
    const int64_t read = record_reader_->ReadRecords(/*num_records=*/1);
    ASSERT_EQ(read, 1);
    CheckState(/*values_written=*/1, /*null_count=*/0, /*levels_written=*/4,
               /*levels_position=*/1);
    CheckReadValues(/*expected_values=*/{10},
                    /*expected_defs=*/{1},
                    /*expected_reps=*/{0});
  }

  {
    // Read [20, 20, 20, 20, 20, 20], the record that spans all three pages.
    // There is no Reset in this test, so the output keeps accumulating.
    const int64_t read = record_reader_->ReadRecords(/*num_records=*/1);
    ASSERT_EQ(read, 1);
    CheckState(/*values_written=*/7, /*null_count=*/0, /*levels_written=*/8,
               /*levels_position=*/7);
    CheckReadValues(/*expected_values=*/{10, 20, 20, 20, 20, 20, 20},
                    /*expected_defs=*/{1, 1, 1, 1, 1, 1, 1},
                    /*expected_reps=*/{0, 0, 1, 1, 1, 1, 1});
  }

  {
    // Read [30]
    const int64_t read = record_reader_->ReadRecords(/*num_records=*/1);
    ASSERT_EQ(read, 1);
    CheckState(/*values_written=*/8, /*null_count=*/0, /*levels_written=*/8,
               /*levels_position=*/8);
    CheckReadValues(/*expected_values=*/{10, 20, 20, 20, 20, 20, 20, 30},
                    /*expected_defs=*/{1, 1, 1, 1, 1, 1, 1, 1},
                    /*expected_reps=*/{0, 0, 1, 1, 1, 1, 1, 0});
  }
}
// Skips a record of a repeated field that stretches over three pages, and
// checks that the records on either side of it are still read correctly.
TEST_P(RecordReaderPrimitiveTypeTest, SkipPartialRecord) {
  Init(schema::Int32("b", Repetition::REPEATED));

  std::vector<std::shared_ptr<Page>> page_list;
  // Appends one PLAIN page whose value count equals the number of values given.
  auto add_page = [&](const std::vector<int32_t>& page_values,
                      const std::vector<int16_t>& page_defs,
                      const std::vector<int16_t>& page_reps) {
    page_list.push_back(MakeDataPage<Int32Type>(
        descr_, page_values, /*num_values=*/static_cast<int>(page_values.size()),
        Encoding::PLAIN,
        /*indices=*/{},
        /*indices_size=*/0, page_defs, descr_->max_definition_level(), page_reps,
        descr_->max_repetition_level()));
  };

  // Page 1: {[10], [20, 20, 20 ... } continues to next page.
  add_page(/*values=*/{10, 20, 20, 20}, /*def_levels=*/{1, 1, 1, 1},
           /*rep_levels=*/{0, 0, 1, 1});
  // Page 2: {... 20, 20, ...} continues from previous page and to next page.
  add_page(/*values=*/{20, 20}, /*def_levels=*/{1, 1}, /*rep_levels=*/{1, 1});
  // Page 3: { ... 20], [30]} continues from previous page.
  add_page(/*values=*/{20, 30}, /*def_levels=*/{1, 1}, /*rep_levels=*/{1, 0});

  record_reader_->SetPageReader(std::make_unique<MockPageReader>(page_list));

  {
    // Read [10]. The first page contributes 4 levels.
    const int64_t read = record_reader_->ReadRecords(/*num_records=*/1);
    ASSERT_EQ(read, 1);
    CheckState(/*values_written=*/1, /*null_count=*/0, /*levels_written=*/4,
               /*levels_position=*/1);
    CheckReadValues(/*expected_values=*/{10},
                    /*expected_defs=*/{1},
                    /*expected_reps=*/{0});
  }

  {
    // Skip the record that goes across pages. What was read before the skip
    // stays in the output.
    const int64_t skipped = record_reader_->SkipRecords(/*num_records=*/1);
    ASSERT_EQ(skipped, 1);
    CheckState(/*values_written=*/1, /*null_count=*/0, /*levels_written=*/2,
               /*levels_position=*/1);
    CheckReadValues(/*expected_values=*/{10},
                    /*expected_defs=*/{1},
                    /*expected_reps=*/{0});
  }

  {
    // Read [30]
    const int64_t read = record_reader_->ReadRecords(/*num_records=*/1);
    ASSERT_EQ(read, 1);
    CheckState(/*values_written=*/2, /*null_count=*/0, /*levels_written=*/2,
               /*levels_position=*/2);
    CheckReadValues(/*expected_values=*/{10, 30},
                    /*expected_defs=*/{1, 1},
                    /*expected_reps=*/{0, 0});
  }
}
// Instantiates every RecordReaderPrimitiveTypeTest case for both values of the
// bool test parameter (read_dense_for_nullable): true and false. The test name
// suffix is generated from the parameter by PrintToStringParamName().
INSTANTIATE_TEST_SUITE_P(RecordReaderPrimitiveTypeTests, RecordReaderPrimitiveTypeTest,
::testing::Values(/*read_dense_for_nullable=*/true, false),
testing::PrintToStringParamName());
// Parameterized test fixture for the FLBA record reader. The bool test
// parameter selects whether nullable values are read densely (no slot for a
// null) or spaced (one slot per level).
class FLBARecordReaderTest : public ::testing::TestWithParam<bool> {
 public:
  // Returns the test parameter: true when reading dense for nullable values.
  bool read_dense_for_nullable() { return GetParam(); }

  // Generates `num_pages` PLAIN-encoded pages of `levels_per_page` levels each
  // for an OPTIONAL FIXED_LEN_BYTE_ARRAY column of width `FLBA_type_length`,
  // and creates a record reader that reads from them. The generated levels and
  // values are kept in the fixture so expectations can be derived from them.
  void MakeRecordReader(int levels_per_page, int num_pages, int FLBA_type_length) {
    levels_per_page_ = levels_per_page;
    FLBA_type_length_ = FLBA_type_length;
    internal::LevelInfo level_info;
    level_info.def_level = 1;
    level_info.rep_level = 0;
    NodePtr type = ::parquet::schema::PrimitiveNode::Make(
        "b", Repetition::OPTIONAL, Type::FIXED_LEN_BYTE_ARRAY, ConvertedType::NONE,
        FLBA_type_length_);
    descr_ = std::make_unique<ColumnDescriptor>(type, level_info.def_level,
                                                level_info.rep_level);
    MakePages<FLBAType>(descr_.get(), num_pages, levels_per_page, def_levels_,
                        rep_levels_, values_, buffer_, pages_, Encoding::PLAIN);
    auto pager = std::make_unique<MockPageReader>(pages_);
    record_reader_ = internal::RecordReader::Make(
        descr_.get(), level_info, ::arrow::default_memory_pool(),
        /*read_dictionary=*/false, read_dense_for_nullable());
    record_reader_->SetPageReader(std::move(pager));
  }

  // Returns the expected array contents for rows [start, end), one entry per
  // array slot. A null row contributes an empty view when reading spaced and
  // nothing at all when reading dense.
  std::vector<std::string_view> expected_values(int start, int end) {
    std::vector<std::string_view> result;
    // values_ only stores non-null values, so count the non-null rows before
    // `start` to find where this range begins in values_.
    size_t values_index = 0;
    for (int i = 0; i < start; ++i) {
      if (def_levels_[i] != 0) {
        ++values_index;
      }
    }
    for (int i = start; i < end; ++i) {
      if (def_levels_[i] == 0) {
        if (!read_dense_for_nullable()) {
          result.emplace_back();
        }
        continue;
      }
      result.emplace_back(reinterpret_cast<const char*>(values_[values_index].ptr),
                          FLBA_type_length_);
      ++values_index;
    }
    return result;
  }

  // Verifies that the values accumulated by the reader match the generated
  // data for rows [start, end).
  void CheckReadValues(int start, int end) {
    auto binary_reader = dynamic_cast<BinaryRecordReader*>(record_reader_.get());
    ASSERT_NE(binary_reader, nullptr);
    // Chunks are reset after this call.
    ::arrow::ArrayVector array_vector = binary_reader->GetBuilderChunks();
    ASSERT_EQ(array_vector.size(), 1);
    auto binary_array =
        dynamic_cast<::arrow::FixedSizeBinaryArray*>(array_vector[0].get());
    ASSERT_NE(binary_array, nullptr);
    ASSERT_EQ(binary_array->length(), record_reader_->values_written());
    if (read_dense_for_nullable()) {
      ASSERT_EQ(binary_array->null_count(), 0);
      ASSERT_EQ(record_reader_->null_count(), 0);
    } else {
      ASSERT_EQ(binary_array->null_count(), record_reader_->null_count());
    }

    const std::vector<std::string_view> expected = expected_values(start, end);
    // `expected` has exactly one entry per array slot; checking the length up
    // front also keeps the slot accesses below in bounds.
    ASSERT_EQ(static_cast<int64_t>(expected.size()), binary_array->length());

    // Walk the rows and track the array slot separately. When reading dense a
    // null row occupies no slot, so the slot index and the row offset diverge
    // after the first null; indexing def_levels_ by slot would pair values
    // with the wrong rows and leave some values unchecked.
    int64_t slot = 0;
    for (int row = start; row < end; ++row) {
      if (def_levels_[row] == 0) {
        if (read_dense_for_nullable()) {
          continue;
        }
        ASSERT_TRUE(binary_array->IsNull(slot));
      } else {
        ASSERT_FALSE(binary_array->IsNull(slot));
        ASSERT_EQ(
            expected[static_cast<size_t>(slot)].compare(binary_array->GetView(slot)),
            0);
      }
      ++slot;
    }
  }

 protected:
  std::shared_ptr<internal::RecordReader> record_reader_;

 private:
  int levels_per_page_;
  int FLBA_type_length_;
  std::vector<std::shared_ptr<Page>> pages_;
  std::vector<int16_t> def_levels_;
  std::vector<int16_t> rep_levels_;
  std::vector<FixedLenByteArray> values_;
  std::vector<uint8_t> buffer_;
  std::unique_ptr<ColumnDescriptor> descr_;
};
// Parameterized test fixture for the ByteArray record reader. It mirrors the
// FLBA fixture; FLBA and Byte arrays are sufficiently different to warrant a
// separate class for readability. The bool test parameter selects whether
// nullable values are read densely or spaced.
class ByteArrayRecordReaderTest : public ::testing::TestWithParam<bool> {
 public:
  // Returns the test parameter: true when reading dense for nullable values.
  bool read_dense_for_nullable() { return GetParam(); }

  // Generates `num_pages` PLAIN-encoded pages of `levels_per_page` levels each
  // for an OPTIONAL BYTE_ARRAY column and creates a record reader that reads
  // from them. The generated levels and values are kept in the fixture so
  // expectations can be derived from them.
  void MakeRecordReader(int levels_per_page, int num_pages) {
    levels_per_page_ = levels_per_page;
    internal::LevelInfo level_info;
    level_info.def_level = 1;
    level_info.rep_level = 0;
    NodePtr type = schema::ByteArray("b", Repetition::OPTIONAL);
    descr_ = std::make_unique<ColumnDescriptor>(type, level_info.def_level,
                                                level_info.rep_level);
    MakePages<ByteArrayType>(descr_.get(), num_pages, levels_per_page, def_levels_,
                             rep_levels_, values_, buffer_, pages_, Encoding::PLAIN);
    auto pager = std::make_unique<MockPageReader>(pages_);
    record_reader_ = internal::RecordReader::Make(
        descr_.get(), level_info, ::arrow::default_memory_pool(),
        /*read_dictionary=*/false, read_dense_for_nullable());
    record_reader_->SetPageReader(std::move(pager));
  }

  // Returns the expected array contents for rows [start, end), one entry per
  // array slot. A null row contributes an empty view when reading spaced and
  // nothing at all when reading dense.
  std::vector<std::string_view> expected_values(int start, int end) {
    std::vector<std::string_view> result;
    // values_ only stores non-null values, so count the non-null rows before
    // `start` to find where this range begins in values_.
    size_t values_index = 0;
    for (int i = 0; i < start; ++i) {
      if (def_levels_[i] != 0) {
        ++values_index;
      }
    }
    for (int i = start; i < end; ++i) {
      if (def_levels_[i] == 0) {
        if (!read_dense_for_nullable()) {
          result.emplace_back();
        }
        continue;
      }
      result.emplace_back(reinterpret_cast<const char*>(values_[values_index].ptr),
                          values_[values_index].len);
      ++values_index;
    }
    return result;
  }

  // Verifies that the values accumulated by the reader match the generated
  // data for rows [start, end).
  void CheckReadValues(int start, int end) {
    auto binary_reader = dynamic_cast<BinaryRecordReader*>(record_reader_.get());
    ASSERT_NE(binary_reader, nullptr);
    // Chunks are reset after this call.
    ::arrow::ArrayVector array_vector = binary_reader->GetBuilderChunks();
    ASSERT_EQ(array_vector.size(), 1);
    ::arrow::BinaryArray* binary_array =
        dynamic_cast<::arrow::BinaryArray*>(array_vector[0].get());
    ASSERT_NE(binary_array, nullptr);
    ASSERT_EQ(binary_array->length(), record_reader_->values_written());
    if (read_dense_for_nullable()) {
      ASSERT_EQ(binary_array->null_count(), 0);
      ASSERT_EQ(record_reader_->null_count(), 0);
    } else {
      ASSERT_EQ(binary_array->null_count(), record_reader_->null_count());
    }

    const std::vector<std::string_view> expected = expected_values(start, end);
    // `expected` has exactly one entry per array slot; checking the length up
    // front also keeps the slot accesses below in bounds.
    ASSERT_EQ(static_cast<int64_t>(expected.size()), binary_array->length());

    // Walk the rows and track the array slot separately. When reading dense a
    // null row occupies no slot, so the slot index and the row offset diverge
    // after the first null; indexing def_levels_ by slot would pair values
    // with the wrong rows and leave some values unchecked.
    int64_t slot = 0;
    for (int row = start; row < end; ++row) {
      if (def_levels_[row] == 0) {
        if (read_dense_for_nullable()) {
          continue;
        }
        ASSERT_TRUE(binary_array->IsNull(slot));
      } else {
        ASSERT_FALSE(binary_array->IsNull(slot));
        ASSERT_EQ(
            expected[static_cast<size_t>(slot)].compare(binary_array->GetView(slot)),
            0);
      }
      ++slot;
    }
  }

 protected:
  std::shared_ptr<internal::RecordReader> record_reader_;

 private:
  int levels_per_page_;
  std::vector<std::shared_ptr<Page>> pages_;
  std::vector<int16_t> def_levels_;
  std::vector<int16_t> rep_levels_;
  std::vector<ByteArray> values_;
  std::vector<uint8_t> buffer_;
  std::unique_ptr<ColumnDescriptor> descr_;
};
// Reads, skips, then reads again on an optional ByteArray column.
// The binary readers only differ in their DecodeDense and DecodeSpaced
// functions, so an optional column is enough to exercise those code paths.
TEST_P(ByteArrayRecordReaderTest, ReadAndSkipOptional) {
  MakeRecordReader(/*levels_per_page=*/90, /*num_pages=*/1);

  // Rows [0, 30): read one-third of the page.
  const int64_t first_read = record_reader_->ReadRecords(/*num_records=*/30);
  ASSERT_EQ(first_read, 30);
  CheckReadValues(/*start=*/0, /*end=*/30);
  record_reader_->Reset();

  // Rows [30, 60): skipped.
  const int64_t skipped = record_reader_->SkipRecords(/*num_records=*/30);
  ASSERT_EQ(skipped, 30);

  // Rows [60, 90): 60 records are requested, but after reading 30 and skipping
  // 30 only 30 are left.
  const int64_t second_read = record_reader_->ReadRecords(/*num_records=*/60);
  ASSERT_EQ(second_read, 30);
  CheckReadValues(/*start=*/60, /*end=*/90);
  record_reader_->Reset();
}
// Skips buffered records when more than kMinLevelBatchSize levels are read or
// skipped at a time.
TEST_P(ByteArrayRecordReaderTest, ReadAndBatchSkipOptional) {
  MakeRecordReader(/*levels_per_page=*/9000, /*num_pages=*/1);

  // Rows [0, 100): read them, which also buffers some further records.
  const int64_t first_read = record_reader_->ReadRecords(/*num_records=*/100);
  ASSERT_EQ(first_read, 100);
  CheckReadValues(/*start=*/0, /*end=*/100);
  record_reader_->Reset();

  // Rows [100, 3100): skipped, including the buffered records.
  const int64_t first_skip = record_reader_->SkipRecords(/*num_records=*/3000);
  ASSERT_EQ(first_skip, 3000);

  // Rows [3100, 4000): read them, buffering some records again.
  const int64_t second_read = record_reader_->ReadRecords(/*num_records=*/900);
  ASSERT_EQ(second_read, 900);
  CheckReadValues(/*start=*/3100, /*end=*/4000);
  record_reader_->Reset();

  // Rows [4000, 7000): skipped, including the buffered records.
  const int64_t second_skip = record_reader_->SkipRecords(/*num_records=*/3000);
  ASSERT_EQ(second_skip, 3000);

  // Rows [7000, 9000): 3000 records are requested but only 2000 are left.
  const int64_t last_read = record_reader_->ReadRecords(/*num_records=*/3000);
  ASSERT_EQ(last_read, 2000);
  CheckReadValues(/*start=*/7000, /*end=*/9000);
  record_reader_->Reset();
}
// Reads, skips, then reads again on an optional FLBA column of width 4.
// The binary readers only differ in their DecodeDense and DecodeSpaced
// functions, so an optional column is enough to exercise those code paths.
TEST_P(FLBARecordReaderTest, ReadAndSkipOptional) {
  MakeRecordReader(/*levels_per_page=*/90, /*num_pages=*/1, /*FLBA_type_length=*/4);

  // Rows [0, 30): read one-third of the page.
  const int64_t first_read = record_reader_->ReadRecords(/*num_records=*/30);
  ASSERT_EQ(first_read, 30);
  CheckReadValues(/*start=*/0, /*end=*/30);
  record_reader_->Reset();

  // Rows [30, 60): skipped.
  const int64_t skipped = record_reader_->SkipRecords(/*num_records=*/30);
  ASSERT_EQ(skipped, 30);

  // Rows [60, 90): 60 records are requested, but after reading 30 and skipping
  // 30 only 30 are left.
  const int64_t second_read = record_reader_->ReadRecords(/*num_records=*/60);
  ASSERT_EQ(second_read, 30);
  CheckReadValues(/*start=*/60, /*end=*/90);
  record_reader_->Reset();
}
// Runs every ByteArrayRecordReaderTest case with both values of the bool
// parameter that the fixture exposes as read_dense_for_nullable().
INSTANTIATE_TEST_SUITE_P(ByteArrayRecordReaderTests, ByteArrayRecordReaderTest,
testing::Bool());
// Runs every FLBARecordReaderTest case with both values of the bool parameter
// that the fixture exposes as read_dense_for_nullable().
INSTANTIATE_TEST_SUITE_P(FLBARecordReaderTests, FLBARecordReaderTest, testing::Bool());
// Fixture for the test of random combinations of ReadRecords and SkipRecords.
// It has no state of its own; the test parameter is the repetition type of
// the int32 column that the test builds.
class RecordReaderStressTest : public ::testing::TestWithParam<Repetition::type> {};
// Randomized check of the record reader. Builds random pages for a single
// int32 column whose repetition type is the test parameter, generates a random
// sequence of ReadRecords/SkipRecords calls that covers all records, and after
// each call compares the reader's values and levels with expectations derived
// from the generated value/level vectors.
TEST_P(RecordReaderStressTest, StressTest) {
internal::LevelInfo level_info;
// Define these boolean variables for improving readability below.
bool repeated = false, required = false;
if (GetParam() == Repetition::REQUIRED) {
level_info.def_level = 0;
level_info.rep_level = 0;
required = true;
} else if (GetParam() == Repetition::OPTIONAL) {
level_info.def_level = 1;
level_info.rep_level = 0;
} else {
// Any other parameter value is handled as a repeated column.
level_info.def_level = 1;
level_info.rep_level = 1;
repeated = true;
}
NodePtr type = schema::Int32("b", GetParam());
const ColumnDescriptor descr(type, level_info.def_level, level_info.rep_level);
// seed1 drives the page-count/page-size choice and the read/skip sequence.
// It comes from time(0), so every run exercises a different scenario.
auto seed1 = static_cast<uint32_t>(time(0));
std::default_random_engine gen(seed1);
// Generate random number of pages with random number of values per page.
// Both are drawn from [0, 2000], so either of them may be 0.
std::uniform_int_distribution<int> d(0, 2000);
const int num_pages = d(gen);
const int levels_per_page = d(gen);
std::vector<int32_t> values;
std::vector<int16_t> def_levels;
std::vector<int16_t> rep_levels;
std::vector<uint8_t> data_buffer;
std::vector<std::shared_ptr<Page>> pages;
// NOTE(review): seed2 is also taken from time(0), so it will usually be equal
// to seed1.
auto seed2 = static_cast<uint32_t>(time(0));
// seed2 is forwarded to MakePages, which fills the level, value and page
// vectors declared above.
MakePages<Int32Type>(&descr, num_pages, levels_per_page, def_levels, rep_levels, values,
data_buffer, pages, Encoding::PLAIN, seed2);
std::unique_ptr<PageReader> pager;
pager.reset(new test::MockPageReader(pages));
// Set up the RecordReader. Only the descriptor and the level info are passed
// to Make(); its remaining arguments are left at their defaults.
std::shared_ptr<internal::RecordReader> record_reader =
internal::RecordReader::Make(&descr, level_info);
record_reader->SetPageReader(std::move(pager));
// Figure out how many total records.
// For a repeated column a record starts at every level whose repetition level
// is 0; otherwise every level is one record.
int total_records = 0;
if (repeated) {
for (int16_t rep : rep_levels) {
if (rep == 0) {
++total_records;
}
}
} else {
total_records = static_cast<int>(def_levels.size());
}
// Generate a sequence of reads and skips.
int records_left = total_records;
// The first element of the pair is true for SkipRecords and false for
// ReadRecords. The second element indicates the number of records for reading
// or skipping.
std::vector<std::pair<bool, int>> sequence;
while (records_left > 0) {
// This distribution shadows the outer `d`. Its range is [0, records_left],
// so an entry may ask for zero records.
std::uniform_int_distribution<int> d(0, records_left);
// Generate a number to decide if this is a skip or read.
bool is_skip = d(gen) < records_left / 2;
int num_records = d(gen);
sequence.emplace_back(is_skip, num_records);
records_left -= num_records;
}
// The levels_index and values_index are over the original vectors that have
// all the rep/def values for all the records. In the following loop, we will
// read/skip a number of records and Reset the reader after each iteration.
// This is on-par with how the record reader is used.
size_t levels_index = 0;
size_t values_index = 0;
for (const auto& [is_skip, num_records] : sequence) {
// Reset the reader before the next round of read/skip.
record_reader->Reset();
// Prepare the expected result and do the SkipRecords and ReadRecords.
std::vector<int32_t> expected_values;
std::vector<int16_t> expected_def_levels;
std::vector<int16_t> expected_rep_levels;
bool inside_repeated_field = false;
int read_records = 0;
// Walk the generated levels until num_records complete records have been
// consumed, advancing levels_index and values_index for reads and skips
// alike.
while (read_records < num_records || inside_repeated_field) {
// A new record starts at every level for non-repeated columns, and at every
// level with repetition level 0 for repeated ones.
if (!repeated || (repeated && rep_levels[levels_index] == 0)) {
++read_records;
}
bool has_value =
required || (!required && def_levels[levels_index] == level_info.def_level);
// If we are not skipping, we need to update the expected values and
// rep/defs. If we are skipping, we just keep going.
if (!is_skip) {
if (!required) {
expected_def_levels.push_back(def_levels[levels_index]);
if (!has_value) {
// -1 is a placeholder for a slot without a value; placeholders are not
// compared against the values read (see the comparison loop below).
expected_values.push_back(-1);
}
}
if (repeated) {
expected_rep_levels.push_back(rep_levels[levels_index]);
}
if (has_value) {
expected_values.push_back(values[values_index]);
}
}
if (has_value) {
++values_index;
}
// If we are in the middle of a repeated field, we should keep going
// until we consume it all.
if (repeated && levels_index + 1 < rep_levels.size() &&
rep_levels[levels_index + 1] == 1) {
inside_repeated_field = true;
} else {
inside_repeated_field = false;
}
++levels_index;
}
// Print out the seeds with each failing ASSERT to easily reproduce the bug.
std::string seeds = "seeds: " + std::to_string(seed1) + " " + std::to_string(seed2);
// Perform the actual read/skip.
if (is_skip) {
int64_t skipped_records = record_reader->SkipRecords(num_records);
ASSERT_EQ(skipped_records, num_records) << seeds;
} else {
// This local shadows the loop-level read_records counter declared above.
int64_t read_records = record_reader->ReadRecords(num_records);
ASSERT_EQ(read_records, num_records) << seeds;
}
const auto read_values = reinterpret_cast<const int32_t*>(record_reader->values());
if (required) {
ASSERT_EQ(record_reader->null_count(), 0) << seeds;
}
std::vector<int32_t> read_vals(read_values,
read_values + record_reader->values_written());
// Compare slot by slot, ignoring the -1 placeholders. An expected value that
// happens to be -1 is ignored as well.
// NOTE(review): read_vals is indexed without first checking that it holds at
// least expected_values.size() elements.
for (size_t i = 0; i < expected_values.size(); ++i) {
if (expected_values[i] != -1) {
ASSERT_EQ(read_vals[i], expected_values[i]) << seeds;
}
}
// Levels are only compared up to levels_position(), i.e. the consumed ones.
if (!required) {
std::vector<int16_t> read_def_levels(
record_reader->def_levels(),
record_reader->def_levels() + record_reader->levels_position());
ASSERT_TRUE(vector_equal(read_def_levels, expected_def_levels)) << seeds;
}
if (repeated) {
std::vector<int16_t> read_rep_levels(
record_reader->rep_levels(),
record_reader->rep_levels() + record_reader->levels_position());
ASSERT_TRUE(vector_equal(read_rep_levels, expected_rep_levels)) << seeds;
}
}
}
// Runs the stress test once for each repetition type: REQUIRED, OPTIONAL and
// REPEATED.
INSTANTIATE_TEST_SUITE_P(Repetition_type, RecordReaderStressTest,
::testing::Values(Repetition::REQUIRED, Repetition::OPTIONAL,
Repetition::REPEATED));
} // namespace test
} // namespace parquet