blob: a3af5ac911f148b6afa1acbc37a0b78b49f0e29e [file] [log] [blame]
// licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "parquet/arrow/record_reader.h"
#include <algorithm>
#include <cstdint>
#include <memory>
#include <sstream>
#include <arrow/buffer.h>
#include <arrow/memory_pool.h>
#include <arrow/status.h>
#include <arrow/util/bit-util.h>
#include <arrow/util/rle-encoding.h>
#include "parquet/column_page.h"
#include "parquet/column_reader.h"
#include "parquet/encoding-internal.h"
#include "parquet/exception.h"
#include "parquet/properties.h"
using arrow::MemoryPool;
namespace parquet {
namespace internal {
namespace BitUtil = ::arrow::BitUtil;
template <typename DType>
class TypedRecordReader;
// PLAIN_DICTIONARY is the deprecated spelling of a dictionary index
// encoding; treat it exactly like RLE_DICTIONARY.
static bool IsDictionaryIndexEncoding(const Encoding::type& e) {
  switch (e) {
    case Encoding::RLE_DICTIONARY:
    case Encoding::PLAIN_DICTIONARY:
      return true;
    default:
      return false;
  }
}
// Shared state and bookkeeping for reading a column chunk as logical records.
// Owns the definition/repetition level buffers, the decoded values buffer,
// and the validity bitmap. Per-physical-type decoding is supplied by the
// TypedRecordReader subclasses via the pure virtual methods below.
class RecordReader::RecordReaderImpl {
 public:
  RecordReaderImpl(const ColumnDescriptor* descr, MemoryPool* pool)
      : descr_(descr),
        pool_(pool),
        num_buffered_values_(0),
        num_decoded_values_(0),
        max_def_level_(descr->max_definition_level()),
        max_rep_level_(descr->max_repetition_level()),
        at_record_start_(false),
        records_read_(0),
        values_written_(0),
        values_capacity_(0),
        null_count_(0),
        levels_written_(0),
        levels_position_(0),
        levels_capacity_(0) {
    nullable_values_ = internal::HasSpacedValues(descr);
    values_ = std::make_shared<PoolBuffer>(pool);
    valid_bits_ = std::make_shared<PoolBuffer>(pool);
    def_levels_ = std::make_shared<PoolBuffer>(pool);
    rep_levels_ = std::make_shared<PoolBuffer>(pool);
    // Variable-width physical types accumulate into an Arrow builder instead
    // of the flat values_ buffer (see the ByteArray/FLBA specializations).
    if (descr->physical_type() == Type::BYTE_ARRAY) {
      builder_.reset(new ::arrow::BinaryBuilder(pool));
    } else if (descr->physical_type() == Type::FIXED_LEN_BYTE_ARRAY) {
      int byte_width = descr->type_length();
      std::shared_ptr<::arrow::DataType> type = ::arrow::fixed_size_binary(byte_width);
      builder_.reset(new ::arrow::FixedSizeBinaryBuilder(type, pool));
    }
    Reset();
  }

  virtual ~RecordReaderImpl() = default;

  // Read up to num_records logical records; returns the number actually read.
  virtual int64_t ReadRecords(int64_t num_records) = 0;

  // Dictionary decoders must be reset when advancing row groups
  virtual void ResetDecoders() = 0;

  // Attach the page reader for the next column chunk; clears cached decoders.
  void SetPageReader(std::unique_ptr<PageReader> reader) {
    pager_ = std::move(reader);
    ResetDecoders();
  }

  bool HasMoreData() const { return pager_ != nullptr; }

  // Raw pointers into the level buffers. NOTE(review): def_levels() is const
  // while rep_levels() is not — looks like an oversight; both hand out
  // mutable storage.
  int16_t* def_levels() const {
    return reinterpret_cast<int16_t*>(def_levels_->mutable_data());
  }
  int16_t* rep_levels() {
    return reinterpret_cast<int16_t*>(rep_levels_->mutable_data());
  }
  uint8_t* values() const { return values_->mutable_data(); }

  /// \brief Number of values written including nulls (if any)
  int64_t values_written() const { return values_written_; }

