blob: 79b837f755c38cf3f00b4017b742f361069f92a3 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "parquet/column_reader.h"
#include <algorithm>
#include <cstdint>
#include <cstring>
#include <exception>
#include <iostream>
#include <memory>
#include <string>
#include <type_traits>
#include <unordered_map>
#include <utility>
#include <vector>
#include "arrow/array.h"
#include "arrow/array/array_binary.h"
#include "arrow/array/builder_binary.h"
#include "arrow/array/builder_dict.h"
#include "arrow/array/builder_primitive.h"
#include "arrow/chunked_array.h"
#include "arrow/type.h"
#include "arrow/util/bit_stream_utils_internal.h"
#include "arrow/util/bit_util.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/compression.h"
#include "arrow/util/crc32.h"
#include "arrow/util/int_util_overflow.h"
#include "arrow/util/logging.h"
#include "arrow/util/rle_encoding_internal.h"
#include "arrow/util/unreachable.h"
#include "parquet/column_page.h"
#include "parquet/encoding.h"
#include "parquet/encryption/encryption_internal.h"
#include "parquet/encryption/internal_file_decryptor.h"
#include "parquet/exception.h"
#include "parquet/level_comparison.h"
#include "parquet/level_conversion.h"
#include "parquet/properties.h"
#include "parquet/statistics.h"
#include "parquet/thrift_internal.h" // IWYU pragma: keep
#include "parquet/windows_fixup.h" // for OPTIONAL
#ifdef _MSC_VER
// disable warning about inheritance via dominance in the diamond pattern
# pragma warning(disable : 4250)
#endif
using arrow::MemoryPool;
using arrow::internal::AddWithOverflow;
using arrow::internal::checked_cast;
using arrow::internal::MultiplyWithOverflow;
namespace bit_util = arrow::bit_util;
namespace parquet {
namespace {
// The minimum number of repetition/definition levels to decode at a time, for
// better vectorized performance when doing many smaller record reads
constexpr int64_t kMinLevelBatchSize = 1024;
// Batch size for reading and throwing away values during skip.
// Both RecordReader and the ColumnReader use this for skipping.
constexpr int64_t kSkipScratchBatchSize = 1024;
// Throws exception if number_decoded does not match expected.
inline void CheckNumberDecoded(int64_t number_decoded, int64_t expected) {
if (ARROW_PREDICT_FALSE(number_decoded != expected)) {
ParquetException::EofException("Decoded values " + std::to_string(number_decoded) +
" does not match expected " +
std::to_string(expected));
}
}
constexpr std::string_view kErrorRepDefLevelNotMatchesNumValues =
"Number of decoded rep / def levels do not match num_values in page header";
} // namespace
LevelDecoder::LevelDecoder() : num_values_remaining_(0) {}
LevelDecoder::~LevelDecoder() = default;
int LevelDecoder::SetData(Encoding::type encoding, int16_t max_level,
int num_buffered_values, const uint8_t* data,
int32_t data_size) {
max_level_ = max_level;
int32_t num_bytes = 0;
encoding_ = encoding;
num_values_remaining_ = num_buffered_values;
bit_width_ = bit_util::Log2(max_level + 1);
switch (encoding) {
case Encoding::RLE: {
if (data_size < 4) {
throw ParquetException("Received invalid levels (corrupt data page?)");
}
num_bytes = ::arrow::util::SafeLoadAs<int32_t>(data);
if (num_bytes < 0 || num_bytes > data_size - 4) {
throw ParquetException("Received invalid number of bytes (corrupt data page?)");
}
const uint8_t* decoder_data = data + 4;
if (!rle_decoder_) {
rle_decoder_ = std::make_unique<::arrow::util::RleBitPackedDecoder<int16_t>>(
decoder_data, num_bytes, bit_width_);
} else {
rle_decoder_->Reset(decoder_data, num_bytes, bit_width_);
}
return 4 + num_bytes;
}
case Encoding::BIT_PACKED: {
int num_bits = 0;
if (MultiplyWithOverflow(num_buffered_values, bit_width_, &num_bits)) {
throw ParquetException(
"Number of buffered values too large (corrupt data page?)");
}
num_bytes = static_cast<int32_t>(bit_util::BytesForBits(num_bits));
if (num_bytes < 0 || num_bytes > data_size) {
throw ParquetException("Received invalid number of bytes (corrupt data page?)");
}
if (!bit_packed_decoder_) {
bit_packed_decoder_ =
std::make_unique<::arrow::bit_util::BitReader>(data, num_bytes);
} else {
bit_packed_decoder_->Reset(data, num_bytes);
}
return num_bytes;
}
default:
throw ParquetException("Unknown encoding type for levels.");
}
return -1;
}
void LevelDecoder::SetDataV2(int32_t num_bytes, int16_t max_level,
int num_buffered_values, const uint8_t* data) {
max_level_ = max_level;
// Repetition and definition levels always uses RLE encoding
// in the DataPageV2 format.
if (num_bytes < 0) {
throw ParquetException("Invalid page header (corrupt data page?)");
}
encoding_ = Encoding::RLE;
num_values_remaining_ = num_buffered_values;
bit_width_ = bit_util::Log2(max_level + 1);
if (!rle_decoder_) {
rle_decoder_ = std::make_unique<::arrow::util::RleBitPackedDecoder<int16_t>>(
data, num_bytes, bit_width_);
} else {
rle_decoder_->Reset(data, num_bytes, bit_width_);
}
}
int LevelDecoder::Decode(int batch_size, int16_t* levels) {
int num_decoded = 0;
int num_values = std::min(num_values_remaining_, batch_size);
if (encoding_ == Encoding::RLE) {
num_decoded = rle_decoder_->GetBatch(levels, num_values);
} else {
num_decoded = bit_packed_decoder_->GetBatch(bit_width_, levels, num_values);
}
if (num_decoded > 0) {
internal::MinMax min_max = internal::FindMinMax(levels, num_decoded);
if (ARROW_PREDICT_FALSE(min_max.min < 0 || min_max.max > max_level_)) {
std::stringstream ss;
ss << "Malformed levels. min: " << min_max.min << " max: " << min_max.max
<< " out of range. Max Level: " << max_level_;
throw ParquetException(ss.str());
}
}
num_values_remaining_ -= num_decoded;
return num_decoded;
}
ReaderProperties default_reader_properties() {
static ReaderProperties default_reader_properties;
return default_reader_properties;
}
namespace {
// Extracts encoded statistics from V1 and V2 data page headers
template <typename H>
EncodedStatistics ExtractStatsFromHeader(const H& header) {
EncodedStatistics page_statistics;
if (header.__isset.statistics) {
page_statistics = FromThrift(header.statistics);
}
return page_statistics;
}
void CheckNumValuesInHeader(int num_values) {
if (num_values < 0) {
throw ParquetException("Invalid page header (negative number of values)");
}
}
// ----------------------------------------------------------------------
// SerializedPageReader deserializes Thrift metadata and pages that have been
// assembled in a serialized stream for storing in a Parquet files
// This subclass delimits pages appearing in a serialized stream, each preceded
// by a serialized Thrift format::PageHeader indicating the type of each page
// and the page metadata.
class SerializedPageReader : public PageReader {
public:
SerializedPageReader(std::shared_ptr<ArrowInputStream> stream, int64_t total_num_values,
Compression::type codec, const ReaderProperties& properties,
const CryptoContext* crypto_ctx, bool always_compressed)
: properties_(properties),
stream_(std::move(stream)),
decompression_buffer_(AllocateBuffer(properties_.memory_pool(), 0)),
page_ordinal_(0),
seen_num_values_(0),
total_num_values_(total_num_values) {
if (crypto_ctx != nullptr) {
crypto_ctx_ = *crypto_ctx;
InitDecryption();
}
max_page_header_size_ = kDefaultMaxPageHeaderSize;
decompressor_ = GetCodec(codec);
always_compressed_ = always_compressed;
}
// Implement the PageReader interface
//
// The returned Page contains references that aren't guaranteed to live
// beyond the next call to NextPage(). SerializedPageReader reuses the
// decompression buffer internally, so if NextPage() is
// called then the content of previous page might be invalidated.
std::shared_ptr<Page> NextPage() override;
void set_max_page_header_size(uint32_t size) override { max_page_header_size_ = size; }
private:
void UpdateDecryption(Decryptor* decryptor, int8_t module_type, std::string* page_aad);
void InitDecryption();
std::shared_ptr<Buffer> DecompressIfNeeded(std::shared_ptr<Buffer> page_buffer,
int compressed_len, int uncompressed_len,
int levels_byte_len = 0);
// Returns true for non-data pages, and if we should skip based on
// data_page_filter_. Performs basic checks on values in the page header.
// Fills in data_page_statistics.
bool ShouldSkipPage(EncodedStatistics* data_page_statistics);
const ReaderProperties properties_;
std::shared_ptr<ArrowInputStream> stream_;
format::PageHeader current_page_header_;
// Compression codec to use.
std::unique_ptr<::arrow::util::Codec> decompressor_;
std::shared_ptr<ResizableBuffer> decompression_buffer_;
bool always_compressed_;
// The fields below are used for calculation of AAD (additional authenticated data)
// suffix which is part of the Parquet Modular Encryption.
// The AAD suffix for a parquet module is built internally by
// concatenating different parts some of which include
// the row group ordinal, column ordinal and page ordinal.
// Please refer to the encryption specification for more details:
// https://github.com/apache/parquet-format/blob/encryption/Encryption.md#44-additional-authenticated-data
// The CryptoContext used by this PageReader.
CryptoContext crypto_ctx_;
// This PageReader has its own Decryptor instances in order to be thread-safe.
std::unique_ptr<Decryptor> meta_decryptor_;
std::unique_ptr<Decryptor> data_decryptor_;
// The ordinal fields in the context below are used for AAD suffix calculation.
int32_t page_ordinal_; // page ordinal does not count the dictionary page
// Maximum allowed page size
uint32_t max_page_header_size_;
// Number of values read in data pages so far
int64_t seen_num_values_;
// Number of values in all the data pages
int64_t total_num_values_;
// data_page_aad_ and data_page_header_aad_ contain the AAD for data page and data page
// header in a single column respectively.
// While calculating AAD for different pages in a single column the pages AAD is
// updated by only the page ordinal.
std::string data_page_aad_;
std::string data_page_header_aad_;
};
void SerializedPageReader::InitDecryption() {
// Prepare the AAD for quick update later.
if (crypto_ctx_.data_decryptor_factory) {
data_decryptor_ = crypto_ctx_.data_decryptor_factory();
if (data_decryptor_) {
ARROW_DCHECK(!data_decryptor_->file_aad().empty());
data_page_aad_ = encryption::CreateModuleAad(
data_decryptor_->file_aad(), encryption::kDataPage,
crypto_ctx_.row_group_ordinal, crypto_ctx_.column_ordinal, kNonPageOrdinal);
}
}
if (crypto_ctx_.meta_decryptor_factory) {
meta_decryptor_ = crypto_ctx_.meta_decryptor_factory();
if (meta_decryptor_) {
ARROW_DCHECK(!meta_decryptor_->file_aad().empty());
data_page_header_aad_ = encryption::CreateModuleAad(
meta_decryptor_->file_aad(), encryption::kDataPageHeader,
crypto_ctx_.row_group_ordinal, crypto_ctx_.column_ordinal, kNonPageOrdinal);
}
}
}
void SerializedPageReader::UpdateDecryption(Decryptor* decryptor, int8_t module_type,
std::string* page_aad) {
ARROW_DCHECK(decryptor != nullptr);
if (crypto_ctx_.start_decrypt_with_dictionary_page) {
UpdateDecryptor(decryptor, crypto_ctx_.row_group_ordinal, crypto_ctx_.column_ordinal,
module_type);
} else {
encryption::QuickUpdatePageAad(page_ordinal_, page_aad);
decryptor->UpdateAad(*page_aad);
}
}
bool SerializedPageReader::ShouldSkipPage(EncodedStatistics* data_page_statistics) {
const PageType::type page_type = LoadEnumSafe(&current_page_header_.type);
if (page_type == PageType::DATA_PAGE) {
const format::DataPageHeader& header = current_page_header_.data_page_header;
CheckNumValuesInHeader(header.num_values);
*data_page_statistics = ExtractStatsFromHeader(header);
seen_num_values_ += header.num_values;
if (data_page_filter_) {
const EncodedStatistics* filter_statistics =
data_page_statistics->is_set() ? data_page_statistics : nullptr;
DataPageStats data_page_stats(filter_statistics, header.num_values,
/*num_rows=*/std::nullopt);
if (data_page_filter_(data_page_stats)) {
return true;
}
}
} else if (page_type == PageType::DATA_PAGE_V2) {
const format::DataPageHeaderV2& header = current_page_header_.data_page_header_v2;
CheckNumValuesInHeader(header.num_values);
if (header.num_rows < 0) {
throw ParquetException("Invalid page header (negative number of rows)");
}
if (header.definition_levels_byte_length < 0 ||
header.repetition_levels_byte_length < 0) {
throw ParquetException("Invalid page header (negative levels byte length)");
}
*data_page_statistics = ExtractStatsFromHeader(header);
seen_num_values_ += header.num_values;
if (data_page_filter_) {
const EncodedStatistics* filter_statistics =
data_page_statistics->is_set() ? data_page_statistics : nullptr;
DataPageStats data_page_stats(filter_statistics, header.num_values,
header.num_rows);
if (data_page_filter_(data_page_stats)) {
return true;
}
}
} else if (page_type == PageType::DICTIONARY_PAGE) {
const format::DictionaryPageHeader& dict_header =
current_page_header_.dictionary_page_header;
CheckNumValuesInHeader(dict_header.num_values);
} else {
// We don't know what this page type is. We're allowed to skip non-data
// pages.
return true;
}
return false;
}
std::shared_ptr<Page> SerializedPageReader::NextPage() {
ThriftDeserializer deserializer(properties_);
// Loop here because there may be unhandled page types that we skip until
// finding a page that we do know what to do with
while (seen_num_values_ < total_num_values_) {
uint32_t header_size = 0;
uint32_t allowed_page_size = kDefaultPageHeaderSize;
// Page headers can be very large because of page statistics
// We try to deserialize a larger buffer progressively
// until a maximum allowed header limit
while (true) {
PARQUET_ASSIGN_OR_THROW(auto view, stream_->Peek(allowed_page_size));
if (view.size() == 0) return nullptr;
// This gets used, then set by DeserializeThriftMsg
header_size = static_cast<uint32_t>(view.size());
try {
if (meta_decryptor_ != nullptr) {
UpdateDecryption(meta_decryptor_.get(), encryption::kDictionaryPageHeader,
&data_page_header_aad_);
}
// Reset current page header to avoid unclearing the __isset flag.
current_page_header_ = format::PageHeader();
deserializer.DeserializeMessage(reinterpret_cast<const uint8_t*>(view.data()),
&header_size, &current_page_header_,
meta_decryptor_.get());
break;
} catch (std::exception& e) {
// Failed to deserialize. Double the allowed page header size and try again
std::stringstream ss;
ss << e.what();
allowed_page_size *= 2;
if (allowed_page_size > max_page_header_size_) {
ss << "Deserializing page header failed.\n";
throw ParquetException(ss.str());
}
}
}
// Advance the stream offset
PARQUET_THROW_NOT_OK(stream_->Advance(header_size));
int32_t compressed_len = current_page_header_.compressed_page_size;
int32_t uncompressed_len = current_page_header_.uncompressed_page_size;
if (compressed_len < 0 || uncompressed_len < 0) {
throw ParquetException("Invalid page header");
}
EncodedStatistics data_page_statistics;
if (ShouldSkipPage(&data_page_statistics)) {
PARQUET_THROW_NOT_OK(stream_->Advance(compressed_len));
continue;
}
if (data_decryptor_ != nullptr) {
UpdateDecryption(data_decryptor_.get(), encryption::kDictionaryPage,
&data_page_aad_);
}
// Read the compressed data page.
PARQUET_ASSIGN_OR_THROW(auto page_buffer, stream_->Read(compressed_len));
if (page_buffer->size() != compressed_len) {
std::stringstream ss;
ss << "Page was smaller (" << page_buffer->size() << ") than expected ("
<< compressed_len << ")";
ParquetException::EofException(ss.str());
}
const PageType::type page_type = LoadEnumSafe(&current_page_header_.type);
if (properties_.page_checksum_verification() && current_page_header_.__isset.crc &&
PageCanUseChecksum(page_type)) {
// verify crc
uint32_t checksum =
::arrow::internal::crc32(/* prev */ 0, page_buffer->data(), compressed_len);
if (static_cast<int32_t>(checksum) != current_page_header_.crc) {
throw ParquetException(
"could not verify page integrity, CRC checksum verification failed for "
"page_ordinal " +
std::to_string(page_ordinal_));
}
}
// Decrypt it if we need to
if (data_decryptor_ != nullptr) {
auto decryption_buffer = AllocateBuffer(
properties_.memory_pool(), data_decryptor_->PlaintextLength(compressed_len));
compressed_len = data_decryptor_->Decrypt(
page_buffer->span_as<uint8_t>(), decryption_buffer->mutable_span_as<uint8_t>());
page_buffer = decryption_buffer;
}
if (page_type == PageType::DICTIONARY_PAGE) {
crypto_ctx_.start_decrypt_with_dictionary_page = false;
const format::DictionaryPageHeader& dict_header =
current_page_header_.dictionary_page_header;
bool is_sorted = dict_header.__isset.is_sorted ? dict_header.is_sorted : false;
page_buffer =
DecompressIfNeeded(std::move(page_buffer), compressed_len, uncompressed_len);
return std::make_shared<DictionaryPage>(page_buffer, dict_header.num_values,
LoadEnumSafe(&dict_header.encoding),
is_sorted);
} else if (page_type == PageType::DATA_PAGE) {
++page_ordinal_;
const format::DataPageHeader& header = current_page_header_.data_page_header;
page_buffer =
DecompressIfNeeded(std::move(page_buffer), compressed_len, uncompressed_len);
return std::make_shared<DataPageV1>(
page_buffer, header.num_values, LoadEnumSafe(&header.encoding),
LoadEnumSafe(&header.definition_level_encoding),
LoadEnumSafe(&header.repetition_level_encoding), uncompressed_len,
std::move(data_page_statistics));
} else if (page_type == PageType::DATA_PAGE_V2) {
++page_ordinal_;
const format::DataPageHeaderV2& header = current_page_header_.data_page_header_v2;
// Arrow prior to 3.0.0 set is_compressed to false but still compressed.
bool is_compressed =
(header.__isset.is_compressed ? header.is_compressed : false) ||
always_compressed_;
// Uncompress if needed
int levels_byte_len;
if (AddWithOverflow(header.definition_levels_byte_length,
header.repetition_levels_byte_length, &levels_byte_len)) {
throw ParquetException("Levels size too large (corrupt file?)");
}
// DecompressIfNeeded doesn't take `is_compressed` into account as
// it's page type-agnostic.
if (is_compressed) {
page_buffer = DecompressIfNeeded(std::move(page_buffer), compressed_len,
uncompressed_len, levels_byte_len);
}
return std::make_shared<DataPageV2>(
page_buffer, header.num_values, header.num_nulls, header.num_rows,
LoadEnumSafe(&header.encoding), header.definition_levels_byte_length,
header.repetition_levels_byte_length, uncompressed_len, is_compressed,
std::move(data_page_statistics));
} else {
throw ParquetException(
"Internal error, we have already skipped non-data pages in ShouldSkipPage()");
}
}
return std::shared_ptr<Page>(nullptr);
}
std::shared_ptr<Buffer> SerializedPageReader::DecompressIfNeeded(
std::shared_ptr<Buffer> page_buffer, int compressed_len, int uncompressed_len,
int levels_byte_len) {
if (decompressor_ == nullptr) {
return page_buffer;
}
if (compressed_len < levels_byte_len || uncompressed_len < levels_byte_len) {
throw ParquetException("Invalid page header");
}
// Grow the uncompressed buffer if we need to.
PARQUET_THROW_NOT_OK(
decompression_buffer_->Resize(uncompressed_len, /*shrink_to_fit=*/false));
if (levels_byte_len > 0) {
// First copy the levels as-is
uint8_t* decompressed = decompression_buffer_->mutable_data();
memcpy(decompressed, page_buffer->data(), levels_byte_len);
}
// GH-31992: DataPageV2 may store only levels and no values when all
// values are null. In this case, Parquet java is known to produce a
// 0-len compressed area (which is invalid compressed input).
// See https://github.com/apache/parquet-java/issues/3122
int64_t decompressed_len = 0;
if (uncompressed_len - levels_byte_len != 0) {
// Decompress the values
PARQUET_ASSIGN_OR_THROW(
decompressed_len,
decompressor_->Decompress(
compressed_len - levels_byte_len, page_buffer->data() + levels_byte_len,
uncompressed_len - levels_byte_len,
decompression_buffer_->mutable_data() + levels_byte_len));
}
if (decompressed_len != uncompressed_len - levels_byte_len) {
throw ParquetException("Page didn't decompress to expected size, expected: " +
std::to_string(uncompressed_len - levels_byte_len) +
", but got:" + std::to_string(decompressed_len));
}
return decompression_buffer_;
}
} // namespace
std::unique_ptr<PageReader> PageReader::Open(std::shared_ptr<ArrowInputStream> stream,
int64_t total_num_values,
Compression::type codec,
const ReaderProperties& properties,
bool always_compressed,
const CryptoContext* ctx) {
return std::unique_ptr<PageReader>(new SerializedPageReader(
std::move(stream), total_num_values, codec, properties, ctx, always_compressed));
}
std::unique_ptr<PageReader> PageReader::Open(std::shared_ptr<ArrowInputStream> stream,
int64_t total_num_values,
Compression::type codec,
bool always_compressed,
::arrow::MemoryPool* pool,
const CryptoContext* ctx) {
return std::unique_ptr<PageReader>(
new SerializedPageReader(std::move(stream), total_num_values, codec,
ReaderProperties(pool), ctx, always_compressed));
}
namespace {
// ----------------------------------------------------------------------
// Impl base class for TypedColumnReader and RecordReader
template <typename DType>
class ColumnReaderImplBase {
public:
using T = typename DType::c_type;
ColumnReaderImplBase(const ColumnDescriptor* descr, ::arrow::MemoryPool* pool)
: descr_(descr),
max_def_level_(descr->max_definition_level()),
max_rep_level_(descr->max_repetition_level()),
num_buffered_values_(0),
num_decoded_values_(0),
pool_(pool),
current_decoder_(nullptr),
current_encoding_(Encoding::UNKNOWN) {}
virtual ~ColumnReaderImplBase() = default;
protected:
// Read up to batch_size values from the current data page into the
// pre-allocated memory T*
//
// @returns: the number of values read into the out buffer
int64_t ReadValues(int64_t batch_size, T* out) {
int64_t num_decoded = current_decoder_->Decode(out, static_cast<int>(batch_size));
return num_decoded;
}
// Read up to batch_size values from the current data page into the
// pre-allocated memory T*, leaving spaces for null entries according
// to the def_levels.
//
// @returns: the number of values read into the out buffer
int64_t ReadValuesSpaced(int64_t batch_size, T* out, int64_t null_count,
uint8_t* valid_bits, int64_t valid_bits_offset) {
return current_decoder_->DecodeSpaced(out, static_cast<int>(batch_size),
static_cast<int>(null_count), valid_bits,
valid_bits_offset);
}
// Read multiple definition levels into preallocated memory
//
// Returns the number of decoded definition levels
int64_t ReadDefinitionLevels(int64_t batch_size, int16_t* levels) {
if (max_def_level_ == 0) {
return 0;
}
return definition_level_decoder_.Decode(static_cast<int>(batch_size), levels);
}
bool HasNextInternal() {
// Either there is no data page available yet, or the data page has been
// exhausted
if (num_buffered_values_ == 0 || num_decoded_values_ == num_buffered_values_) {
if (!ReadNewPage() || num_buffered_values_ == 0) {
return false;
}
}
return true;
}
// Read multiple repetition levels into preallocated memory
// Returns the number of decoded repetition levels
int64_t ReadRepetitionLevels(int64_t batch_size, int16_t* levels) {
if (max_rep_level_ == 0) {
return 0;
}
return repetition_level_decoder_.Decode(static_cast<int>(batch_size), levels);
}
// Advance to the next data page
bool ReadNewPage() {
// Loop until we find the next data page.
while (true) {
current_page_ = pager_->NextPage();
if (!current_page_) {
// EOS
return false;
}
if (current_page_->type() == PageType::DICTIONARY_PAGE) {
ConfigureDictionary(static_cast<const DictionaryPage*>(current_page_.get()));
continue;
} else if (current_page_->type() == PageType::DATA_PAGE) {
const auto* page = static_cast<const DataPageV1*>(current_page_.get());
const int64_t levels_byte_size = InitializeLevelDecoders(
*page, page->repetition_level_encoding(), page->definition_level_encoding());
InitializeDataDecoder(*page, levels_byte_size);
return true;
} else if (current_page_->type() == PageType::DATA_PAGE_V2) {
const auto* page = static_cast<const DataPageV2*>(current_page_.get());
int64_t levels_byte_size = InitializeLevelDecodersV2(*page);
InitializeDataDecoder(*page, levels_byte_size);
return true;
} else {
// We don't know what this page type is. We're allowed to skip non-data
// pages.
continue;
}
}
return true;
}
void ConfigureDictionary(const DictionaryPage* page) {
int encoding = static_cast<int>(page->encoding());
if (page->encoding() == Encoding::PLAIN_DICTIONARY ||
page->encoding() == Encoding::PLAIN) {
encoding = static_cast<int>(Encoding::RLE_DICTIONARY);
}
auto it = decoders_.find(encoding);
if (it != decoders_.end()) {
throw ParquetException("Column cannot have more than one dictionary.");
}
if (page->encoding() == Encoding::PLAIN_DICTIONARY ||
page->encoding() == Encoding::PLAIN) {
auto dictionary = MakeTypedDecoder<DType>(Encoding::PLAIN, descr_, pool_);
dictionary->SetData(page->num_values(), page->data(), page->size());
// The dictionary is fully decoded during DictionaryDecoder::Init, so the
// DictionaryPage buffer is no longer required after this step
//
// TODO(wesm): investigate whether this all-or-nothing decoding of the
// dictionary makes sense and whether performance can be improved
std::unique_ptr<DictDecoder<DType>> decoder = MakeDictDecoder<DType>(descr_, pool_);
decoder->SetDict(dictionary.get());
decoders_[encoding] =
std::unique_ptr<DecoderType>(dynamic_cast<DecoderType*>(decoder.release()));
} else {
ParquetException::NYI("only plain dictionary encoding has been implemented");
}
new_dictionary_ = true;
current_decoder_ = decoders_[encoding].get();
ARROW_DCHECK(current_decoder_);
}
// Initialize repetition and definition level decoders on the next data page.
// If the data page includes repetition and definition levels, we
// initialize the level decoders and return the number of encoded level bytes.
// The return value helps determine the number of bytes in the encoded data.
int64_t InitializeLevelDecoders(const DataPage& page,
Encoding::type repetition_level_encoding,
Encoding::type definition_level_encoding) {
// Read a data page.
num_buffered_values_ = page.num_values();
// Have not decoded any values from the data page yet
num_decoded_values_ = 0;
const uint8_t* buffer = page.data();
int32_t levels_byte_size = 0;
int32_t max_size = page.size();
// Data page Layout: Repetition Levels - Definition Levels - encoded values.
// Levels are encoded as rle or bit-packed.
// Init repetition levels
if (max_rep_level_ > 0) {
int32_t rep_levels_bytes = repetition_level_decoder_.SetData(
repetition_level_encoding, max_rep_level_,
static_cast<int>(num_buffered_values_), buffer, max_size);
buffer += rep_levels_bytes;
levels_byte_size += rep_levels_bytes;
max_size -= rep_levels_bytes;
}
// TODO figure a way to set max_def_level_ to 0
// if the initial value is invalid
// Init definition levels
if (max_def_level_ > 0) {
int32_t def_levels_bytes = definition_level_decoder_.SetData(
definition_level_encoding, max_def_level_,
static_cast<int>(num_buffered_values_), buffer, max_size);
levels_byte_size += def_levels_bytes;
max_size -= def_levels_bytes;
}
return levels_byte_size;
}
int64_t InitializeLevelDecodersV2(const DataPageV2& page) {
// Read a data page.
num_buffered_values_ = page.num_values();
// Have not decoded any values from the data page yet
num_decoded_values_ = 0;
const uint8_t* buffer = page.data();
const int64_t total_levels_length =
static_cast<int64_t>(page.repetition_levels_byte_length()) +
page.definition_levels_byte_length();
if (total_levels_length > page.size()) {
throw ParquetException("Data page too small for levels (corrupt header?)");
}
if (max_rep_level_ > 0) {
repetition_level_decoder_.SetDataV2(page.repetition_levels_byte_length(),
max_rep_level_,
static_cast<int>(num_buffered_values_), buffer);
}
// ARROW-17453: Even if max_rep_level_ is 0, there may still be
// repetition level bytes written and/or reported in the header by
// some writers (e.g. Athena)
buffer += page.repetition_levels_byte_length();
if (max_def_level_ > 0) {
definition_level_decoder_.SetDataV2(page.definition_levels_byte_length(),
max_def_level_,
static_cast<int>(num_buffered_values_), buffer);
}
return total_levels_length;
}
// Get a decoder object for this page or create a new decoder if this is the
// first page with this encoding.
void InitializeDataDecoder(const DataPage& page, int64_t levels_byte_size) {
const uint8_t* buffer = page.data() + levels_byte_size;
const int64_t data_size = page.size() - levels_byte_size;
if (data_size < 0) {
throw ParquetException("Page smaller than size of encoded levels");
}
Encoding::type encoding = page.encoding();
if (IsDictionaryIndexEncoding(encoding)) {
// Normalizing the PLAIN_DICTIONARY to RLE_DICTIONARY encoding
// in decoder.
encoding = Encoding::RLE_DICTIONARY;
}
auto it = decoders_.find(static_cast<int>(encoding));
if (it != decoders_.end()) {
ARROW_DCHECK(it->second.get() != nullptr);
current_decoder_ = it->second.get();
} else {
switch (encoding) {
case Encoding::PLAIN:
case Encoding::BYTE_STREAM_SPLIT:
case Encoding::RLE:
case Encoding::DELTA_BINARY_PACKED:
case Encoding::DELTA_BYTE_ARRAY:
case Encoding::DELTA_LENGTH_BYTE_ARRAY: {
auto decoder = MakeTypedDecoder<DType>(encoding, descr_, pool_);
current_decoder_ = decoder.get();
decoders_[static_cast<int>(encoding)] = std::move(decoder);
break;
}
case Encoding::RLE_DICTIONARY:
throw ParquetException("Dictionary page must be before data page.");
default:
throw ParquetException("Unknown encoding type.");
}
}
current_encoding_ = encoding;
current_decoder_->SetData(static_cast<int>(num_buffered_values_), buffer,
static_cast<int>(data_size));
}
// Available values in the current data page, value includes repeated values
// and nulls.
int64_t available_values_current_page() const {
return num_buffered_values_ - num_decoded_values_;
}
const ColumnDescriptor* descr_;
const int16_t max_def_level_;
const int16_t max_rep_level_;
std::unique_ptr<PageReader> pager_;
std::shared_ptr<Page> current_page_;
// Not set if full schema for this field has no optional or repeated elements
LevelDecoder definition_level_decoder_;
// Not set for flat schemas.
LevelDecoder repetition_level_decoder_;
// The total number of values stored in the data page. This is the maximum of
// the number of encoded definition levels or encoded values. For
// non-repeated, required columns, this is equal to the number of encoded
// values. For repeated or optional values, there may be fewer data values
// than levels, and this tells you how many encoded levels there are in that
// case.
int64_t num_buffered_values_;
// The number of values from the current data page that have been decoded
// into memory or skipped over.
int64_t num_decoded_values_;
::arrow::MemoryPool* pool_;
using DecoderType = TypedDecoder<DType>;
DecoderType* current_decoder_;
Encoding::type current_encoding_;
/// Flag to signal when a new dictionary has been set, for the benefit of
/// DictionaryRecordReader
bool new_dictionary_ = false;
// The exposed encoding
ExposedEncoding exposed_encoding_ = ExposedEncoding::NO_ENCODING;
// Map of encoding type to the respective decoder object. For example, a
// column chunk's data pages may include both dictionary-encoded and
// plain-encoded data.
std::unordered_map<int, std::unique_ptr<DecoderType>> decoders_;
void ConsumeBufferedValues(int64_t num_values) { num_decoded_values_ += num_values; }
};
// ----------------------------------------------------------------------
// TypedColumnReader implementations
template <typename DType>
class TypedColumnReaderImpl : public TypedColumnReader<DType>,
public ColumnReaderImplBase<DType> {
public:
using T = typename DType::c_type;
TypedColumnReaderImpl(const ColumnDescriptor* descr, std::unique_ptr<PageReader> pager,
::arrow::MemoryPool* pool)
: ColumnReaderImplBase<DType>(descr, pool) {
this->pager_ = std::move(pager);
}
bool HasNext() override { return this->HasNextInternal(); }
int64_t ReadBatch(int64_t batch_size, int16_t* def_levels, int16_t* rep_levels,
T* values, int64_t* values_read) override;
int64_t Skip(int64_t num_values_to_skip) override;
Type::type type() const override { return this->descr_->physical_type(); }
const ColumnDescriptor* descr() const override { return this->descr_; }
ExposedEncoding GetExposedEncoding() override { return this->exposed_encoding_; };
int64_t ReadBatchWithDictionary(int64_t batch_size, int16_t* def_levels,
int16_t* rep_levels, int32_t* indices,
int64_t* indices_read, const T** dict,
int32_t* dict_len) override;
protected:
void SetExposedEncoding(ExposedEncoding encoding) override {
this->exposed_encoding_ = encoding;
}
// Allocate enough scratch space to accommodate skipping 16-bit levels or any
// value type.
void InitScratchForSkip();
// Scratch space for reading and throwing away rep/def levels and values when
// skipping.
std::shared_ptr<ResizableBuffer> scratch_for_skip_;
private:
// Read dictionary indices. Similar to ReadValues but decode data to dictionary indices.
// This function is called only by ReadBatchWithDictionary().
int64_t ReadDictionaryIndices(int64_t indices_to_read, int32_t* indices) {
auto decoder = dynamic_cast<DictDecoder<DType>*>(this->current_decoder_);
return decoder->DecodeIndices(static_cast<int>(indices_to_read), indices);
}
// Get dictionary. The dictionary should have been set by SetDict(). The dictionary is
// owned by the internal decoder and is destroyed when the reader is destroyed. This
// function is called only by ReadBatchWithDictionary() after dictionary is configured.
void GetDictionary(const T** dictionary, int32_t* dictionary_length) {
auto decoder = dynamic_cast<DictDecoder<DType>*>(this->current_decoder_);
decoder->GetDictionary(dictionary, dictionary_length);
}
// Read definition and repetition levels. Also return the number of definition levels
// and number of values to read. This function is called before reading values.
//
// ReadLevels will throw exception when any num-levels read is not equal to the number
// of the levels can be read.
void ReadLevels(int64_t batch_size, int16_t* def_levels, int16_t* rep_levels,
int64_t* num_def_levels, int64_t* non_null_values_to_read) {
batch_size = std::min(batch_size, this->available_values_current_page());
// If the field is required and non-repeated, there are no definition levels
if (this->max_def_level_ > 0 && def_levels != nullptr) {
*num_def_levels = this->ReadDefinitionLevels(batch_size, def_levels);
if (ARROW_PREDICT_FALSE(*num_def_levels != batch_size)) {
throw ParquetException(kErrorRepDefLevelNotMatchesNumValues);
}
// TODO(wesm): this tallying of values-to-decode can be performed with better
// cache-efficiency if fused with the level decoding.
*non_null_values_to_read +=
std::count(def_levels, def_levels + *num_def_levels, this->max_def_level_);
} else {
// Required field, read all values
if (num_def_levels != nullptr) {
*num_def_levels = 0;
}
*non_null_values_to_read = batch_size;
}
// Not present for non-repeated fields
if (this->max_rep_level_ > 0 && rep_levels != nullptr) {
int64_t num_rep_levels = this->ReadRepetitionLevels(batch_size, rep_levels);
if (batch_size != num_rep_levels) {
throw ParquetException(kErrorRepDefLevelNotMatchesNumValues);
}
}
}
};
template <typename DType>
int64_t TypedColumnReaderImpl<DType>::ReadBatchWithDictionary(
int64_t batch_size, int16_t* def_levels, int16_t* rep_levels, int32_t* indices,
int64_t* indices_read, const T** dict, int32_t* dict_len) {
bool has_dict_output = dict != nullptr && dict_len != nullptr;
// Similar logic as ReadValues to get pages.
if (!HasNext()) {
*indices_read = 0;
if (has_dict_output) {
*dict = nullptr;
*dict_len = 0;
}
return 0;
}
// Verify the current data page is dictionary encoded.
if (this->current_encoding_ != Encoding::RLE_DICTIONARY) {
std::stringstream ss;
ss << "Data page is not dictionary encoded. Encoding: "
<< EncodingToString(this->current_encoding_);
throw ParquetException(ss.str());
}
// Get dictionary pointer and length.
if (has_dict_output) {
GetDictionary(dict, dict_len);
}
// Similar logic as ReadValues to get def levels and rep levels.
int64_t num_def_levels = 0;
int64_t indices_to_read = 0;
ReadLevels(batch_size, def_levels, rep_levels, &num_def_levels, &indices_to_read);
// Read dictionary indices.
*indices_read = ReadDictionaryIndices(indices_to_read, indices);
int64_t total_indices = std::max<int64_t>(num_def_levels, *indices_read);
// Some callers use a batch size of 0 just to get the dictionary.
int64_t expected_values = std::min(batch_size, this->available_values_current_page());
if (total_indices == 0 && expected_values > 0) {
std::stringstream ss;
ss << "Read 0 values, expected " << expected_values;
ParquetException::EofException(ss.str());
}
this->ConsumeBufferedValues(total_indices);
return total_indices;
}
template <typename DType>
int64_t TypedColumnReaderImpl<DType>::ReadBatch(int64_t batch_size, int16_t* def_levels,
int16_t* rep_levels, T* values,
int64_t* values_read) {
// HasNext might invoke ReadNewPage until a data page with
// `available_values_current_page() > 0` is found.
if (!HasNext()) {
*values_read = 0;
return 0;
}
// TODO(wesm): keep reading data pages until batch_size is reached, or the
// row group is finished
int64_t num_def_levels = 0;
// Number of non-null values to read within `num_def_levels`.
int64_t non_null_values_to_read = 0;
ReadLevels(batch_size, def_levels, rep_levels, &num_def_levels,
&non_null_values_to_read);
// Should not return more values than available in the current data page,
// since currently, ReadLevels would only consume level from current
// data page.
if (ARROW_PREDICT_FALSE(num_def_levels > this->available_values_current_page())) {
throw ParquetException(kErrorRepDefLevelNotMatchesNumValues);
}
if (non_null_values_to_read != 0) {
*values_read = this->ReadValues(non_null_values_to_read, values);
} else {
*values_read = 0;
}
// Adjust total_values, since if max_def_level_ == 0, num_def_levels would
// be 0 and `values_read` would adjust to `available_values_current_page()`.
int64_t total_values = std::max<int64_t>(num_def_levels, *values_read);
int64_t expected_values = std::min(batch_size, this->available_values_current_page());
if (total_values == 0 && expected_values > 0) {
std::stringstream ss;
ss << "Read 0 values, expected " << expected_values;
ParquetException::EofException(ss.str());
}
this->ConsumeBufferedValues(total_values);
return total_values;
}
template <typename DType>
void TypedColumnReaderImpl<DType>::InitScratchForSkip() {
if (this->scratch_for_skip_ == nullptr) {
int value_size = type_traits<DType::type_num>::value_byte_size;
this->scratch_for_skip_ = AllocateBuffer(
this->pool_, kSkipScratchBatchSize * std::max<int>(sizeof(int16_t), value_size));
}
}
template <typename DType>
int64_t TypedColumnReaderImpl<DType>::Skip(int64_t num_values_to_skip) {
int64_t values_to_skip = num_values_to_skip;
// Optimization: Do not call HasNext() when values_to_skip == 0.
while (values_to_skip > 0 && HasNext()) {
// If the number of values to skip is more than the number of undecoded values, skip
// the Page.
const int64_t available_values = this->available_values_current_page();
if (values_to_skip >= available_values) {
values_to_skip -= available_values;
this->ConsumeBufferedValues(available_values);
} else {
// We need to read this Page
// Jump to the right offset in the Page
int64_t values_read = 0;
InitScratchForSkip();
ARROW_DCHECK_NE(this->scratch_for_skip_, nullptr);
do {
int64_t batch_size = std::min(kSkipScratchBatchSize, values_to_skip);
values_read = ReadBatch(static_cast<int>(batch_size),
scratch_for_skip_->mutable_data_as<int16_t>(),
scratch_for_skip_->mutable_data_as<int16_t>(),
scratch_for_skip_->mutable_data_as<T>(), &values_read);
values_to_skip -= values_read;
} while (values_read > 0 && values_to_skip > 0);
}
}
return num_values_to_skip - values_to_skip;
}
} // namespace
// ----------------------------------------------------------------------
// Dynamic column reader constructor
std::shared_ptr<ColumnReader> ColumnReader::Make(const ColumnDescriptor* descr,
std::unique_ptr<PageReader> pager,
MemoryPool* pool) {
switch (descr->physical_type()) {
case Type::BOOLEAN:
return std::make_shared<TypedColumnReaderImpl<BooleanType>>(descr, std::move(pager),
pool);
case Type::INT32:
return std::make_shared<TypedColumnReaderImpl<Int32Type>>(descr, std::move(pager),
pool);
case Type::INT64:
return std::make_shared<TypedColumnReaderImpl<Int64Type>>(descr, std::move(pager),
pool);
case Type::INT96:
return std::make_shared<TypedColumnReaderImpl<Int96Type>>(descr, std::move(pager),
pool);
case Type::FLOAT:
return std::make_shared<TypedColumnReaderImpl<FloatType>>(descr, std::move(pager),
pool);
case Type::DOUBLE:
return std::make_shared<TypedColumnReaderImpl<DoubleType>>(descr, std::move(pager),
pool);
case Type::BYTE_ARRAY:
return std::make_shared<TypedColumnReaderImpl<ByteArrayType>>(
descr, std::move(pager), pool);
case Type::FIXED_LEN_BYTE_ARRAY:
return std::make_shared<TypedColumnReaderImpl<FLBAType>>(descr, std::move(pager),
pool);
default:
ParquetException::NYI("type reader not implemented");
}
::arrow::Unreachable();
}
// ----------------------------------------------------------------------
// RecordReader
namespace internal {
namespace {
template <typename DType>
class TypedRecordReader : public TypedColumnReaderImpl<DType>,
virtual public RecordReader {
public:
using T = typename DType::c_type;
using BASE = TypedColumnReaderImpl<DType>;
TypedRecordReader(const ColumnDescriptor* descr, LevelInfo leaf_info, MemoryPool* pool,
bool read_dense_for_nullable)
// Pager must be set using SetPageReader.
: BASE(descr, /* pager = */ nullptr, pool) {
leaf_info_ = leaf_info;
nullable_values_ = leaf_info_.HasNullableValues();
at_record_start_ = true;
values_written_ = 0;
null_count_ = 0;
values_capacity_ = 0;
levels_written_ = 0;
levels_position_ = 0;
levels_capacity_ = 0;
read_dense_for_nullable_ = read_dense_for_nullable;
// FIXED_LEN_BYTE_ARRAY and BYTE_ARRAY values are not stored in the `values_` buffer,
// they are read directly as Arrow.
uses_values_ = (descr->physical_type() != Type::BYTE_ARRAY &&
descr->physical_type() != Type::FIXED_LEN_BYTE_ARRAY);
if (uses_values_) {
values_ = AllocateBuffer(pool);
}
valid_bits_ = AllocateBuffer(pool);
def_levels_ = AllocateBuffer(pool);
rep_levels_ = AllocateBuffer(pool);
TypedRecordReader::Reset();
}
// Compute the values capacity in bytes for the given number of elements
int64_t bytes_for_values(int64_t nitems) const {
int64_t type_size = GetTypeByteSize(this->descr_->physical_type());
int64_t bytes_for_values = -1;
if (MultiplyWithOverflow(nitems, type_size, &bytes_for_values)) {
throw ParquetException("Total size of items too large");
}
return bytes_for_values;
}
const void* ReadDictionary(int32_t* dictionary_length) override {
if (this->current_decoder_ == nullptr && !this->HasNextInternal()) {
*dictionary_length = 0;
return nullptr;
}
// Verify the current data page is dictionary encoded. The current_encoding_ should
// have been set as RLE_DICTIONARY if the page encoding is RLE_DICTIONARY or
// PLAIN_DICTIONARY.
if (this->current_encoding_ != Encoding::RLE_DICTIONARY) {
std::stringstream ss;
ss << "Data page is not dictionary encoded. Encoding: "
<< EncodingToString(this->current_encoding_);
throw ParquetException(ss.str());
}
auto decoder = dynamic_cast<DictDecoder<DType>*>(this->current_decoder_);
const T* dictionary = nullptr;
decoder->GetDictionary(&dictionary, dictionary_length);
return reinterpret_cast<const void*>(dictionary);
}
int64_t ReadRecords(int64_t num_records) override {
if (num_records == 0) return 0;
// Delimit records, then read values at the end
int64_t records_read = 0;
if (has_values_to_process()) {
records_read += ReadRecordData(num_records);
}
int64_t level_batch_size = std::max<int64_t>(kMinLevelBatchSize, num_records);
// If we are in the middle of a record, we continue until reaching the
// desired number of records or the end of the current record if we've found
// enough records
while (!at_record_start_ || records_read < num_records) {
// Is there more data to read in this row group?
if (!this->HasNextInternal()) {
if (!at_record_start_) {
// We ended the row group while inside a record that we haven't seen
// the end of yet. So increment the record count for the last record in
// the row group
++records_read;
at_record_start_ = true;
}
break;
}
/// We perform multiple batch reads until we either exhaust the row group
/// or observe the desired number of records
int64_t batch_size =
std::min(level_batch_size, this->available_values_current_page());
// No more data in column
if (batch_size == 0) {
break;
}
if (this->max_def_level_ > 0) {
ReserveLevels(batch_size);
int16_t* def_levels = this->def_levels() + levels_written_;
int16_t* rep_levels = this->rep_levels() + levels_written_;
if (ARROW_PREDICT_FALSE(this->ReadDefinitionLevels(batch_size, def_levels) !=
batch_size)) {
throw ParquetException(kErrorRepDefLevelNotMatchesNumValues);
}
if (this->max_rep_level_ > 0) {
int64_t rep_levels_read = this->ReadRepetitionLevels(batch_size, rep_levels);
if (ARROW_PREDICT_FALSE(rep_levels_read != batch_size)) {
throw ParquetException(kErrorRepDefLevelNotMatchesNumValues);
}
}
levels_written_ += batch_size;
records_read += ReadRecordData(num_records - records_read);
} else {
// No repetition and definition levels, we can read values directly
batch_size = std::min(num_records - records_read, batch_size);
records_read += ReadRecordData(batch_size);
}
}
return records_read;
}
// Throw away levels from start_levels_position to levels_position_.
// Will update levels_position_, levels_written_, and levels_capacity_
// accordingly and move the levels to left to fill in the gap.
// It will resize the buffer without releasing the memory allocation.
void ThrowAwayLevels(int64_t start_levels_position) {
ARROW_DCHECK_LE(levels_position_, levels_written_);
ARROW_DCHECK_LE(start_levels_position, levels_position_);
ARROW_DCHECK_GT(this->max_def_level_, 0);
ARROW_DCHECK_NE(def_levels_, nullptr);
int64_t gap = levels_position_ - start_levels_position;
if (gap == 0) return;
int64_t levels_remaining = levels_written_ - gap;
auto left_shift = [&](::arrow::ResizableBuffer* buffer) {
auto* data = buffer->mutable_data_as<int16_t>();
std::copy(data + levels_position_, data + levels_written_,
data + start_levels_position);
PARQUET_THROW_NOT_OK(buffer->Resize(levels_remaining * sizeof(int16_t),
/*shrink_to_fit=*/false));
};
left_shift(def_levels_.get());
if (this->max_rep_level_ > 0) {
ARROW_DCHECK_NE(rep_levels_, nullptr);
left_shift(rep_levels_.get());
}
levels_written_ -= gap;
levels_position_ -= gap;
levels_capacity_ -= gap;
}
// Skip records that we have in our buffer. This function is only for
// non-repeated fields.
int64_t SkipRecordsInBufferNonRepeated(int64_t num_records) {
ARROW_DCHECK_EQ(this->max_rep_level_, 0);
if (!this->has_values_to_process() || num_records == 0) return 0;
int64_t remaining_records = levels_written_ - levels_position_;
int64_t skipped_records = std::min(num_records, remaining_records);
int64_t start_levels_position = levels_position_;
// Since there is no repetition, number of levels equals number of records.
levels_position_ += skipped_records;
// We skipped the levels by incrementing 'levels_position_'. For values
// we do not have a buffer, so we need to read them and throw them away.
// First we need to figure out how many present/not-null values there are.
int64_t values_to_read =
std::count(def_levels() + start_levels_position, def_levels() + levels_position_,
this->max_def_level_);
// Now that we have figured out number of values to read, we do not need
// these levels anymore. We will remove these values from the buffer.
// This requires shifting the levels in the buffer to left. So this will
// update levels_position_ and levels_written_.
ThrowAwayLevels(start_levels_position);
// For values, we do not have them in buffer, so we will read them and
// throw them away.
ReadAndThrowAwayValues(values_to_read);
// Mark the levels as read in the underlying column reader.
this->ConsumeBufferedValues(skipped_records);
return skipped_records;
}
// Attempts to skip num_records from the buffer. Will throw away levels
// and corresponding values for the records it skipped and consumes them from the
// underlying decoder. Will advance levels_position_ and update
// at_record_start_.
// Returns how many records were skipped.
int64_t DelimitAndSkipRecordsInBuffer(int64_t num_records) {
if (num_records == 0) return 0;
// Look at the buffered levels, delimit them based on
// (rep_level == 0), report back how many records are in there, and
// fill in how many not-null values (def_level == max_def_level_).
// DelimitRecords updates levels_position_.
int64_t start_levels_position = levels_position_;
int64_t values_seen = 0;
int64_t skipped_records = DelimitRecords(num_records, &values_seen);
ReadAndThrowAwayValues(values_seen);
// Mark those levels and values as consumed in the underlying page.
// This must be done before we throw away levels since it updates
// levels_position_ and levels_written_.
this->ConsumeBufferedValues(levels_position_ - start_levels_position);
// Updated levels_position_ and levels_written_.
ThrowAwayLevels(start_levels_position);
return skipped_records;
}
// Skip records for repeated fields. For repeated fields, we are technically
// reading and throwing away the levels and values since we do not know the record
// boundaries in advance. Keep filling the buffer and skipping until we reach the
// desired number of records or we run out of values in the column chunk.
// Returns number of skipped records.
int64_t SkipRecordsRepeated(int64_t num_records) {
ARROW_DCHECK_GT(this->max_rep_level_, 0);
int64_t skipped_records = 0;
// First consume what is in the buffer.
if (levels_position_ < levels_written_) {
// This updates at_record_start_.
skipped_records = DelimitAndSkipRecordsInBuffer(num_records);
}
int64_t level_batch_size =
std::max<int64_t>(kMinLevelBatchSize, num_records - skipped_records);
// If 'at_record_start_' is false, but (skipped_records == num_records), it
// means that for the last record that was counted, we have not seen all
// of its values yet.
while (!at_record_start_ || skipped_records < num_records) {
// Is there more data to read in this row group?
// HasNextInternal() will advance to the next page if necessary.
if (!this->HasNextInternal()) {
if (!at_record_start_) {
// We ended the row group while inside a record that we haven't seen
// the end of yet. So increment the record count for the last record
// in the row group
++skipped_records;
at_record_start_ = true;
}
break;
}
// Read some more levels.
int64_t batch_size =
std::min(level_batch_size, this->available_values_current_page());
// No more data in column. This must be an empty page.
// If we had exhausted the last page, HasNextInternal() must have advanced
// to the next page. So there must be available values to process.
if (batch_size == 0) {
break;
}
// For skipping we will read the levels and append them to the end
// of the def_levels and rep_levels just like for read.
ReserveLevels(batch_size);
int16_t* def_levels = this->def_levels() + levels_written_;
int16_t* rep_levels = this->rep_levels() + levels_written_;
if (this->ReadDefinitionLevels(batch_size, def_levels) != batch_size) {
throw ParquetException(kErrorRepDefLevelNotMatchesNumValues);
}
if (this->ReadRepetitionLevels(batch_size, rep_levels) != batch_size) {
throw ParquetException(kErrorRepDefLevelNotMatchesNumValues);
}
levels_written_ += batch_size;
int64_t remaining_records = num_records - skipped_records;
// This updates at_record_start_.
skipped_records += DelimitAndSkipRecordsInBuffer(remaining_records);
}
return skipped_records;
}
// Read 'num_values' values and throw them away.
// Throws an error if it could not read 'num_values'.
void ReadAndThrowAwayValues(int64_t num_values) {
int64_t values_left = num_values;
int64_t values_read = 0;
// Allocate enough scratch space to accommodate 16-bit levels or any
// value type
this->InitScratchForSkip();
ARROW_DCHECK_NE(this->scratch_for_skip_, nullptr);
do {
int64_t batch_size = std::min<int64_t>(kSkipScratchBatchSize, values_left);
values_read = this->ReadValues(
batch_size, this->scratch_for_skip_->template mutable_data_as<T>());
values_left -= values_read;
} while (values_read > 0 && values_left > 0);
if (values_left > 0) {
std::stringstream ss;
ss << "Could not read and throw away " << num_values << " values";
throw ParquetException(ss.str());
}
}
int64_t SkipRecords(int64_t num_records) override {
if (num_records == 0) return 0;
// Top level required field. Number of records equals to number of levels,
// and there is not read-ahead for levels.
if (this->max_rep_level_ == 0 && this->max_def_level_ == 0) {
return this->Skip(num_records);
}
int64_t skipped_records = 0;
if (this->max_rep_level_ == 0) {
// Non-repeated optional field.
// First consume whatever is in the buffer.
skipped_records = SkipRecordsInBufferNonRepeated(num_records);
ARROW_DCHECK_LE(skipped_records, num_records);
// For records that we have not buffered, we will use the column
// reader's Skip to do the remaining Skip. Since the field is not
// repeated number of levels to skip is the same as number of records
// to skip.
skipped_records += this->Skip(num_records - skipped_records);
} else {
skipped_records += this->SkipRecordsRepeated(num_records);
}
return skipped_records;
}
// We may outwardly have the appearance of having exhausted a column chunk
// when in fact we are in the middle of processing the last batch
bool has_values_to_process() const { return levels_position_ < levels_written_; }
std::shared_ptr<ResizableBuffer> ReleaseValues() override {
if (uses_values_) {
auto result = values_;
PARQUET_THROW_NOT_OK(
result->Resize(bytes_for_values(values_written_), /*shrink_to_fit=*/true));
values_ = AllocateBuffer(this->pool_);
values_capacity_ = 0;
return result;
} else {
return nullptr;
}
}
std::shared_ptr<ResizableBuffer> ReleaseIsValid() override {
if (nullable_values()) {
auto result = valid_bits_;
PARQUET_THROW_NOT_OK(result->Resize(bit_util::BytesForBits(values_written_),
/*shrink_to_fit=*/true));
valid_bits_ = AllocateBuffer(this->pool_);
return result;
} else {
return nullptr;
}
}
// Process written repetition/definition levels to reach the end of
// records. Only used for repeated fields.
// Process no more levels than necessary to delimit the indicated
// number of logical records. Updates internal state of RecordReader
//
// \return Number of records delimited
int64_t DelimitRecords(int64_t num_records, int64_t* values_seen) {
if (ARROW_PREDICT_FALSE(num_records == 0 || levels_position_ == levels_written_)) {
*values_seen = 0;
return 0;
}
int64_t records_read = 0;
const int16_t* const rep_levels = this->rep_levels();
const int16_t* const def_levels = this->def_levels();
ARROW_DCHECK_GT(this->max_rep_level_, 0);
// If at_record_start_ is true, we are seeing the start of a record
// for the second time, such as after repeated calls to
// DelimitRecords. In this case we must continue until we find
// another record start or exhausting the ColumnChunk
int64_t level = levels_position_;
if (at_record_start_) {
if (ARROW_PREDICT_FALSE(rep_levels[levels_position_] != 0)) {
std::stringstream ss;
ss << "The repetition level at the start of a record must be 0 but got "
<< rep_levels[levels_position_];
throw ParquetException(ss.str());
}
++levels_position_;
// We have decided to consume the level at this position; therefore we
// must advance until we find another record boundary
at_record_start_ = false;
}
// Count logical records and number of non-null values to read
ARROW_DCHECK(!at_record_start_);
// Scan repetition levels to find record end
while (levels_position_ < levels_written_) {
// We use an estimated batch size to simplify branching and
// improve performance in the common case. This might slow
// things down a bit if a single long record remains, though.
int64_t stride =
std::min(levels_written_ - levels_position_, num_records - records_read);
const int64_t position_end = levels_position_ + stride;
for (int64_t i = levels_position_; i < position_end; ++i) {
records_read += rep_levels[i] == 0;
}
levels_position_ = position_end;
if (records_read == num_records) {
// Check last rep_level reaches the boundary and
// pop the last level.
ARROW_CHECK_EQ(rep_levels[levels_position_ - 1], 0);
--levels_position_;
// We've found the number of records we were looking for. Set
// at_record_start_ to true and break
at_record_start_ = true;
break;
}
}
// Scan definition levels to find number of physical values
*values_seen = std::count(def_levels + level, def_levels + levels_position_,
this->max_def_level_);
return records_read;
}
void Reserve(int64_t capacity) override {
ReserveLevels(capacity);
ReserveValues(capacity);
}
int64_t UpdateCapacity(int64_t capacity, int64_t size, int64_t extra_size) {
if (extra_size < 0) {
throw ParquetException("Negative size (corrupt file?)");
}
int64_t target_size = -1;
if (AddWithOverflow(size, extra_size, &target_size)) {
throw ParquetException("Allocation size too large (corrupt file?)");
}
if (target_size >= (1LL << 62)) {
throw ParquetException("Allocation size too large (corrupt file?)");
}
if (capacity >= target_size) {
return capacity;
}
return bit_util::NextPower2(target_size);
}
void ReserveLevels(int64_t extra_levels) {
if (this->max_def_level_ > 0) {
const int64_t new_levels_capacity =
UpdateCapacity(levels_capacity_, levels_written_, extra_levels);
if (new_levels_capacity > levels_capacity_) {
constexpr auto kItemSize = static_cast<int64_t>(sizeof(int16_t));
int64_t capacity_in_bytes = -1;
if (MultiplyWithOverflow(new_levels_capacity, kItemSize, &capacity_in_bytes)) {
throw ParquetException("Allocation size too large (corrupt file?)");
}
PARQUET_THROW_NOT_OK(
def_levels_->Resize(capacity_in_bytes, /*shrink_to_fit=*/false));
if (this->max_rep_level_ > 0) {
PARQUET_THROW_NOT_OK(
rep_levels_->Resize(capacity_in_bytes, /*shrink_to_fit=*/false));
}
levels_capacity_ = new_levels_capacity;
}
}
}
virtual void ReserveValues(int64_t extra_values) {
const int64_t new_values_capacity =
UpdateCapacity(values_capacity_, values_written_, extra_values);
if (new_values_capacity > values_capacity_) {
// XXX(wesm): A hack to avoid memory allocation when reading directly
// into builder classes
if (uses_values_) {
PARQUET_THROW_NOT_OK(values_->Resize(bytes_for_values(new_values_capacity),
/*shrink_to_fit=*/false));
}
values_capacity_ = new_values_capacity;
}
if (nullable_values() && !read_dense_for_nullable_) {
int64_t valid_bytes_new = bit_util::BytesForBits(values_capacity_);
if (valid_bits_->size() < valid_bytes_new) {
int64_t valid_bytes_old = bit_util::BytesForBits(values_written_);
PARQUET_THROW_NOT_OK(
valid_bits_->Resize(valid_bytes_new, /*shrink_to_fit=*/false));
// Avoid valgrind warnings
memset(valid_bits_->mutable_data() + valid_bytes_old, 0,
static_cast<size_t>(valid_bytes_new - valid_bytes_old));
}
}
}
void Reset() override {
ResetValues();
if (levels_written_ > 0) {
// Throw away levels from 0 to levels_position_.
ThrowAwayLevels(0);
}
// Call Finish on the binary builders to reset them
}
void SetPageReader(std::unique_ptr<PageReader> reader) override {
at_record_start_ = true;
this->pager_ = std::move(reader);
ResetDecoders();
}
bool HasMoreData() const override { return this->pager_ != nullptr; }
const ColumnDescriptor* descr() const override { return this->descr_; }
// Dictionary decoders must be reset when advancing row groups
void ResetDecoders() { this->decoders_.clear(); }
virtual void ReadValuesSpaced(int64_t values_with_nulls, int64_t null_count) {
uint8_t* valid_bits = valid_bits_->mutable_data();
const int64_t valid_bits_offset = values_written_;
int64_t num_decoded = this->current_decoder_->DecodeSpaced(
ValuesHead<T>(), static_cast<int>(values_with_nulls),
static_cast<int>(null_count), valid_bits, valid_bits_offset);
CheckNumberDecoded(num_decoded, values_with_nulls);
}
virtual void ReadValuesDense(int64_t values_to_read) {
int64_t num_decoded =
this->current_decoder_->Decode(ValuesHead<T>(), static_cast<int>(values_to_read));
CheckNumberDecoded(num_decoded, values_to_read);
}
// Reads repeated records and returns number of records read. Fills in
// values_to_read and null_count.
int64_t ReadRepeatedRecords(int64_t num_records, int64_t* values_to_read,
int64_t* null_count) {
const int64_t start_levels_position = levels_position_;
// Note that repeated records may be required or nullable. If they have
// an optional parent in the path, they will be nullable, otherwise,
// they are required. We use leaf_info_->HasNullableValues() that looks
// at repeated_ancestor_def_level to determine if it is required or
// nullable. Even if they are required, we may have to read ahead and
// delimit the records to get the right number of values and they will
// have associated levels.
int64_t records_read = DelimitRecords(num_records, values_to_read);
if (!nullable_values() || read_dense_for_nullable_) {
ReadValuesDense(*values_to_read);
// null_count is always 0 for required.
ARROW_DCHECK_EQ(*null_count, 0);
} else {
ReadSpacedForOptionalOrRepeated(start_levels_position, values_to_read, null_count);
}
return records_read;
}
// Reads optional records and returns number of records read. Fills in
// values_to_read and null_count.
int64_t ReadOptionalRecords(int64_t num_records, int64_t* values_to_read,
int64_t* null_count) {
const int64_t start_levels_position = levels_position_;
// No repetition levels, skip delimiting logic. Each level represents a
// null or not null entry
int64_t records_read =
std::min<int64_t>(levels_written_ - levels_position_, num_records);
// This is advanced by DelimitRecords for the repeated field case above.
levels_position_ += records_read;
// Optional fields are always nullable.
if (read_dense_for_nullable_) {
ReadDenseForOptional(start_levels_position, values_to_read);
// We don't need to update null_count when reading dense. It should be
// already set to 0.
ARROW_DCHECK_EQ(*null_count, 0);
} else {
ReadSpacedForOptionalOrRepeated(start_levels_position, values_to_read, null_count);
}
return records_read;
}
// Reads required records and returns number of records read. Fills in
// values_to_read.
int64_t ReadRequiredRecords(int64_t num_records, int64_t* values_to_read) {
*values_to_read = num_records;
ReadValuesDense(*values_to_read);
return num_records;
}
// Reads dense for optional records. First it figures out how many values to
// read.
void ReadDenseForOptional(int64_t start_levels_position, int64_t* values_to_read) {
// levels_position_ must already be incremented based on number of records
// read.
ARROW_DCHECK_GE(levels_position_, start_levels_position);
// When reading dense we need to figure out number of values to read.
const int16_t* def_levels = this->def_levels();
*values_to_read += std::count(def_levels + start_levels_position,
def_levels + levels_position_, this->max_def_level_);
ReadValuesDense(*values_to_read);
}
// Reads spaced for optional or repeated fields.
void ReadSpacedForOptionalOrRepeated(int64_t start_levels_position,
int64_t* values_to_read, int64_t* null_count) {
// levels_position_ must already be incremented based on number of records
// read.
ARROW_DCHECK_GE(levels_position_, start_levels_position);
ValidityBitmapInputOutput validity_io;
validity_io.values_read_upper_bound = levels_position_ - start_levels_position;
validity_io.valid_bits = valid_bits_->mutable_data();
validity_io.valid_bits_offset = values_written_;
DefLevelsToBitmap(def_levels() + start_levels_position,
levels_position_ - start_levels_position, leaf_info_, &validity_io);
*values_to_read = validity_io.values_read - validity_io.null_count;
*null_count = validity_io.null_count;
ARROW_DCHECK_GE(*values_to_read, 0);
ARROW_DCHECK_GE(*null_count, 0);
ReadValuesSpaced(validity_io.values_read, *null_count);
}
// Return number of logical records read.
// Updates levels_position_, values_written_, and null_count_.
int64_t ReadRecordData(int64_t num_records) {
// Conservative upper bound
const int64_t possible_num_values =
std::max<int64_t>(num_records, levels_written_ - levels_position_);
ReserveValues(static_cast<size_t>(possible_num_values));
const int64_t start_levels_position = levels_position_;
// To be updated by the function calls below for each of the repetition
// types.
int64_t records_read = 0;
int64_t values_to_read = 0;
int64_t null_count = 0;
if (this->max_rep_level_ > 0) {
// Repeated fields may be nullable or not.
// This call updates levels_position_.
records_read = ReadRepeatedRecords(num_records, &values_to_read, &null_count);
} else if (this->max_def_level_ > 0) {
// Non-repeated optional values are always nullable.
// This call updates levels_position_.
ARROW_DCHECK(nullable_values());
records_read = ReadOptionalRecords(num_records, &values_to_read, &null_count);
} else {
ARROW_DCHECK(!nullable_values());
records_read = ReadRequiredRecords(num_records, &values_to_read);
// We don't need to update null_count, since it is 0.
}
ARROW_DCHECK_GE(records_read, 0);
ARROW_DCHECK_GE(values_to_read, 0);
ARROW_DCHECK_GE(null_count, 0);
if (read_dense_for_nullable_) {
values_written_ += values_to_read;
ARROW_DCHECK_EQ(null_count, 0);
} else {
values_written_ += values_to_read + null_count;
null_count_ += null_count;
}
// Total values, including null spaces, if any
if (this->max_def_level_ > 0) {
// Optional, repeated, or some mix thereof
this->ConsumeBufferedValues(levels_position_ - start_levels_position);
} else {
// Flat, non-repeated
this->ConsumeBufferedValues(values_to_read);
}
return records_read;
}
void DebugPrintState() override {
const int16_t* def_levels = this->def_levels();
const int16_t* rep_levels = this->rep_levels();
const int64_t total_levels_read = levels_position_;
const T* vals = reinterpret_cast<const T*>(this->values());
if (leaf_info_.def_level > 0) {
std::cout << "def levels: ";
for (int64_t i = 0; i < total_levels_read; ++i) {
std::cout << def_levels[i] << " ";
}
std::cout << std::endl;
}
if (leaf_info_.rep_level > 0) {
std::cout << "rep levels: ";
for (int64_t i = 0; i < total_levels_read; ++i) {
std::cout << rep_levels[i] << " ";
}
std::cout << std::endl;
}
std::cout << "values: ";
for (int64_t i = 0; i < this->values_written(); ++i) {
std::cout << vals[i] << " ";
}
std::cout << std::endl;
}
void ResetValues() {
if (values_written_ > 0) {
// Resize to 0, but do not shrink to fit
if (uses_values_) {
PARQUET_THROW_NOT_OK(values_->Resize(0, /*shrink_to_fit=*/false));
}
PARQUET_THROW_NOT_OK(valid_bits_->Resize(0, /*shrink_to_fit=*/false));
values_written_ = 0;
values_capacity_ = 0;
null_count_ = 0;
}
}
protected:
template <typename T>
T* ValuesHead() {
return values_->mutable_data_as<T>() + values_written_;
}
LevelInfo leaf_info_;
};
/// In FLBARecordReader, we read fixed length byte array values.
///
/// Unlike other fixed length types, the `values_` buffer is not used to store
/// values, instead we use `data_builder_` to store the values, and `null_bitmap_builder_`
/// is used to store the null bitmap.
///
/// The `values_` buffer is used to store the temporary values for `Decode`, and it would
/// be Reset after each `Decode` call. The `valid_bits_` buffer is never used.
class FLBARecordReader final : public TypedRecordReader<FLBAType>,
virtual public BinaryRecordReader {
public:
FLBARecordReader(const ColumnDescriptor* descr, LevelInfo leaf_info,
::arrow::MemoryPool* pool, bool read_dense_for_nullable)
: TypedRecordReader<FLBAType>(descr, leaf_info, pool, read_dense_for_nullable),
byte_width_(descr_->type_length()),
type_(::arrow::fixed_size_binary(byte_width_)),
array_builder_(type_, pool) {
ARROW_DCHECK_EQ(descr_->physical_type(), Type::FIXED_LEN_BYTE_ARRAY);
}
::arrow::ArrayVector GetBuilderChunks() override {
PARQUET_ASSIGN_OR_THROW(auto chunk, array_builder_.Finish());
return ::arrow::ArrayVector{std::move(chunk)};
}
void ReserveValues(int64_t extra_values) override {
ARROW_DCHECK(!uses_values_);
TypedRecordReader::ReserveValues(extra_values);
PARQUET_THROW_NOT_OK(array_builder_.Reserve(extra_values));
}
void ReadValuesDense(int64_t values_to_read) override {
int64_t num_decoded = this->current_decoder_->DecodeArrowNonNull(
static_cast<int>(values_to_read), &array_builder_);
CheckNumberDecoded(num_decoded, values_to_read);
ResetValues();
}
void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override {
int64_t num_decoded = this->current_decoder_->DecodeArrow(
static_cast<int>(values_to_read), static_cast<int>(null_count),
valid_bits_->mutable_data(), values_written_, &array_builder_);
CheckNumberDecoded(num_decoded, values_to_read - null_count);
ResetValues();
}
private:
const int byte_width_;
std::shared_ptr<::arrow::DataType> type_;
::arrow::FixedSizeBinaryBuilder array_builder_;
};
/// ByteArrayRecordReader reads variable length byte array values.
///
/// It only calls `DecodeArrowNonNull` and `DecodeArrow` to read values, and
/// `Decode` and `DecodeSpaced` are not used.
///
/// The `values_` buffers are never used, and the `accumulator_`
/// is used to store the values.
class ByteArrayChunkedRecordReader final : public TypedRecordReader<ByteArrayType>,
virtual public BinaryRecordReader {
public:
ByteArrayChunkedRecordReader(const ColumnDescriptor* descr, LevelInfo leaf_info,
::arrow::MemoryPool* pool, bool read_dense_for_nullable,
const std::shared_ptr<::arrow::DataType>& arrow_type)
: TypedRecordReader<ByteArrayType>(descr, leaf_info, pool,
read_dense_for_nullable) {
ARROW_DCHECK_EQ(descr_->physical_type(), Type::BYTE_ARRAY);
auto arrow_binary_type = arrow_type ? arrow_type->id() : ::arrow::Type::BINARY;
switch (arrow_binary_type) {
case ::arrow::Type::BINARY:
accumulator_.builder = std::make_unique<::arrow::BinaryBuilder>(pool);
break;
case ::arrow::Type::STRING:
accumulator_.builder = std::make_unique<::arrow::StringBuilder>(pool);
break;
case ::arrow::Type::LARGE_BINARY:
accumulator_.builder = std::make_unique<::arrow::LargeBinaryBuilder>(pool);
break;
case ::arrow::Type::LARGE_STRING:
accumulator_.builder = std::make_unique<::arrow::LargeStringBuilder>(pool);
break;
case ::arrow::Type::BINARY_VIEW:
accumulator_.builder = std::make_unique<::arrow::BinaryViewBuilder>(pool);
break;
case ::arrow::Type::STRING_VIEW:
accumulator_.builder = std::make_unique<::arrow::StringViewBuilder>(pool);
break;
default:
throw ParquetException("cannot read Parquet BYTE_ARRAY as Arrow " +
arrow_type->ToString());
}
}
::arrow::ArrayVector GetBuilderChunks() override {
::arrow::ArrayVector result = accumulator_.chunks;
if (result.empty() || accumulator_.builder->length() > 0) {
std::shared_ptr<::arrow::Array> last_chunk;
PARQUET_THROW_NOT_OK(accumulator_.builder->Finish(&last_chunk));
result.push_back(std::move(last_chunk));
}
accumulator_.chunks = {};
return result;
}
void ReserveValues(int64_t extra_values) override {
ARROW_DCHECK(!uses_values_);
TypedRecordReader::ReserveValues(extra_values);
PARQUET_THROW_NOT_OK(accumulator_.builder->Reserve(extra_values));
}
void ReadValuesDense(int64_t values_to_read) override {
int64_t num_decoded = this->current_decoder_->DecodeArrowNonNull(
static_cast<int>(values_to_read), &accumulator_);
CheckNumberDecoded(num_decoded, values_to_read);
ResetValues();
}
void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override {
int64_t num_decoded = this->current_decoder_->DecodeArrow(
static_cast<int>(values_to_read), static_cast<int>(null_count),
valid_bits_->mutable_data(), values_written_, &accumulator_);
CheckNumberDecoded(num_decoded, values_to_read - null_count);
ResetValues();
}
private:
// Helper data structure for accumulating builder chunks
typename EncodingTraits<ByteArrayType>::Accumulator accumulator_;
};
/// ByteArrayDictionaryRecordReader reads into ::arrow::dictionary(index: int32,
/// values: binary).
///
/// If underlying column is dictionary encoded, it will call `DecodeIndices` to read,
/// otherwise it will call `DecodeArrowNonNull` to read.
class ByteArrayDictionaryRecordReader final : public TypedRecordReader<ByteArrayType>,
virtual public DictionaryRecordReader {
public:
ByteArrayDictionaryRecordReader(const ColumnDescriptor* descr, LevelInfo leaf_info,
::arrow::MemoryPool* pool, bool read_dense_for_nullable)
: TypedRecordReader<ByteArrayType>(descr, leaf_info, pool, read_dense_for_nullable),
builder_(pool) {
this->read_dictionary_ = true;
}
std::shared_ptr<::arrow::ChunkedArray> GetResult() override {
FlushBuilder();
std::vector<std::shared_ptr<::arrow::Array>> result;
std::swap(result, result_chunks_);
return std::make_shared<::arrow::ChunkedArray>(std::move(result), builder_.type());
}
void FlushBuilder() {
if (builder_.length() > 0) {
std::shared_ptr<::arrow::Array> chunk;
PARQUET_THROW_NOT_OK(builder_.Finish(&chunk));
result_chunks_.emplace_back(std::move(chunk));
// Also clears the dictionary memo table
builder_.Reset();
}
}
void MaybeWriteNewDictionary() {
if (this->new_dictionary_) {
/// If there is a new dictionary, we may need to flush the builder, then
/// insert the new dictionary values
FlushBuilder();
builder_.ResetFull();
auto decoder = dynamic_cast<BinaryDictDecoder*>(this->current_decoder_);
decoder->InsertDictionary(&builder_);
this->new_dictionary_ = false;
}
}
void ReadValuesDense(int64_t values_to_read) override {
int64_t num_decoded = 0;
if (current_encoding_ == Encoding::RLE_DICTIONARY) {
MaybeWriteNewDictionary();
auto decoder = dynamic_cast<BinaryDictDecoder*>(this->current_decoder_);
num_decoded = decoder->DecodeIndices(static_cast<int>(values_to_read), &builder_);
} else {
num_decoded = this->current_decoder_->DecodeArrowNonNull(
static_cast<int>(values_to_read), &builder_);
}
// Flush values since they have been copied into the builder
ResetValues();
CheckNumberDecoded(num_decoded, values_to_read);
}
void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override {
int64_t num_decoded = 0;
if (current_encoding_ == Encoding::RLE_DICTIONARY) {
MaybeWriteNewDictionary();
auto decoder = dynamic_cast<BinaryDictDecoder*>(this->current_decoder_);
num_decoded = decoder->DecodeIndicesSpaced(
static_cast<int>(values_to_read), static_cast<int>(null_count),
valid_bits_->mutable_data(), values_written_, &builder_);
} else {
num_decoded = this->current_decoder_->DecodeArrow(
static_cast<int>(values_to_read), static_cast<int>(null_count),
valid_bits_->mutable_data(), values_written_, &builder_);
}
ARROW_DCHECK_EQ(num_decoded, values_to_read - null_count);
// Flush values since they have been copied into the builder
ResetValues();
}
private:
using BinaryDictDecoder = DictDecoder<ByteArrayType>;
::arrow::BinaryDictionary32Builder builder_;
std::vector<std::shared_ptr<::arrow::Array>> result_chunks_;
};
// TODO(wesm): Implement these to some satisfaction
template <>
void TypedRecordReader<Int96Type>::DebugPrintState() {}
template <>
void TypedRecordReader<ByteArrayType>::DebugPrintState() {}
template <>
void TypedRecordReader<FLBAType>::DebugPrintState() {}
std::shared_ptr<RecordReader> MakeByteArrayRecordReader(
const ColumnDescriptor* descr, LevelInfo leaf_info, ::arrow::MemoryPool* pool,
bool read_dictionary, bool read_dense_for_nullable,
const std::shared_ptr<::arrow::DataType>& arrow_type) {
if (read_dictionary) {
return std::make_shared<ByteArrayDictionaryRecordReader>(descr, leaf_info, pool,
read_dense_for_nullable);
} else {
return std::make_shared<ByteArrayChunkedRecordReader>(
descr, leaf_info, pool, read_dense_for_nullable, arrow_type);
}
}
} // namespace
std::shared_ptr<RecordReader> RecordReader::Make(
const ColumnDescriptor* descr, LevelInfo leaf_info, MemoryPool* pool,
bool read_dictionary, bool read_dense_for_nullable,
const std::shared_ptr<::arrow::DataType>& arrow_type) {
switch (descr->physical_type()) {
case Type::BOOLEAN:
return std::make_shared<TypedRecordReader<BooleanType>>(descr, leaf_info, pool,
read_dense_for_nullable);
case Type::INT32:
return std::make_shared<TypedRecordReader<Int32Type>>(descr, leaf_info, pool,
read_dense_for_nullable);
case Type::INT64:
return std::make_shared<TypedRecordReader<Int64Type>>(descr, leaf_info, pool,
read_dense_for_nullable);
case Type::INT96:
return std::make_shared<TypedRecordReader<Int96Type>>(descr, leaf_info, pool,
read_dense_for_nullable);
case Type::FLOAT:
return std::make_shared<TypedRecordReader<FloatType>>(descr, leaf_info, pool,
read_dense_for_nullable);
case Type::DOUBLE:
return std::make_shared<TypedRecordReader<DoubleType>>(descr, leaf_info, pool,
read_dense_for_nullable);
case Type::BYTE_ARRAY: {
return MakeByteArrayRecordReader(descr, leaf_info, pool, read_dictionary,
read_dense_for_nullable, arrow_type);
}
case Type::FIXED_LEN_BYTE_ARRAY:
return std::make_shared<FLBARecordReader>(descr, leaf_info, pool,
read_dense_for_nullable);
default: {
// PARQUET-1481: This can occur if the file is corrupt
std::stringstream ss;
ss << "Invalid physical column type: " << static_cast<int>(descr->physical_type());
throw ParquetException(ss.str());
}
}
// Unreachable code, but suppress compiler warning
return nullptr;
}
} // namespace internal
} // namespace parquet