  // Number of levels already consumed by record delimiting / value reads.
  int64_t levels_position() const { return levels_position_; }
  // Number of levels decoded from pages into the level buffers so far.
  int64_t levels_written() const { return levels_written_; }

  // We may outwardly have the appearance of having exhausted a column chunk
  // when in fact we are in the middle of processing the last batch
  bool has_values_to_process() const { return levels_position_ < levels_written_; }

  int64_t null_count() const { return null_count_; }
  bool nullable_values() const { return nullable_values_; }

  // Hand the accumulated values buffer to the caller and start a fresh one.
  std::shared_ptr<PoolBuffer> ReleaseValues() {
    auto result = values_;
    values_ = std::make_shared<PoolBuffer>(pool_);
    return result;
  }

  // Hand the validity bitmap to the caller and start a fresh one.
  std::shared_ptr<PoolBuffer> ReleaseIsValid() {
    auto result = valid_bits_;
    valid_bits_ = std::make_shared<PoolBuffer>(pool_);
    return result;
  }

  ::arrow::ArrayBuilder* builder() { return builder_.get(); }

  // Process written repetition/definition levels to reach the end of
  // records. Process no more levels than necessary to delimit the indicated
  // number of logical records. Updates internal state of RecordReader
  //
  // \return Number of records delimited
  int64_t DelimitRecords(int64_t num_records, int64_t* values_seen) {
    int64_t values_to_read = 0;
    int64_t records_read = 0;
    const int16_t* def_levels = this->def_levels() + levels_position_;
    const int16_t* rep_levels = this->rep_levels() + levels_position_;
    // Only called for repeated fields; rep level 0 marks a record boundary.
    DCHECK_GT(max_rep_level_, 0);
    // Count logical records and number of values to read
    while (levels_position_ < levels_written_) {
      if (*rep_levels++ == 0) {
        at_record_start_ = true;
        if (records_read == num_records) {
          // We've found the number of records we were looking for. Leave
          // this level buffered: it begins the next (unread) record.
          break;
        } else {
          // Continue
          ++records_read;
        }
      } else {
        at_record_start_ = false;
      }
      // Only levels at the maximum definition level carry an actual value.
      if (*def_levels++ == max_def_level_) {
        ++values_to_read;
      }
      ++levels_position_;
    }
    *values_seen = values_to_read;
    return records_read;
  }

  // Read multiple definition levels into preallocated memory
  //
  // Returns the number of decoded definition levels
  int64_t ReadDefinitionLevels(int64_t batch_size, int16_t* levels) {
    // Required-only schemas encode no definition levels.
    if (descr_->max_definition_level() == 0) {
      return 0;
    }
    return definition_level_decoder_.Decode(static_cast<int>(batch_size), levels);
  }

  // Read multiple repetition levels; returns the number decoded (0 for
  // non-repeated columns, which encode none).
  int64_t ReadRepetitionLevels(int64_t batch_size, int16_t* levels) {
    if (descr_->max_repetition_level() == 0) {
      return 0;
    }
    return repetition_level_decoder_.Decode(static_cast<int>(batch_size), levels);
  }

  // Values remaining in the current data page.
  int64_t available_values_current_page() const {
    return num_buffered_values_ - num_decoded_values_;
  }

  void ConsumeBufferedValues(int64_t num_values) { num_decoded_values_ += num_values; }

  Type::type type() const { return descr_->physical_type(); }

  const ColumnDescriptor* descr() const { return descr_; }

  // Ensure room for `capacity` additional levels and values.
  void Reserve(int64_t capacity) {
    ReserveLevels(capacity);
    ReserveValues(capacity);
  }

  // Grow the level buffers (power-of-two growth) so `capacity` more levels
  // fit past levels_written_. No-op for required-only columns.
  void ReserveLevels(int64_t capacity) {
    if (descr_->max_definition_level() > 0 &&
        (levels_written_ + capacity > levels_capacity_)) {
      int64_t new_levels_capacity = BitUtil::NextPower2(levels_capacity_ + 1);
      while (levels_written_ + capacity > new_levels_capacity) {
        new_levels_capacity = BitUtil::NextPower2(new_levels_capacity + 1);
      }
      PARQUET_THROW_NOT_OK(
          def_levels_->Resize(new_levels_capacity * sizeof(int16_t), false));
      // Repetition levels only exist when the field (or an ancestor) repeats.
      if (descr_->max_repetition_level() > 0) {
        PARQUET_THROW_NOT_OK(
            rep_levels_->Resize(new_levels_capacity * sizeof(int16_t), false));
      }
      levels_capacity_ = new_levels_capacity;
    }
  }

  // Grow the values buffer (power-of-two growth) and, for nullable columns,
  // the validity bitmap; newly added bitmap bytes are zeroed.
  void ReserveValues(int64_t capacity) {
    if (values_written_ + capacity > values_capacity_) {
      int64_t new_values_capacity = BitUtil::NextPower2(values_capacity_ + 1);
      while (values_written_ + capacity > new_values_capacity) {
        new_values_capacity = BitUtil::NextPower2(new_values_capacity + 1);
      }
      int type_size = GetTypeByteSize(descr_->physical_type());
      PARQUET_THROW_NOT_OK(values_->Resize(new_values_capacity * type_size, false));
      values_capacity_ = new_values_capacity;
    }
    if (nullable_values_) {
      int64_t valid_bytes_new = BitUtil::BytesForBits(values_capacity_);
      if (valid_bits_->size() < valid_bytes_new) {
        int64_t valid_bytes_old = BitUtil::BytesForBits(values_written_);
        PARQUET_THROW_NOT_OK(valid_bits_->Resize(valid_bytes_new, false));
        // Avoid valgrind warnings
        memset(valid_bits_->mutable_data() + valid_bytes_old, 0,
               valid_bytes_new - valid_bytes_old);
      }
    }
  }

  // Reset output state between reads: clears values and compacts any
  // not-yet-consumed levels to the front of the level buffers.
  void Reset() {
    ResetValues();
    if (levels_written_ > 0) {
      const int64_t levels_remaining = levels_written_ - levels_position_;
      // Shift remaining levels to beginning of buffer and trim to only the number
      // of decoded levels remaining
      int16_t* def_data = def_levels();
      int16_t* rep_data = rep_levels();
      std::copy(def_data + levels_position_, def_data + levels_written_, def_data);
      std::copy(rep_data + levels_position_, rep_data + levels_written_, rep_data);
      PARQUET_THROW_NOT_OK(
          def_levels_->Resize(levels_remaining * sizeof(int16_t), false));
      PARQUET_THROW_NOT_OK(
          rep_levels_->Resize(levels_remaining * sizeof(int16_t), false));
      levels_written_ -= levels_position_;
      levels_position_ = 0;
      levels_capacity_ = levels_remaining;
    }
    records_read_ = 0;
    // Calling Finish on the builders also resets them
  }

  // Logically empty the values buffer and bitmap without shrinking storage.
  void ResetValues() {
    if (values_written_ > 0) {
      // Resize to 0, but do not shrink to fit
      PARQUET_THROW_NOT_OK(values_->Resize(0, false));
      PARQUET_THROW_NOT_OK(valid_bits_->Resize(0, false));
      values_written_ = 0;
      values_capacity_ = 0;
      null_count_ = 0;
    }
  }

 protected:
  const ColumnDescriptor* descr_;
  ::arrow::MemoryPool* pool_;

  std::unique_ptr<PageReader> pager_;
  std::shared_ptr<Page> current_page_;

  // Not set if full schema for this field has no optional or repeated elements
  LevelDecoder definition_level_decoder_;

  // Not set for flat schemas.
  LevelDecoder repetition_level_decoder_;

  // The total number of values stored in the data page. This is the maximum of
  // the number of encoded definition levels or encoded values. For
  // non-repeated, required columns, this is equal to the number of encoded
  // values. For repeated or optional values, there may be fewer data values
  // than levels, and this tells you how many encoded levels there are in that
  // case.
  int64_t num_buffered_values_;

  // The number of values from the current data page that have been decoded
  // into memory
  int64_t num_decoded_values_;

  const int16_t max_def_level_;
  const int16_t max_rep_level_;

  // True when values may be null (schema has an optional node above the leaf).
  bool nullable_values_;

  // True when levels_position_ sits at a record boundary (rep level 0).
  bool at_record_start_;
  int64_t records_read_;

  // Count of values (including null slots) materialized so far.
  int64_t values_written_;
  int64_t values_capacity_;
  int64_t null_count_;

  // Levels decoded from pages vs. levels consumed by record delimiting.
  int64_t levels_written_;
  int64_t levels_position_;
  int64_t levels_capacity_;

  // TODO(wesm): ByteArray / FixedLenByteArray types
  std::unique_ptr<::arrow::ArrayBuilder> builder_;

  std::shared_ptr<::arrow::PoolBuffer> values_;

  // Pointer to the next free slot in the typed values buffer.
  template <typename T>
  T* ValuesHead() {
    return reinterpret_cast<T*>(values_->mutable_data()) + values_written_;
  }

  std::shared_ptr<::arrow::PoolBuffer> valid_bits_;
  std::shared_ptr<::arrow::PoolBuffer> def_levels_;
  std::shared_ptr<::arrow::PoolBuffer> rep_levels_;
};
// The minimum number of repetition/definition levels to decode at a time:
// ReadRecords batches level decoding by at least this much so that many
// small record reads still get vectorized decode performance.
constexpr int64_t kMinLevelBatchSize = 1024;
// Physical-type-specific record reader: decodes levels and values for one
// column chunk and assembles them into records. Values land in the base
// class's flat buffer; ByteArray/FLBA specializations divert to a builder.
template <typename DType>
class TypedRecordReader : public RecordReader::RecordReaderImpl {
 public:
  typedef typename DType::c_type T;

  TypedRecordReader(const ColumnDescriptor* schema, ::arrow::MemoryPool* pool)
      : RecordReader::RecordReaderImpl(schema, pool), current_decoder_(nullptr) {}

  void ResetDecoders() override { decoders_.clear(); }

  // Decode values_with_nulls slots (of which null_count are null), writing
  // values sparsely according to the validity bitmap at values_written_.
  inline void ReadValuesSpaced(int64_t values_with_nulls, int64_t null_count) {
    uint8_t* valid_bits = valid_bits_->mutable_data();
    const int64_t valid_bits_offset = values_written_;
    int64_t num_decoded = current_decoder_->DecodeSpaced(
        ValuesHead<T>(), static_cast<int>(values_with_nulls),
        static_cast<int>(null_count), valid_bits, valid_bits_offset);
    DCHECK_EQ(num_decoded, values_with_nulls);
  }

  // Decode values_to_read non-null values contiguously at the buffer head.
  inline void ReadValuesDense(int64_t values_to_read) {
    int64_t num_decoded =
        current_decoder_->Decode(ValuesHead<T>(), static_cast<int>(values_to_read));
    DCHECK_EQ(num_decoded, values_to_read);
  }

  // Return number of logical records read
  int64_t ReadRecordData(const int64_t num_records) {
    // Conservative upper bound
    const int64_t possible_num_values =
        std::max(num_records, levels_written_ - levels_position_);
    ReserveValues(possible_num_values);

    const int64_t start_levels_position = levels_position_;

    int64_t values_to_read = 0;
    int64_t records_read = 0;
    if (max_rep_level_ > 0) {
      // Repeated field: walk levels to find record boundaries.
      records_read = DelimitRecords(num_records, &values_to_read);
    } else if (max_def_level_ > 0) {
      // No repetition levels, skip delimiting logic. Each level represents a
      // null or not null entry
      records_read = std::min(levels_written_ - levels_position_, num_records);

      // This is advanced by DelimitRecords, which we skipped
      levels_position_ += records_read;
    } else {
      // Required flat field: one value per record. Callers bound num_records
      // by the values available in the current page (see ReadRecords).
      records_read = values_to_read = num_records;
    }

    int64_t null_count = 0;
    if (nullable_values_) {
      // Convert the consumed definition levels into a validity bitmap, then
      // decode values sparsely into the null-interleaved slots.
      int64_t values_with_nulls = 0;
      internal::DefinitionLevelsToBitmap(
          def_levels() + start_levels_position, levels_position_ - start_levels_position,
          max_def_level_, max_rep_level_, &values_with_nulls, &null_count,
          valid_bits_->mutable_data(), values_written_);
      values_to_read = values_with_nulls - null_count;
      ReadValuesSpaced(values_with_nulls, null_count);
      ConsumeBufferedValues(levels_position_ - start_levels_position);
    } else {
      ReadValuesDense(values_to_read);
      ConsumeBufferedValues(values_to_read);
    }
    // Total values, including null spaces, if any
    values_written_ += values_to_read + null_count;
    null_count_ += null_count;
    return records_read;
  }

  // Returns true if there are still values in this column.
  bool HasNext() {
    // Either there is no data page available yet, or the data page has been
    // exhausted
    if (num_buffered_values_ == 0 || num_decoded_values_ == num_buffered_values_) {
      if (!ReadNewPage() || num_buffered_values_ == 0) {
        return false;
      }
    }
    return true;
  }

  int64_t ReadRecords(int64_t num_records) override {
    // Delimit records, then read values at the end
    int64_t records_read = 0;

    // First drain any levels left buffered from a previous call.
    if (levels_position_ < levels_written_) {
      records_read += ReadRecordData(num_records);
    }

    // HasNext invokes ReadNewPage
    if (records_read == 0 && !HasNext()) {
      return 0;
    }

    int64_t level_batch_size = std::max(kMinLevelBatchSize, num_records);

    // If we are in the middle of a record, we continue until reaching the
    // desired number of records or the end of the current record if we've found
    // enough records
    while (!at_record_start_ || records_read < num_records) {
      // Is there more data to read in this row group?
      if (!HasNext()) {
        break;
      }

      /// We perform multiple batch reads until we either exhaust the row group
      /// or observe the desired number of records
      int64_t batch_size = std::min(level_batch_size, available_values_current_page());

      // No more data in column
      if (batch_size == 0) {
        break;
      }

      if (max_def_level_ > 0) {
        ReserveLevels(batch_size);

        int16_t* def_levels = this->def_levels() + levels_written_;
        int16_t* rep_levels = this->rep_levels() + levels_written_;

        // Not present for non-repeated fields
        int64_t levels_read = 0;
        if (max_rep_level_ > 0) {
          levels_read = ReadDefinitionLevels(batch_size, def_levels);
          if (ReadRepetitionLevels(batch_size, rep_levels) != levels_read) {
            throw ParquetException("Number of decoded rep / def levels did not match");
          }
        } else if (max_def_level_ > 0) {
          levels_read = ReadDefinitionLevels(batch_size, def_levels);
        }

        // Exhausted column chunk
        if (levels_read == 0) {
          break;
        }

        levels_written_ += levels_read;
        records_read += ReadRecordData(num_records - records_read);
      } else {
        // No repetition or definition levels
        batch_size = std::min(num_records - records_read, batch_size);
        records_read += ReadRecordData(batch_size);
      }
    }

    return records_read;
  }

 private:
  typedef Decoder<DType> DecoderType;

  // Map of encoding type to the respective decoder object. For example, a
  // column chunk's data pages may include both dictionary-encoded and
  // plain-encoded data.
  std::unordered_map<int, std::shared_ptr<DecoderType>> decoders_;

  // Decoder for the current data page; points into decoders_ (not owned).
  DecoderType* current_decoder_;

  // Advance to the next data page
  bool ReadNewPage();

  void ConfigureDictionary(const DictionaryPage* page);
};
// ByteArray values cannot live in the flat values buffer: decode them into
// scratch space, append each one to the Arrow binary builder, and recycle
// the scratch buffer.
template <>
inline void TypedRecordReader<ByteArrayType>::ReadValuesDense(int64_t values_to_read) {
  auto decode_buffer = ValuesHead<ByteArray>();
  int64_t decoded_count =
      current_decoder_->Decode(decode_buffer, static_cast<int>(values_to_read));
  DCHECK_EQ(decoded_count, values_to_read);
  auto bin_builder = static_cast<::arrow::BinaryBuilder*>(builder_.get());
  for (int64_t slot = 0; slot < decoded_count; ++slot) {
    const ByteArray& val = decode_buffer[slot];
    PARQUET_THROW_NOT_OK(bin_builder->Append(val.ptr, static_cast<int64_t>(val.len)));
  }
  ResetValues();
}
// Fixed-length byte arrays are routed through the FixedSizeBinaryBuilder:
// decode into scratch space, append each entry, then recycle the buffer.
template <>
inline void TypedRecordReader<FLBAType>::ReadValuesDense(int64_t values_to_read) {
  auto decode_buffer = ValuesHead<FLBA>();
  int64_t decoded_count =
      current_decoder_->Decode(decode_buffer, static_cast<int>(values_to_read));
  DCHECK_EQ(decoded_count, values_to_read);
  auto flba_builder = static_cast<::arrow::FixedSizeBinaryBuilder*>(builder_.get());
  for (int64_t slot = 0; slot < decoded_count; ++slot) {
    PARQUET_THROW_NOT_OK(flba_builder->Append(decode_buffer[slot].ptr));
  }
  ResetValues();
}
// Spaced (nullable) ByteArray read: decode into null-interleaved scratch
// slots, then transfer each slot into the binary builder, emitting a null
// wherever the validity bitmap bit is unset.
template <>
inline void TypedRecordReader<ByteArrayType>::ReadValuesSpaced(int64_t values_to_read,
                                                               int64_t null_count) {
  uint8_t* validity = valid_bits_->mutable_data();
  const int64_t bitmap_offset = values_written_;
  auto decode_buffer = ValuesHead<ByteArray>();
  int64_t decoded_count = current_decoder_->DecodeSpaced(
      decode_buffer, static_cast<int>(values_to_read), static_cast<int>(null_count),
      validity, bitmap_offset);
  DCHECK_EQ(decoded_count, values_to_read);
  auto bin_builder = static_cast<::arrow::BinaryBuilder*>(builder_.get());
  for (int64_t slot = 0; slot < decoded_count; ++slot) {
    if (!::arrow::BitUtil::GetBit(validity, bitmap_offset + slot)) {
      PARQUET_THROW_NOT_OK(bin_builder->AppendNull());
    } else {
      const ByteArray& val = decode_buffer[slot];
      PARQUET_THROW_NOT_OK(bin_builder->Append(val.ptr, static_cast<int64_t>(val.len)));
    }
  }
  ResetValues();
}
// Spaced (nullable) FLBA read: decode into null-interleaved scratch slots,
// then append value or null to the fixed-size binary builder per the
// validity bitmap.
template <>
inline void TypedRecordReader<FLBAType>::ReadValuesSpaced(int64_t values_to_read,
                                                          int64_t null_count) {
  uint8_t* validity = valid_bits_->mutable_data();
  const int64_t bitmap_offset = values_written_;
  auto decode_buffer = ValuesHead<FLBA>();
  int64_t decoded_count = current_decoder_->DecodeSpaced(
      decode_buffer, static_cast<int>(values_to_read), static_cast<int>(null_count),
      validity, bitmap_offset);
  DCHECK_EQ(decoded_count, values_to_read);
  auto flba_builder = static_cast<::arrow::FixedSizeBinaryBuilder*>(builder_.get());
  for (int64_t slot = 0; slot < decoded_count; ++slot) {
    if (!::arrow::BitUtil::GetBit(validity, bitmap_offset + slot)) {
      PARQUET_THROW_NOT_OK(flba_builder->AppendNull());
    } else {
      PARQUET_THROW_NOT_OK(flba_builder->Append(decode_buffer[slot].ptr));
    }
  }
  ResetValues();
}
// Decode a dictionary page and install an RLE_DICTIONARY index decoder for
// the data pages that follow.
//
// \param page the dictionary page for this column chunk
// \throws ParquetException if a dictionary was already configured
//
// Fix: the PLAIN/PLAIN_DICTIONARY test was previously written out (and
// evaluated) twice; it is now computed once.
template <typename DType>
inline void TypedRecordReader<DType>::ConfigureDictionary(const DictionaryPage* page) {
  // PLAIN_DICTIONARY is the deprecated spelling: the dictionary itself is
  // PLAIN-encoded and the data pages carry RLE-encoded indices, so the
  // decoder is registered under RLE_DICTIONARY.
  const bool plain_dictionary = page->encoding() == Encoding::PLAIN_DICTIONARY ||
                                page->encoding() == Encoding::PLAIN;
  int encoding = static_cast<int>(plain_dictionary ? Encoding::RLE_DICTIONARY
                                                   : page->encoding());

  auto it = decoders_.find(encoding);
  if (it != decoders_.end()) {
    throw ParquetException("Column cannot have more than one dictionary.");
  }

  if (plain_dictionary) {
    PlainDecoder<DType> dictionary(descr_);
    dictionary.SetData(page->num_values(), page->data(), page->size());

    // The dictionary is fully decoded during DictionaryDecoder::Init, so the
    // DictionaryPage buffer is no longer required after this step
    //
    // TODO(wesm): investigate whether this all-or-nothing decoding of the
    // dictionary makes sense and whether performance can be improved
    auto decoder = std::make_shared<DictionaryDecoder<DType>>(descr_, pool_);
    decoder->SetDict(&dictionary);
    decoders_[encoding] = decoder;
  } else {
    ParquetException::NYI("only plain dictionary encoding has been implemented");
  }

  current_decoder_ = decoders_[encoding].get();
}
// Advance to the next data page in the column chunk, configuring level
// decoders and the value decoder for it. Dictionary pages encountered along
// the way install a dictionary; unknown page types are skipped.
//
// \return false when the page reader is exhausted (end of column chunk)
template <typename DType>
bool TypedRecordReader<DType>::ReadNewPage() {
  // Loop until we find the next data page.
  const uint8_t* buffer;

  while (true) {
    current_page_ = pager_->NextPage();
    if (!current_page_) {
      // EOS
      return false;
    }

    if (current_page_->type() == PageType::DICTIONARY_PAGE) {
      ConfigureDictionary(static_cast<const DictionaryPage*>(current_page_.get()));
      continue;
    } else if (current_page_->type() == PageType::DATA_PAGE) {
      const DataPage* page = static_cast<const DataPage*>(current_page_.get());

      // Read a data page.
      num_buffered_values_ = page->num_values();

      // Have not decoded any values from the data page yet
      num_decoded_values_ = 0;

      buffer = page->data();

      // If the data page includes repetition and definition levels, we
      // initialize the level decoder and subtract the encoded level bytes from
      // the page size to determine the number of bytes in the encoded data.
      int64_t data_size = page->size();

      // Data page Layout: Repetition Levels - Definition Levels - encoded values.
      // Levels are encoded as rle or bit-packed.
      // Init repetition levels
      if (descr_->max_repetition_level() > 0) {
        int64_t rep_levels_bytes = repetition_level_decoder_.SetData(
            page->repetition_level_encoding(), descr_->max_repetition_level(),
            static_cast<int>(num_buffered_values_), buffer);
        buffer += rep_levels_bytes;
        data_size -= rep_levels_bytes;
      }
      // TODO figure a way to set max_definition_level_ to 0
      // if the initial value is invalid

      // Init definition levels
      if (descr_->max_definition_level() > 0) {
        int64_t def_levels_bytes = definition_level_decoder_.SetData(
            page->definition_level_encoding(), descr_->max_definition_level(),
            static_cast<int>(num_buffered_values_), buffer);
        buffer += def_levels_bytes;
        data_size -= def_levels_bytes;
      }

      // Get a decoder object for this page or create a new decoder if this is the
      // first page with this encoding.
      Encoding::type encoding = page->encoding();

      if (IsDictionaryIndexEncoding(encoding)) {
        encoding = Encoding::RLE_DICTIONARY;
      }

      auto it = decoders_.find(static_cast<int>(encoding));
      if (it != decoders_.end()) {
        // Reuse the cached decoder created for an earlier page.
        if (encoding == Encoding::RLE_DICTIONARY) {
          DCHECK(current_decoder_->encoding() == Encoding::RLE_DICTIONARY);
        }
        current_decoder_ = it->second.get();
      } else {
        switch (encoding) {
          case Encoding::PLAIN: {
            std::shared_ptr<DecoderType> decoder(new PlainDecoder<DType>(descr_));
            decoders_[static_cast<int>(encoding)] = decoder;
            current_decoder_ = decoder.get();
            break;
          }
          case Encoding::RLE_DICTIONARY:
            // A dictionary-indexed data page requires that ConfigureDictionary
            // already installed the decoder for this encoding.
            throw ParquetException("Dictionary page must be before data page.");
          case Encoding::DELTA_BINARY_PACKED:
          case Encoding::DELTA_LENGTH_BYTE_ARRAY:
          case Encoding::DELTA_BYTE_ARRAY:
            ParquetException::NYI("Unsupported encoding");
          default:
            throw ParquetException("Unknown encoding type.");
        }
      }
      // buffer now points past the level bytes; hand the value bytes to the
      // decoder.
      current_decoder_->SetData(static_cast<int>(num_buffered_values_), buffer,
                                static_cast<int>(data_size));
      return true;
    } else {
      // We don't know what this page type is. We're allowed to skip non-data
      // pages.
      continue;
    }
  }
  // Unreachable: the loop above only exits via return.
  return true;
}
std::shared_ptr<RecordReader> RecordReader::Make(const ColumnDescriptor* descr,
MemoryPool* pool) {
switch (descr->physical_type()) {
case Type::BOOLEAN:
return std::shared_ptr<RecordReader>(
new RecordReader(new TypedRecordReader<BooleanType>(descr, pool)));
case Type::INT32:
return std::shared_ptr<RecordReader>(
new RecordReader(new TypedRecordReader<Int32Type>(descr, pool)));
case Type::INT64:
return std::shared_ptr<RecordReader>(
new RecordReader(new TypedRecordReader<Int64Type>(descr, pool)));
case Type::INT96:
return std::shared_ptr<RecordReader>(
new RecordReader(new TypedRecordReader<Int96Type>(descr, pool)));
case Type::FLOAT:
return std::shared_ptr<RecordReader>(
new RecordReader(new TypedRecordReader<FloatType>(descr, pool)));
case Type::DOUBLE:
return std::shared_ptr<RecordReader>(
new RecordReader(new TypedRecordReader<DoubleType>(descr, pool)));
case Type::BYTE_ARRAY:
return std::shared_ptr<RecordReader>(
new RecordReader(new TypedRecordReader<ByteArrayType>(descr, pool)));
case Type::FIXED_LEN_BYTE_ARRAY:
return std::shared_ptr<RecordReader>(
new RecordReader(new TypedRecordReader<FLBAType>(descr, pool)));
default:
DCHECK(false);
}
// Unreachable code, but supress compiler warning
return nullptr;
}
// ----------------------------------------------------------------------
// Implement public API

// The public RecordReader is a pimpl facade: every method below forwards
// directly to the RecordReaderImpl constructed by Make().

RecordReader::RecordReader(RecordReaderImpl* impl) { impl_.reset(impl); }

// Out-of-line so the smart pointer's deleter sees the complete
// RecordReaderImpl type defined in this translation unit.
RecordReader::~RecordReader() {}

int64_t RecordReader::ReadRecords(int64_t num_records) {
  return impl_->ReadRecords(num_records);
}

void RecordReader::Reset() { return impl_->Reset(); }

void RecordReader::Reserve(int64_t num_values) { impl_->Reserve(num_values); }

// Accessors over the impl's buffers; const views of mutable impl storage.
const int16_t* RecordReader::def_levels() const { return impl_->def_levels(); }

const int16_t* RecordReader::rep_levels() const { return impl_->rep_levels(); }

const uint8_t* RecordReader::values() const { return impl_->values(); }

std::shared_ptr<PoolBuffer> RecordReader::ReleaseValues() {
  return impl_->ReleaseValues();
}

std::shared_ptr<PoolBuffer> RecordReader::ReleaseIsValid() {
  return impl_->ReleaseIsValid();
}

::arrow::ArrayBuilder* RecordReader::builder() { return impl_->builder(); }

int64_t RecordReader::values_written() const { return impl_->values_written(); }

int64_t RecordReader::levels_position() const { return impl_->levels_position(); }

int64_t RecordReader::levels_written() const { return impl_->levels_written(); }

int64_t RecordReader::null_count() const { return impl_->null_count(); }

bool RecordReader::nullable_values() const { return impl_->nullable_values(); }

bool RecordReader::HasMoreData() const { return impl_->HasMoreData(); }

void RecordReader::SetPageReader(std::unique_ptr<PageReader> reader) {
  impl_->SetPageReader(std::move(reader));
}
} // namespace internal
} // namespace parquet