| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "parquet/column_reader.h" |
| |
| #include <algorithm> |
| #include <cstdint> |
| #include <cstring> |
| #include <exception> |
| #include <iostream> |
| #include <memory> |
| #include <string> |
| #include <type_traits> |
| #include <unordered_map> |
| #include <utility> |
| #include <vector> |
| |
| #include "arrow/array.h" |
| #include "arrow/array/array_binary.h" |
| #include "arrow/array/builder_binary.h" |
| #include "arrow/array/builder_dict.h" |
| #include "arrow/array/builder_primitive.h" |
| #include "arrow/chunked_array.h" |
| #include "arrow/type.h" |
| #include "arrow/util/bit_stream_utils_internal.h" |
| #include "arrow/util/bit_util.h" |
| #include "arrow/util/checked_cast.h" |
| #include "arrow/util/compression.h" |
| #include "arrow/util/crc32.h" |
| #include "arrow/util/int_util_overflow.h" |
| #include "arrow/util/logging.h" |
| #include "arrow/util/rle_encoding_internal.h" |
| #include "arrow/util/unreachable.h" |
| #include "parquet/column_page.h" |
| #include "parquet/encoding.h" |
| #include "parquet/encryption/encryption_internal.h" |
| #include "parquet/encryption/internal_file_decryptor.h" |
| #include "parquet/exception.h" |
| #include "parquet/level_comparison.h" |
| #include "parquet/level_conversion.h" |
| #include "parquet/properties.h" |
| #include "parquet/statistics.h" |
| #include "parquet/thrift_internal.h" // IWYU pragma: keep |
| #include "parquet/windows_fixup.h" // for OPTIONAL |
| |
| #ifdef _MSC_VER |
| // disable warning about inheritance via dominance in the diamond pattern |
| # pragma warning(disable : 4250) |
| #endif |
| |
| using arrow::MemoryPool; |
| using arrow::internal::AddWithOverflow; |
| using arrow::internal::checked_cast; |
| using arrow::internal::MultiplyWithOverflow; |
| |
| namespace bit_util = arrow::bit_util; |
| |
| namespace parquet { |
| |
| namespace { |
| |
// The minimum number of repetition/definition levels to decode at a time, for
// better vectorized performance when doing many smaller record reads
constexpr int64_t kMinLevelBatchSize = 1024;

// Batch size for reading and throwing away values during skip.
// Both RecordReader and the ColumnReader use this for skipping.
constexpr int64_t kSkipScratchBatchSize = 1024;
| |
| // Throws exception if number_decoded does not match expected. |
| inline void CheckNumberDecoded(int64_t number_decoded, int64_t expected) { |
| if (ARROW_PREDICT_FALSE(number_decoded != expected)) { |
| ParquetException::EofException("Decoded values " + std::to_string(number_decoded) + |
| " does not match expected " + |
| std::to_string(expected)); |
| } |
| } |
| |
// Shared error message used when a data page's decoded repetition/definition
// level counts disagree with the num_values field of its page header.
constexpr std::string_view kErrorRepDefLevelNotMatchesNumValues =
    "Number of decoded rep / def levels do not match num_values in page header";
| |
| } // namespace |
| |
// A LevelDecoder starts out with no buffered levels; SetData() or SetDataV2()
// must be called before Decode().
LevelDecoder::LevelDecoder() : num_values_remaining_(0) {}

LevelDecoder::~LevelDecoder() = default;
| |
| int LevelDecoder::SetData(Encoding::type encoding, int16_t max_level, |
| int num_buffered_values, const uint8_t* data, |
| int32_t data_size) { |
| max_level_ = max_level; |
| int32_t num_bytes = 0; |
| encoding_ = encoding; |
| num_values_remaining_ = num_buffered_values; |
| bit_width_ = bit_util::Log2(max_level + 1); |
| switch (encoding) { |
| case Encoding::RLE: { |
| if (data_size < 4) { |
| throw ParquetException("Received invalid levels (corrupt data page?)"); |
| } |
| num_bytes = ::arrow::util::SafeLoadAs<int32_t>(data); |
| if (num_bytes < 0 || num_bytes > data_size - 4) { |
| throw ParquetException("Received invalid number of bytes (corrupt data page?)"); |
| } |
| const uint8_t* decoder_data = data + 4; |
| if (!rle_decoder_) { |
| rle_decoder_ = std::make_unique<::arrow::util::RleBitPackedDecoder<int16_t>>( |
| decoder_data, num_bytes, bit_width_); |
| } else { |
| rle_decoder_->Reset(decoder_data, num_bytes, bit_width_); |
| } |
| return 4 + num_bytes; |
| } |
| case Encoding::BIT_PACKED: { |
| int num_bits = 0; |
| if (MultiplyWithOverflow(num_buffered_values, bit_width_, &num_bits)) { |
| throw ParquetException( |
| "Number of buffered values too large (corrupt data page?)"); |
| } |
| num_bytes = static_cast<int32_t>(bit_util::BytesForBits(num_bits)); |
| if (num_bytes < 0 || num_bytes > data_size) { |
| throw ParquetException("Received invalid number of bytes (corrupt data page?)"); |
| } |
| if (!bit_packed_decoder_) { |
| bit_packed_decoder_ = |
| std::make_unique<::arrow::bit_util::BitReader>(data, num_bytes); |
| } else { |
| bit_packed_decoder_->Reset(data, num_bytes); |
| } |
| return num_bytes; |
| } |
| default: |
| throw ParquetException("Unknown encoding type for levels."); |
| } |
| return -1; |
| } |
| |
| void LevelDecoder::SetDataV2(int32_t num_bytes, int16_t max_level, |
| int num_buffered_values, const uint8_t* data) { |
| max_level_ = max_level; |
| // Repetition and definition levels always uses RLE encoding |
| // in the DataPageV2 format. |
| if (num_bytes < 0) { |
| throw ParquetException("Invalid page header (corrupt data page?)"); |
| } |
| encoding_ = Encoding::RLE; |
| num_values_remaining_ = num_buffered_values; |
| bit_width_ = bit_util::Log2(max_level + 1); |
| |
| if (!rle_decoder_) { |
| rle_decoder_ = std::make_unique<::arrow::util::RleBitPackedDecoder<int16_t>>( |
| data, num_bytes, bit_width_); |
| } else { |
| rle_decoder_->Reset(data, num_bytes, bit_width_); |
| } |
| } |
| |
| int LevelDecoder::Decode(int batch_size, int16_t* levels) { |
| int num_decoded = 0; |
| |
| int num_values = std::min(num_values_remaining_, batch_size); |
| if (encoding_ == Encoding::RLE) { |
| num_decoded = rle_decoder_->GetBatch(levels, num_values); |
| } else { |
| num_decoded = bit_packed_decoder_->GetBatch(bit_width_, levels, num_values); |
| } |
| if (num_decoded > 0) { |
| internal::MinMax min_max = internal::FindMinMax(levels, num_decoded); |
| if (ARROW_PREDICT_FALSE(min_max.min < 0 || min_max.max > max_level_)) { |
| std::stringstream ss; |
| ss << "Malformed levels. min: " << min_max.min << " max: " << min_max.max |
| << " out of range. Max Level: " << max_level_; |
| throw ParquetException(ss.str()); |
| } |
| } |
| num_values_remaining_ -= num_decoded; |
| return num_decoded; |
| } |
| |
| ReaderProperties default_reader_properties() { |
| static ReaderProperties default_reader_properties; |
| return default_reader_properties; |
| } |
| |
| namespace { |
| |
| // Extracts encoded statistics from V1 and V2 data page headers |
| template <typename H> |
| EncodedStatistics ExtractStatsFromHeader(const H& header) { |
| EncodedStatistics page_statistics; |
| if (header.__isset.statistics) { |
| page_statistics = FromThrift(header.statistics); |
| } |
| return page_statistics; |
| } |
| |
| void CheckNumValuesInHeader(int num_values) { |
| if (num_values < 0) { |
| throw ParquetException("Invalid page header (negative number of values)"); |
| } |
| } |
| |
| // ---------------------------------------------------------------------- |
// SerializedPageReader deserializes Thrift metadata and pages that have been
// assembled in a serialized stream for storage in a Parquet file
| |
| // This subclass delimits pages appearing in a serialized stream, each preceded |
| // by a serialized Thrift format::PageHeader indicating the type of each page |
| // and the page metadata. |
class SerializedPageReader : public PageReader {
 public:
  // `stream` supplies the raw (possibly compressed/encrypted) page bytes;
  // `total_num_values` caps how many data-page values will be read.
  // `crypto_ctx` may be null for unencrypted columns; when present it is
  // copied and decryptors are created eagerly.
  SerializedPageReader(std::shared_ptr<ArrowInputStream> stream, int64_t total_num_values,
                       Compression::type codec, const ReaderProperties& properties,
                       const CryptoContext* crypto_ctx, bool always_compressed)
      : properties_(properties),
        stream_(std::move(stream)),
        decompression_buffer_(AllocateBuffer(properties_.memory_pool(), 0)),
        page_ordinal_(0),
        seen_num_values_(0),
        total_num_values_(total_num_values) {
    if (crypto_ctx != nullptr) {
      crypto_ctx_ = *crypto_ctx;
      InitDecryption();
    }
    max_page_header_size_ = kDefaultMaxPageHeaderSize;
    decompressor_ = GetCodec(codec);
    always_compressed_ = always_compressed;
  }

  // Implement the PageReader interface
  //
  // The returned Page contains references that aren't guaranteed to live
  // beyond the next call to NextPage(). SerializedPageReader reuses the
  // decompression buffer internally, so if NextPage() is
  // called then the content of previous page might be invalidated.
  std::shared_ptr<Page> NextPage() override;

  void set_max_page_header_size(uint32_t size) override { max_page_header_size_ = size; }

 private:
  // Refreshes `decryptor`'s AAD for the page about to be read (see
  // InitDecryption for how the base AAD strings are built).
  void UpdateDecryption(Decryptor* decryptor, int8_t module_type, std::string* page_aad);

  // Creates the data/metadata decryptors from the crypto context and
  // precomputes their base AAD strings.
  void InitDecryption();

  // Decompresses `page_buffer` into decompression_buffer_ when a codec is
  // configured; the first `levels_byte_len` bytes are copied through verbatim
  // (DataPageV2 stores levels uncompressed).
  std::shared_ptr<Buffer> DecompressIfNeeded(std::shared_ptr<Buffer> page_buffer,
                                             int compressed_len, int uncompressed_len,
                                             int levels_byte_len = 0);

  // Returns true for non-data pages, and if we should skip based on
  // data_page_filter_. Performs basic checks on values in the page header.
  // Fills in data_page_statistics.
  bool ShouldSkipPage(EncodedStatistics* data_page_statistics);

  const ReaderProperties properties_;
  std::shared_ptr<ArrowInputStream> stream_;

  // Header of the page currently being processed (reset for every page).
  format::PageHeader current_page_header_;

  // Compression codec to use.
  std::unique_ptr<::arrow::util::Codec> decompressor_;
  std::shared_ptr<ResizableBuffer> decompression_buffer_;

  // Force-decompress DataPageV2 bodies even when the header says otherwise
  // (see NextPage: some old writers mislabeled is_compressed).
  bool always_compressed_;

  // The fields below are used for calculation of AAD (additional authenticated data)
  // suffix which is part of the Parquet Modular Encryption.
  // The AAD suffix for a parquet module is built internally by
  // concatenating different parts some of which include
  // the row group ordinal, column ordinal and page ordinal.
  // Please refer to the encryption specification for more details:
  // https://github.com/apache/parquet-format/blob/encryption/Encryption.md#44-additional-authenticated-data

  // The CryptoContext used by this PageReader.
  CryptoContext crypto_ctx_;
  // This PageReader has its own Decryptor instances in order to be thread-safe.
  std::unique_ptr<Decryptor> meta_decryptor_;
  std::unique_ptr<Decryptor> data_decryptor_;

  // The ordinal fields in the context below are used for AAD suffix calculation.
  int32_t page_ordinal_;  // page ordinal does not count the dictionary page

  // Maximum allowed page size
  uint32_t max_page_header_size_;

  // Number of values read in data pages so far
  int64_t seen_num_values_;

  // Number of values in all the data pages
  int64_t total_num_values_;

  // data_page_aad_ and data_page_header_aad_ contain the AAD for data page and data page
  // header in a single column respectively.
  // While calculating AAD for different pages in a single column the pages AAD is
  // updated by only the page ordinal.
  std::string data_page_aad_;
  std::string data_page_header_aad_;
};
| |
void SerializedPageReader::InitDecryption() {
  // Prepare the AAD for quick update later: the base module AADs are built
  // once with kNonPageOrdinal, then only the page-ordinal part is patched in
  // per page by UpdateDecryption()/QuickUpdatePageAad().
  if (crypto_ctx_.data_decryptor_factory) {
    data_decryptor_ = crypto_ctx_.data_decryptor_factory();
    if (data_decryptor_) {
      ARROW_DCHECK(!data_decryptor_->file_aad().empty());
      // Base AAD for data page bodies.
      data_page_aad_ = encryption::CreateModuleAad(
          data_decryptor_->file_aad(), encryption::kDataPage,
          crypto_ctx_.row_group_ordinal, crypto_ctx_.column_ordinal, kNonPageOrdinal);
    }
  }
  if (crypto_ctx_.meta_decryptor_factory) {
    meta_decryptor_ = crypto_ctx_.meta_decryptor_factory();
    if (meta_decryptor_) {
      ARROW_DCHECK(!meta_decryptor_->file_aad().empty());
      // Base AAD for data page headers (headers use the metadata decryptor).
      data_page_header_aad_ = encryption::CreateModuleAad(
          meta_decryptor_->file_aad(), encryption::kDataPageHeader,
          crypto_ctx_.row_group_ordinal, crypto_ctx_.column_ordinal, kNonPageOrdinal);
    }
  }
}
| |
// Refreshes `decryptor` with the AAD of the page that is about to be read.
// `page_aad` must point at the precomputed base AAD for this module type.
void SerializedPageReader::UpdateDecryption(Decryptor* decryptor, int8_t module_type,
                                            std::string* page_aad) {
  ARROW_DCHECK(decryptor != nullptr);
  if (crypto_ctx_.start_decrypt_with_dictionary_page) {
    // Still ahead of the dictionary page: rebuild the AAD for the given
    // module type from the row-group/column ordinals.
    UpdateDecryptor(decryptor, crypto_ctx_.row_group_ordinal, crypto_ctx_.column_ordinal,
                    module_type);
  } else {
    // Fast path: only the page-ordinal portion of the precomputed AAD changes.
    encryption::QuickUpdatePageAad(page_ordinal_, page_aad);
    decryptor->UpdateAad(*page_aad);
  }
}
| |
| bool SerializedPageReader::ShouldSkipPage(EncodedStatistics* data_page_statistics) { |
| const PageType::type page_type = LoadEnumSafe(¤t_page_header_.type); |
| if (page_type == PageType::DATA_PAGE) { |
| const format::DataPageHeader& header = current_page_header_.data_page_header; |
| CheckNumValuesInHeader(header.num_values); |
| *data_page_statistics = ExtractStatsFromHeader(header); |
| seen_num_values_ += header.num_values; |
| if (data_page_filter_) { |
| const EncodedStatistics* filter_statistics = |
| data_page_statistics->is_set() ? data_page_statistics : nullptr; |
| DataPageStats data_page_stats(filter_statistics, header.num_values, |
| /*num_rows=*/std::nullopt); |
| if (data_page_filter_(data_page_stats)) { |
| return true; |
| } |
| } |
| } else if (page_type == PageType::DATA_PAGE_V2) { |
| const format::DataPageHeaderV2& header = current_page_header_.data_page_header_v2; |
| CheckNumValuesInHeader(header.num_values); |
| if (header.num_rows < 0) { |
| throw ParquetException("Invalid page header (negative number of rows)"); |
| } |
| if (header.definition_levels_byte_length < 0 || |
| header.repetition_levels_byte_length < 0) { |
| throw ParquetException("Invalid page header (negative levels byte length)"); |
| } |
| *data_page_statistics = ExtractStatsFromHeader(header); |
| seen_num_values_ += header.num_values; |
| if (data_page_filter_) { |
| const EncodedStatistics* filter_statistics = |
| data_page_statistics->is_set() ? data_page_statistics : nullptr; |
| DataPageStats data_page_stats(filter_statistics, header.num_values, |
| header.num_rows); |
| if (data_page_filter_(data_page_stats)) { |
| return true; |
| } |
| } |
| } else if (page_type == PageType::DICTIONARY_PAGE) { |
| const format::DictionaryPageHeader& dict_header = |
| current_page_header_.dictionary_page_header; |
| CheckNumValuesInHeader(dict_header.num_values); |
| } else { |
| // We don't know what this page type is. We're allowed to skip non-data |
| // pages. |
| return true; |
| } |
| return false; |
| } |
| |
// Reads, decrypts, checksums and decompresses pages until the next page we
// know how to handle, returning it as a DictionaryPage / DataPageV1 /
// DataPageV2. Returns nullptr at end-of-stream.
std::shared_ptr<Page> SerializedPageReader::NextPage() {
  ThriftDeserializer deserializer(properties_);

  // Loop here because there may be unhandled page types that we skip until
  // finding a page that we do know what to do with
  while (seen_num_values_ < total_num_values_) {
    uint32_t header_size = 0;
    uint32_t allowed_page_size = kDefaultPageHeaderSize;

    // Page headers can be very large because of page statistics
    // We try to deserialize a larger buffer progressively
    // until a maximum allowed header limit
    while (true) {
      PARQUET_ASSIGN_OR_THROW(auto view, stream_->Peek(allowed_page_size));
      if (view.size() == 0) return nullptr;

      // This gets used, then set by DeserializeThriftMsg
      header_size = static_cast<uint32_t>(view.size());
      try {
        if (meta_decryptor_ != nullptr) {
          UpdateDecryption(meta_decryptor_.get(), encryption::kDictionaryPageHeader,
                           &data_page_header_aad_);
        }
        // Reset to a freshly constructed Thrift struct so stale __isset flags
        // from the previous page cannot leak into this one.
        current_page_header_ = format::PageHeader();
        deserializer.DeserializeMessage(reinterpret_cast<const uint8_t*>(view.data()),
                                        &header_size, &current_page_header_,
                                        meta_decryptor_.get());
        break;
      } catch (std::exception& e) {
        // Failed to deserialize. Double the allowed page header size and try again
        std::stringstream ss;
        ss << e.what();
        allowed_page_size *= 2;
        if (allowed_page_size > max_page_header_size_) {
          ss << "Deserializing page header failed.\n";
          throw ParquetException(ss.str());
        }
      }
    }
    // Advance the stream offset past the header we just parsed.
    PARQUET_THROW_NOT_OK(stream_->Advance(header_size));

    int32_t compressed_len = current_page_header_.compressed_page_size;
    int32_t uncompressed_len = current_page_header_.uncompressed_page_size;
    if (compressed_len < 0 || uncompressed_len < 0) {
      throw ParquetException("Invalid page header");
    }

    EncodedStatistics data_page_statistics;
    if (ShouldSkipPage(&data_page_statistics)) {
      // Skip the page body entirely without reading it.
      PARQUET_THROW_NOT_OK(stream_->Advance(compressed_len));
      continue;
    }

    if (data_decryptor_ != nullptr) {
      UpdateDecryption(data_decryptor_.get(), encryption::kDictionaryPage,
                       &data_page_aad_);
    }

    // Read the compressed data page.
    PARQUET_ASSIGN_OR_THROW(auto page_buffer, stream_->Read(compressed_len));
    if (page_buffer->size() != compressed_len) {
      std::stringstream ss;
      ss << "Page was smaller (" << page_buffer->size() << ") than expected ("
         << compressed_len << ")";
      ParquetException::EofException(ss.str());
    }

    const PageType::type page_type = LoadEnumSafe(&current_page_header_.type);

    // The CRC (when present) covers the compressed, still-encrypted bytes.
    if (properties_.page_checksum_verification() && current_page_header_.__isset.crc &&
        PageCanUseChecksum(page_type)) {
      // verify crc
      uint32_t checksum =
          ::arrow::internal::crc32(/* prev */ 0, page_buffer->data(), compressed_len);
      if (static_cast<int32_t>(checksum) != current_page_header_.crc) {
        throw ParquetException(
            "could not verify page integrity, CRC checksum verification failed for "
            "page_ordinal " +
            std::to_string(page_ordinal_));
      }
    }

    // Decrypt it if we need to
    if (data_decryptor_ != nullptr) {
      auto decryption_buffer = AllocateBuffer(
          properties_.memory_pool(), data_decryptor_->PlaintextLength(compressed_len));
      // Decrypt() returns the plaintext length, which becomes the new
      // "compressed" length fed to the decompressor.
      compressed_len = data_decryptor_->Decrypt(
          page_buffer->span_as<uint8_t>(), decryption_buffer->mutable_span_as<uint8_t>());

      page_buffer = decryption_buffer;
    }

    if (page_type == PageType::DICTIONARY_PAGE) {
      // After the dictionary page has been seen, subsequent AAD updates use
      // the per-page fast path (see UpdateDecryption).
      crypto_ctx_.start_decrypt_with_dictionary_page = false;
      const format::DictionaryPageHeader& dict_header =
          current_page_header_.dictionary_page_header;
      bool is_sorted = dict_header.__isset.is_sorted ? dict_header.is_sorted : false;

      page_buffer =
          DecompressIfNeeded(std::move(page_buffer), compressed_len, uncompressed_len);

      return std::make_shared<DictionaryPage>(page_buffer, dict_header.num_values,
                                              LoadEnumSafe(&dict_header.encoding),
                                              is_sorted);
    } else if (page_type == PageType::DATA_PAGE) {
      ++page_ordinal_;
      const format::DataPageHeader& header = current_page_header_.data_page_header;
      page_buffer =
          DecompressIfNeeded(std::move(page_buffer), compressed_len, uncompressed_len);

      return std::make_shared<DataPageV1>(
          page_buffer, header.num_values, LoadEnumSafe(&header.encoding),
          LoadEnumSafe(&header.definition_level_encoding),
          LoadEnumSafe(&header.repetition_level_encoding), uncompressed_len,
          std::move(data_page_statistics));
    } else if (page_type == PageType::DATA_PAGE_V2) {
      ++page_ordinal_;
      const format::DataPageHeaderV2& header = current_page_header_.data_page_header_v2;

      // Arrow prior to 3.0.0 set is_compressed to false but still compressed.
      bool is_compressed =
          (header.__isset.is_compressed ? header.is_compressed : false) ||
          always_compressed_;

      // Uncompress if needed. In V2 the level bytes are never compressed,
      // so their combined length is computed (with overflow checking) and
      // passed through to DecompressIfNeeded.
      int levels_byte_len;
      if (AddWithOverflow(header.definition_levels_byte_length,
                          header.repetition_levels_byte_length, &levels_byte_len)) {
        throw ParquetException("Levels size too large (corrupt file?)");
      }
      // DecompressIfNeeded doesn't take `is_compressed` into account as
      // it's page type-agnostic.
      if (is_compressed) {
        page_buffer = DecompressIfNeeded(std::move(page_buffer), compressed_len,
                                         uncompressed_len, levels_byte_len);
      }

      return std::make_shared<DataPageV2>(
          page_buffer, header.num_values, header.num_nulls, header.num_rows,
          LoadEnumSafe(&header.encoding), header.definition_levels_byte_length,
          header.repetition_levels_byte_length, uncompressed_len, is_compressed,
          std::move(data_page_statistics));
    } else {
      throw ParquetException(
          "Internal error, we have already skipped non-data pages in ShouldSkipPage()");
    }
  }
  // All expected values have been seen: end of the column chunk.
  return std::shared_ptr<Page>(nullptr);
}
| |
// Decompresses `page_buffer` into the shared decompression_buffer_ and
// returns it. The first `levels_byte_len` bytes (DataPageV2 rep/def levels)
// are copied through uncompressed. Returns `page_buffer` unchanged when no
// codec is configured. Note the returned buffer is reused across pages.
std::shared_ptr<Buffer> SerializedPageReader::DecompressIfNeeded(
    std::shared_ptr<Buffer> page_buffer, int compressed_len, int uncompressed_len,
    int levels_byte_len) {
  if (decompressor_ == nullptr) {
    // Uncompressed column chunk: hand the buffer back as-is.
    return page_buffer;
  }
  if (compressed_len < levels_byte_len || uncompressed_len < levels_byte_len) {
    throw ParquetException("Invalid page header");
  }

  // Grow the uncompressed buffer if we need to.
  PARQUET_THROW_NOT_OK(
      decompression_buffer_->Resize(uncompressed_len, /*shrink_to_fit=*/false));

  if (levels_byte_len > 0) {
    // First copy the levels as-is
    uint8_t* decompressed = decompression_buffer_->mutable_data();
    memcpy(decompressed, page_buffer->data(), levels_byte_len);
  }

  // GH-31992: DataPageV2 may store only levels and no values when all
  // values are null. In this case, Parquet java is known to produce a
  // 0-len compressed area (which is invalid compressed input).
  // See https://github.com/apache/parquet-java/issues/3122
  int64_t decompressed_len = 0;
  if (uncompressed_len - levels_byte_len != 0) {
    // Decompress the values section, which follows the level bytes.
    PARQUET_ASSIGN_OR_THROW(
        decompressed_len,
        decompressor_->Decompress(
            compressed_len - levels_byte_len, page_buffer->data() + levels_byte_len,
            uncompressed_len - levels_byte_len,
            decompression_buffer_->mutable_data() + levels_byte_len));
  }

  if (decompressed_len != uncompressed_len - levels_byte_len) {
    throw ParquetException("Page didn't decompress to expected size, expected: " +
                           std::to_string(uncompressed_len - levels_byte_len) +
                           ", but got:" + std::to_string(decompressed_len));
  }

  return decompression_buffer_;
}
| |
| } // namespace |
| |
| std::unique_ptr<PageReader> PageReader::Open(std::shared_ptr<ArrowInputStream> stream, |
| int64_t total_num_values, |
| Compression::type codec, |
| const ReaderProperties& properties, |
| bool always_compressed, |
| const CryptoContext* ctx) { |
| return std::unique_ptr<PageReader>(new SerializedPageReader( |
| std::move(stream), total_num_values, codec, properties, ctx, always_compressed)); |
| } |
| |
| std::unique_ptr<PageReader> PageReader::Open(std::shared_ptr<ArrowInputStream> stream, |
| int64_t total_num_values, |
| Compression::type codec, |
| bool always_compressed, |
| ::arrow::MemoryPool* pool, |
| const CryptoContext* ctx) { |
| return std::unique_ptr<PageReader>( |
| new SerializedPageReader(std::move(stream), total_num_values, codec, |
| ReaderProperties(pool), ctx, always_compressed)); |
| } |
| |
| namespace { |
| |
| // ---------------------------------------------------------------------- |
| // Impl base class for TypedColumnReader and RecordReader |
| |
| template <typename DType> |
| class ColumnReaderImplBase { |
| public: |
| using T = typename DType::c_type; |
| |
  // `descr` is stored as a raw pointer and the max definition/repetition
  // levels are cached from it for cheap access in the decoding hot paths.
  // NOTE(review): `descr` and `pool` are presumably owned by the caller and
  // must outlive this reader — confirm against call sites.
  ColumnReaderImplBase(const ColumnDescriptor* descr, ::arrow::MemoryPool* pool)
      : descr_(descr),
        max_def_level_(descr->max_definition_level()),
        max_rep_level_(descr->max_repetition_level()),
        num_buffered_values_(0),
        num_decoded_values_(0),
        pool_(pool),
        current_decoder_(nullptr),
        current_encoding_(Encoding::UNKNOWN) {}
| |
| virtual ~ColumnReaderImplBase() = default; |
| |
| protected: |
| // Read up to batch_size values from the current data page into the |
| // pre-allocated memory T* |
| // |
| // @returns: the number of values read into the out buffer |
| int64_t ReadValues(int64_t batch_size, T* out) { |
| int64_t num_decoded = current_decoder_->Decode(out, static_cast<int>(batch_size)); |
| return num_decoded; |
| } |
| |
| // Read up to batch_size values from the current data page into the |
| // pre-allocated memory T*, leaving spaces for null entries according |
| // to the def_levels. |
| // |
| // @returns: the number of values read into the out buffer |
| int64_t ReadValuesSpaced(int64_t batch_size, T* out, int64_t null_count, |
| uint8_t* valid_bits, int64_t valid_bits_offset) { |
| return current_decoder_->DecodeSpaced(out, static_cast<int>(batch_size), |
| static_cast<int>(null_count), valid_bits, |
| valid_bits_offset); |
| } |
| |
| // Read multiple definition levels into preallocated memory |
| // |
| // Returns the number of decoded definition levels |
| int64_t ReadDefinitionLevels(int64_t batch_size, int16_t* levels) { |
| if (max_def_level_ == 0) { |
| return 0; |
| } |
| return definition_level_decoder_.Decode(static_cast<int>(batch_size), levels); |
| } |
| |
| bool HasNextInternal() { |
| // Either there is no data page available yet, or the data page has been |
| // exhausted |
| if (num_buffered_values_ == 0 || num_decoded_values_ == num_buffered_values_) { |
| if (!ReadNewPage() || num_buffered_values_ == 0) { |
| return false; |
| } |
| } |
| return true; |
| } |
| |
| // Read multiple repetition levels into preallocated memory |
| // Returns the number of decoded repetition levels |
| int64_t ReadRepetitionLevels(int64_t batch_size, int16_t* levels) { |
| if (max_rep_level_ == 0) { |
| return 0; |
| } |
| return repetition_level_decoder_.Decode(static_cast<int>(batch_size), levels); |
| } |
| |
| // Advance to the next data page |
| bool ReadNewPage() { |
| // Loop until we find the next data page. |
| while (true) { |
| current_page_ = pager_->NextPage(); |
| if (!current_page_) { |
| // EOS |
| return false; |
| } |
| |
| if (current_page_->type() == PageType::DICTIONARY_PAGE) { |
| ConfigureDictionary(static_cast<const DictionaryPage*>(current_page_.get())); |
| continue; |
| } else if (current_page_->type() == PageType::DATA_PAGE) { |
| const auto* page = static_cast<const DataPageV1*>(current_page_.get()); |
| const int64_t levels_byte_size = InitializeLevelDecoders( |
| *page, page->repetition_level_encoding(), page->definition_level_encoding()); |
| InitializeDataDecoder(*page, levels_byte_size); |
| return true; |
| } else if (current_page_->type() == PageType::DATA_PAGE_V2) { |
| const auto* page = static_cast<const DataPageV2*>(current_page_.get()); |
| int64_t levels_byte_size = InitializeLevelDecodersV2(*page); |
| InitializeDataDecoder(*page, levels_byte_size); |
| return true; |
| } else { |
| // We don't know what this page type is. We're allowed to skip non-data |
| // pages. |
| continue; |
| } |
| } |
| return true; |
| } |
| |
  // Fully decodes a dictionary page and installs the resulting
  // RLE_DICTIONARY decoder in decoders_. Throws if this column has already
  // seen a dictionary page.
  void ConfigureDictionary(const DictionaryPage* page) {
    int encoding = static_cast<int>(page->encoding());
    // PLAIN_DICTIONARY and PLAIN dictionary pages are both served by the
    // RLE_DICTIONARY decoder, so normalize the key used in decoders_.
    if (page->encoding() == Encoding::PLAIN_DICTIONARY ||
        page->encoding() == Encoding::PLAIN) {
      encoding = static_cast<int>(Encoding::RLE_DICTIONARY);
    }

    auto it = decoders_.find(encoding);
    if (it != decoders_.end()) {
      throw ParquetException("Column cannot have more than one dictionary.");
    }

    if (page->encoding() == Encoding::PLAIN_DICTIONARY ||
        page->encoding() == Encoding::PLAIN) {
      auto dictionary = MakeTypedDecoder<DType>(Encoding::PLAIN, descr_, pool_);
      dictionary->SetData(page->num_values(), page->data(), page->size());

      // The dictionary is fully decoded during DictionaryDecoder::Init, so the
      // DictionaryPage buffer is no longer required after this step
      //
      // TODO(wesm): investigate whether this all-or-nothing decoding of the
      // dictionary makes sense and whether performance can be improved

      std::unique_ptr<DictDecoder<DType>> decoder = MakeDictDecoder<DType>(descr_, pool_);
      decoder->SetDict(dictionary.get());
      // NOTE(review): if the dynamic_cast below ever returned null the
      // released pointer would leak; it is presumably guaranteed to succeed
      // by construction — confirm.
      decoders_[encoding] =
          std::unique_ptr<DecoderType>(dynamic_cast<DecoderType*>(decoder.release()));
    } else {
      ParquetException::NYI("only plain dictionary encoding has been implemented");
    }

    new_dictionary_ = true;
    current_decoder_ = decoders_[encoding].get();
    ARROW_DCHECK(current_decoder_);
  }
| |
| // Initialize repetition and definition level decoders on the next data page. |
| |
| // If the data page includes repetition and definition levels, we |
| // initialize the level decoders and return the number of encoded level bytes. |
| // The return value helps determine the number of bytes in the encoded data. |
  // Initializes rep/def level decoders for a V1 data page and returns the
  // total number of bytes the encoded levels occupy at the front of the page
  // (the encoded values start immediately after).
  int64_t InitializeLevelDecoders(const DataPage& page,
                                  Encoding::type repetition_level_encoding,
                                  Encoding::type definition_level_encoding) {
    // Read a data page.
    num_buffered_values_ = page.num_values();

    // Have not decoded any values from the data page yet
    num_decoded_values_ = 0;

    const uint8_t* buffer = page.data();
    int32_t levels_byte_size = 0;
    int32_t max_size = page.size();

    // Data page Layout: Repetition Levels - Definition Levels - encoded values.
    // Levels are encoded as rle or bit-packed.
    // Init repetition levels; SetData returns the bytes it consumed, which
    // advances `buffer` to the definition level section.
    if (max_rep_level_ > 0) {
      int32_t rep_levels_bytes = repetition_level_decoder_.SetData(
          repetition_level_encoding, max_rep_level_,
          static_cast<int>(num_buffered_values_), buffer, max_size);
      buffer += rep_levels_bytes;
      levels_byte_size += rep_levels_bytes;
      max_size -= rep_levels_bytes;
    }
    // TODO figure a way to set max_def_level_ to 0
    // if the initial value is invalid

    // Init definition levels
    if (max_def_level_ > 0) {
      int32_t def_levels_bytes = definition_level_decoder_.SetData(
          definition_level_encoding, max_def_level_,
          static_cast<int>(num_buffered_values_), buffer, max_size);
      levels_byte_size += def_levels_bytes;
      max_size -= def_levels_bytes;
    }

    return levels_byte_size;
  }
| |
  // Initializes rep/def level decoders for a V2 data page, whose level
  // section lengths come from the page header. Returns the combined rep+def
  // level byte length (the encoded values start after it).
  int64_t InitializeLevelDecodersV2(const DataPageV2& page) {
    // Read a data page.
    num_buffered_values_ = page.num_values();

    // Have not decoded any values from the data page yet
    num_decoded_values_ = 0;
    const uint8_t* buffer = page.data();

    const int64_t total_levels_length =
        static_cast<int64_t>(page.repetition_levels_byte_length()) +
        page.definition_levels_byte_length();

    if (total_levels_length > page.size()) {
      throw ParquetException("Data page too small for levels (corrupt header?)");
    }

    if (max_rep_level_ > 0) {
      repetition_level_decoder_.SetDataV2(page.repetition_levels_byte_length(),
                                          max_rep_level_,
                                          static_cast<int>(num_buffered_values_), buffer);
    }
    // ARROW-17453: Even if max_rep_level_ is 0, there may still be
    // repetition level bytes written and/or reported in the header by
    // some writers (e.g. Athena), so skip past them unconditionally.
    buffer += page.repetition_levels_byte_length();

    if (max_def_level_ > 0) {
      definition_level_decoder_.SetDataV2(page.definition_levels_byte_length(),
                                          max_def_level_,
                                          static_cast<int>(num_buffered_values_), buffer);
    }

    return total_levels_length;
  }
| |
// Get a decoder object for this page or create a new decoder if this is the
// first page with this encoding. Decoders are cached per encoding in
// `decoders_` because a single column chunk may mix encodings across pages.
void InitializeDataDecoder(const DataPage& page, int64_t levels_byte_size) {
  // Encoded values start immediately after the encoded levels.
  const uint8_t* buffer = page.data() + levels_byte_size;
  const int64_t data_size = page.size() - levels_byte_size;

  if (data_size < 0) {
    throw ParquetException("Page smaller than size of encoded levels");
  }

  Encoding::type encoding = page.encoding();
  if (IsDictionaryIndexEncoding(encoding)) {
    // Normalizing the PLAIN_DICTIONARY to RLE_DICTIONARY encoding
    // in decoder.
    encoding = Encoding::RLE_DICTIONARY;
  }

  auto it = decoders_.find(static_cast<int>(encoding));
  if (it != decoders_.end()) {
    // Reuse the decoder created for an earlier page with this encoding.
    ARROW_DCHECK(it->second.get() != nullptr);
    current_decoder_ = it->second.get();
  } else {
    switch (encoding) {
      case Encoding::PLAIN:
      case Encoding::BYTE_STREAM_SPLIT:
      case Encoding::RLE:
      case Encoding::DELTA_BINARY_PACKED:
      case Encoding::DELTA_BYTE_ARRAY:
      case Encoding::DELTA_LENGTH_BYTE_ARRAY: {
        auto decoder = MakeTypedDecoder<DType>(encoding, descr_, pool_);
        current_decoder_ = decoder.get();
        decoders_[static_cast<int>(encoding)] = std::move(decoder);
        break;
      }

      case Encoding::RLE_DICTIONARY:
        // A dictionary-index page is only decodable after a preceding
        // dictionary page has installed the dictionary decoder.
        throw ParquetException("Dictionary page must be before data page.");

      default:
        throw ParquetException("Unknown encoding type.");
    }
  }
  current_encoding_ = encoding;
  // Point the decoder at this page's value bytes.
  current_decoder_->SetData(static_cast<int>(num_buffered_values_), buffer,
                            static_cast<int>(data_size));
}
| |
// Available values in the current data page, value includes repeated values
// and nulls.
int64_t available_values_current_page() const {
  return num_buffered_values_ - num_decoded_values_;
}

const ColumnDescriptor* descr_;
const int16_t max_def_level_;
const int16_t max_rep_level_;

std::unique_ptr<PageReader> pager_;
std::shared_ptr<Page> current_page_;

// Not set if full schema for this field has no optional or repeated elements
LevelDecoder definition_level_decoder_;

// Not set for flat schemas.
LevelDecoder repetition_level_decoder_;

// The total number of values stored in the data page. This is the maximum of
// the number of encoded definition levels or encoded values. For
// non-repeated, required columns, this is equal to the number of encoded
// values. For repeated or optional values, there may be fewer data values
// than levels, and this tells you how many encoded levels there are in that
// case.
int64_t num_buffered_values_;

// The number of values from the current data page that have been decoded
// into memory or skipped over.
int64_t num_decoded_values_;

::arrow::MemoryPool* pool_;

using DecoderType = TypedDecoder<DType>;
// Non-owning pointer to the decoder for the current page's value encoding.
DecoderType* current_decoder_;
// Encoding of the current page, normalized (PLAIN_DICTIONARY is reported as
// RLE_DICTIONARY; see InitializeDataDecoder).
Encoding::type current_encoding_;

/// Flag to signal when a new dictionary has been set, for the benefit of
/// DictionaryRecordReader
bool new_dictionary_ = false;

// The exposed encoding
ExposedEncoding exposed_encoding_ = ExposedEncoding::NO_ENCODING;

// Map of encoding type to the respective decoder object. For example, a
// column chunk's data pages may include both dictionary-encoded and
// plain-encoded data.
std::unordered_map<int, std::unique_ptr<DecoderType>> decoders_;

// Mark `num_values` levels/values of the current page as consumed.
void ConsumeBufferedValues(int64_t num_values) { num_decoded_values_ += num_values; }
| }; |
| |
| // ---------------------------------------------------------------------- |
| // TypedColumnReader implementations |
| |
| template <typename DType> |
| class TypedColumnReaderImpl : public TypedColumnReader<DType>, |
| public ColumnReaderImplBase<DType> { |
| public: |
| using T = typename DType::c_type; |
| |
| TypedColumnReaderImpl(const ColumnDescriptor* descr, std::unique_ptr<PageReader> pager, |
| ::arrow::MemoryPool* pool) |
| : ColumnReaderImplBase<DType>(descr, pool) { |
| this->pager_ = std::move(pager); |
| } |
| |
| bool HasNext() override { return this->HasNextInternal(); } |
| |
| int64_t ReadBatch(int64_t batch_size, int16_t* def_levels, int16_t* rep_levels, |
| T* values, int64_t* values_read) override; |
| |
| int64_t Skip(int64_t num_values_to_skip) override; |
| |
| Type::type type() const override { return this->descr_->physical_type(); } |
| |
| const ColumnDescriptor* descr() const override { return this->descr_; } |
| |
| ExposedEncoding GetExposedEncoding() override { return this->exposed_encoding_; }; |
| |
| int64_t ReadBatchWithDictionary(int64_t batch_size, int16_t* def_levels, |
| int16_t* rep_levels, int32_t* indices, |
| int64_t* indices_read, const T** dict, |
| int32_t* dict_len) override; |
| |
| protected: |
| void SetExposedEncoding(ExposedEncoding encoding) override { |
| this->exposed_encoding_ = encoding; |
| } |
| |
| // Allocate enough scratch space to accommodate skipping 16-bit levels or any |
| // value type. |
| void InitScratchForSkip(); |
| |
| // Scratch space for reading and throwing away rep/def levels and values when |
| // skipping. |
| std::shared_ptr<ResizableBuffer> scratch_for_skip_; |
| |
| private: |
| // Read dictionary indices. Similar to ReadValues but decode data to dictionary indices. |
| // This function is called only by ReadBatchWithDictionary(). |
| int64_t ReadDictionaryIndices(int64_t indices_to_read, int32_t* indices) { |
| auto decoder = dynamic_cast<DictDecoder<DType>*>(this->current_decoder_); |
| return decoder->DecodeIndices(static_cast<int>(indices_to_read), indices); |
| } |
| |
| // Get dictionary. The dictionary should have been set by SetDict(). The dictionary is |
| // owned by the internal decoder and is destroyed when the reader is destroyed. This |
| // function is called only by ReadBatchWithDictionary() after dictionary is configured. |
| void GetDictionary(const T** dictionary, int32_t* dictionary_length) { |
| auto decoder = dynamic_cast<DictDecoder<DType>*>(this->current_decoder_); |
| decoder->GetDictionary(dictionary, dictionary_length); |
| } |
| |
| // Read definition and repetition levels. Also return the number of definition levels |
| // and number of values to read. This function is called before reading values. |
| // |
| // ReadLevels will throw exception when any num-levels read is not equal to the number |
| // of the levels can be read. |
| void ReadLevels(int64_t batch_size, int16_t* def_levels, int16_t* rep_levels, |
| int64_t* num_def_levels, int64_t* non_null_values_to_read) { |
| batch_size = std::min(batch_size, this->available_values_current_page()); |
| |
| // If the field is required and non-repeated, there are no definition levels |
| if (this->max_def_level_ > 0 && def_levels != nullptr) { |
| *num_def_levels = this->ReadDefinitionLevels(batch_size, def_levels); |
| if (ARROW_PREDICT_FALSE(*num_def_levels != batch_size)) { |
| throw ParquetException(kErrorRepDefLevelNotMatchesNumValues); |
| } |
| // TODO(wesm): this tallying of values-to-decode can be performed with better |
| // cache-efficiency if fused with the level decoding. |
| *non_null_values_to_read += |
| std::count(def_levels, def_levels + *num_def_levels, this->max_def_level_); |
| } else { |
| // Required field, read all values |
| if (num_def_levels != nullptr) { |
| *num_def_levels = 0; |
| } |
| *non_null_values_to_read = batch_size; |
| } |
| |
| // Not present for non-repeated fields |
| if (this->max_rep_level_ > 0 && rep_levels != nullptr) { |
| int64_t num_rep_levels = this->ReadRepetitionLevels(batch_size, rep_levels); |
| if (batch_size != num_rep_levels) { |
| throw ParquetException(kErrorRepDefLevelNotMatchesNumValues); |
| } |
| } |
| } |
| }; |
| |
template <typename DType>
int64_t TypedColumnReaderImpl<DType>::ReadBatchWithDictionary(
    int64_t batch_size, int16_t* def_levels, int16_t* rep_levels, int32_t* indices,
    int64_t* indices_read, const T** dict, int32_t* dict_len) {
  bool has_dict_output = dict != nullptr && dict_len != nullptr;
  // Similar logic as ReadValues to get pages.
  if (!HasNext()) {
    // Column exhausted: report an empty read (and empty dictionary if asked).
    *indices_read = 0;
    if (has_dict_output) {
      *dict = nullptr;
      *dict_len = 0;
    }
    return 0;
  }

  // Verify the current data page is dictionary encoded.
  if (this->current_encoding_ != Encoding::RLE_DICTIONARY) {
    std::stringstream ss;
    ss << "Data page is not dictionary encoded. Encoding: "
       << EncodingToString(this->current_encoding_);
    throw ParquetException(ss.str());
  }

  // Get dictionary pointer and length.
  if (has_dict_output) {
    GetDictionary(dict, dict_len);
  }

  // Similar logic as ReadValues to get def levels and rep levels.
  int64_t num_def_levels = 0;
  // Number of non-null entries, i.e. the number of indices to decode.
  int64_t indices_to_read = 0;
  ReadLevels(batch_size, def_levels, rep_levels, &num_def_levels, &indices_to_read);

  // Read dictionary indices.
  *indices_read = ReadDictionaryIndices(indices_to_read, indices);
  // When there are no definition levels (required field), the total is driven
  // by the number of indices actually decoded.
  int64_t total_indices = std::max<int64_t>(num_def_levels, *indices_read);
  // Some callers use a batch size of 0 just to get the dictionary.
  int64_t expected_values = std::min(batch_size, this->available_values_current_page());
  if (total_indices == 0 && expected_values > 0) {
    std::stringstream ss;
    ss << "Read 0 values, expected " << expected_values;
    ParquetException::EofException(ss.str());
  }
  this->ConsumeBufferedValues(total_indices);

  return total_indices;
}
| |
| template <typename DType> |
| int64_t TypedColumnReaderImpl<DType>::ReadBatch(int64_t batch_size, int16_t* def_levels, |
| int16_t* rep_levels, T* values, |
| int64_t* values_read) { |
| // HasNext might invoke ReadNewPage until a data page with |
| // `available_values_current_page() > 0` is found. |
| if (!HasNext()) { |
| *values_read = 0; |
| return 0; |
| } |
| |
| // TODO(wesm): keep reading data pages until batch_size is reached, or the |
| // row group is finished |
| int64_t num_def_levels = 0; |
| // Number of non-null values to read within `num_def_levels`. |
| int64_t non_null_values_to_read = 0; |
| ReadLevels(batch_size, def_levels, rep_levels, &num_def_levels, |
| &non_null_values_to_read); |
| // Should not return more values than available in the current data page, |
| // since currently, ReadLevels would only consume level from current |
| // data page. |
| if (ARROW_PREDICT_FALSE(num_def_levels > this->available_values_current_page())) { |
| throw ParquetException(kErrorRepDefLevelNotMatchesNumValues); |
| } |
| if (non_null_values_to_read != 0) { |
| *values_read = this->ReadValues(non_null_values_to_read, values); |
| } else { |
| *values_read = 0; |
| } |
| // Adjust total_values, since if max_def_level_ == 0, num_def_levels would |
| // be 0 and `values_read` would adjust to `available_values_current_page()`. |
| int64_t total_values = std::max<int64_t>(num_def_levels, *values_read); |
| int64_t expected_values = std::min(batch_size, this->available_values_current_page()); |
| if (total_values == 0 && expected_values > 0) { |
| std::stringstream ss; |
| ss << "Read 0 values, expected " << expected_values; |
| ParquetException::EofException(ss.str()); |
| } |
| this->ConsumeBufferedValues(total_values); |
| return total_values; |
| } |
| |
| template <typename DType> |
| void TypedColumnReaderImpl<DType>::InitScratchForSkip() { |
| if (this->scratch_for_skip_ == nullptr) { |
| int value_size = type_traits<DType::type_num>::value_byte_size; |
| this->scratch_for_skip_ = AllocateBuffer( |
| this->pool_, kSkipScratchBatchSize * std::max<int>(sizeof(int16_t), value_size)); |
| } |
| } |
| |
| template <typename DType> |
| int64_t TypedColumnReaderImpl<DType>::Skip(int64_t num_values_to_skip) { |
| int64_t values_to_skip = num_values_to_skip; |
| // Optimization: Do not call HasNext() when values_to_skip == 0. |
| while (values_to_skip > 0 && HasNext()) { |
| // If the number of values to skip is more than the number of undecoded values, skip |
| // the Page. |
| const int64_t available_values = this->available_values_current_page(); |
| if (values_to_skip >= available_values) { |
| values_to_skip -= available_values; |
| this->ConsumeBufferedValues(available_values); |
| } else { |
| // We need to read this Page |
| // Jump to the right offset in the Page |
| int64_t values_read = 0; |
| InitScratchForSkip(); |
| ARROW_DCHECK_NE(this->scratch_for_skip_, nullptr); |
| do { |
| int64_t batch_size = std::min(kSkipScratchBatchSize, values_to_skip); |
| values_read = ReadBatch(static_cast<int>(batch_size), |
| scratch_for_skip_->mutable_data_as<int16_t>(), |
| scratch_for_skip_->mutable_data_as<int16_t>(), |
| scratch_for_skip_->mutable_data_as<T>(), &values_read); |
| values_to_skip -= values_read; |
| } while (values_read > 0 && values_to_skip > 0); |
| } |
| } |
| return num_values_to_skip - values_to_skip; |
| } |
| |
| } // namespace |
| |
| // ---------------------------------------------------------------------- |
| // Dynamic column reader constructor |
| |
| std::shared_ptr<ColumnReader> ColumnReader::Make(const ColumnDescriptor* descr, |
| std::unique_ptr<PageReader> pager, |
| MemoryPool* pool) { |
| switch (descr->physical_type()) { |
| case Type::BOOLEAN: |
| return std::make_shared<TypedColumnReaderImpl<BooleanType>>(descr, std::move(pager), |
| pool); |
| case Type::INT32: |
| return std::make_shared<TypedColumnReaderImpl<Int32Type>>(descr, std::move(pager), |
| pool); |
| case Type::INT64: |
| return std::make_shared<TypedColumnReaderImpl<Int64Type>>(descr, std::move(pager), |
| pool); |
| case Type::INT96: |
| return std::make_shared<TypedColumnReaderImpl<Int96Type>>(descr, std::move(pager), |
| pool); |
| case Type::FLOAT: |
| return std::make_shared<TypedColumnReaderImpl<FloatType>>(descr, std::move(pager), |
| pool); |
| case Type::DOUBLE: |
| return std::make_shared<TypedColumnReaderImpl<DoubleType>>(descr, std::move(pager), |
| pool); |
| case Type::BYTE_ARRAY: |
| return std::make_shared<TypedColumnReaderImpl<ByteArrayType>>( |
| descr, std::move(pager), pool); |
| case Type::FIXED_LEN_BYTE_ARRAY: |
| return std::make_shared<TypedColumnReaderImpl<FLBAType>>(descr, std::move(pager), |
| pool); |
| default: |
| ParquetException::NYI("type reader not implemented"); |
| } |
| ::arrow::Unreachable(); |
| } |
| |
| // ---------------------------------------------------------------------- |
| // RecordReader |
| |
| namespace internal { |
| |
| namespace { |
| |
| template <typename DType> |
| class TypedRecordReader : public TypedColumnReaderImpl<DType>, |
| virtual public RecordReader { |
| public: |
| using T = typename DType::c_type; |
| using BASE = TypedColumnReaderImpl<DType>; |
// Construct a record reader for one leaf column. `leaf_info` carries the
// level information for the leaf; `read_dense_for_nullable` selects dense
// (no validity bitmap) reads for nullable data.
TypedRecordReader(const ColumnDescriptor* descr, LevelInfo leaf_info, MemoryPool* pool,
                  bool read_dense_for_nullable)
    // Pager must be set using SetPageReader.
    : BASE(descr, /* pager = */ nullptr, pool) {
  leaf_info_ = leaf_info;
  nullable_values_ = leaf_info_.HasNullableValues();
  // A fresh reader always starts positioned at a record boundary.
  at_record_start_ = true;
  values_written_ = 0;
  null_count_ = 0;
  values_capacity_ = 0;
  levels_written_ = 0;
  levels_position_ = 0;
  levels_capacity_ = 0;
  read_dense_for_nullable_ = read_dense_for_nullable;
  // FIXED_LEN_BYTE_ARRAY and BYTE_ARRAY values are not stored in the `values_` buffer,
  // they are read directly as Arrow.
  uses_values_ = (descr->physical_type() != Type::BYTE_ARRAY &&
                  descr->physical_type() != Type::FIXED_LEN_BYTE_ARRAY);

  if (uses_values_) {
    values_ = AllocateBuffer(pool);
  }
  valid_bits_ = AllocateBuffer(pool);
  def_levels_ = AllocateBuffer(pool);
  rep_levels_ = AllocateBuffer(pool);
  // Qualified call avoids virtual dispatch from within the constructor.
  TypedRecordReader::Reset();
}
| |
| // Compute the values capacity in bytes for the given number of elements |
| int64_t bytes_for_values(int64_t nitems) const { |
| int64_t type_size = GetTypeByteSize(this->descr_->physical_type()); |
| int64_t bytes_for_values = -1; |
| if (MultiplyWithOverflow(nitems, type_size, &bytes_for_values)) { |
| throw ParquetException("Total size of items too large"); |
| } |
| return bytes_for_values; |
| } |
| |
// Expose the raw dictionary of the current column chunk. Returns nullptr with
// *dictionary_length == 0 when the column is exhausted before any page loads.
// Throws if the current page is not dictionary encoded.
const void* ReadDictionary(int32_t* dictionary_length) override {
  // Lazily advance to the first page so that a dictionary (if any) is set up.
  if (this->current_decoder_ == nullptr && !this->HasNextInternal()) {
    *dictionary_length = 0;
    return nullptr;
  }
  // Verify the current data page is dictionary encoded. The current_encoding_ should
  // have been set as RLE_DICTIONARY if the page encoding is RLE_DICTIONARY or
  // PLAIN_DICTIONARY.
  if (this->current_encoding_ != Encoding::RLE_DICTIONARY) {
    std::stringstream ss;
    ss << "Data page is not dictionary encoded. Encoding: "
       << EncodingToString(this->current_encoding_);
    throw ParquetException(ss.str());
  }
  // The dictionary is owned by the decoder; it stays valid for the reader's
  // lifetime (see GetDictionary's contract above).
  auto decoder = dynamic_cast<DictDecoder<DType>*>(this->current_decoder_);
  const T* dictionary = nullptr;
  decoder->GetDictionary(&dictionary, dictionary_length);
  return reinterpret_cast<const void*>(dictionary);
}
| |
// Read up to `num_records` whole records, buffering levels and values.
// Returns the number of records actually read (less if the row group ends).
int64_t ReadRecords(int64_t num_records) override {
  if (num_records == 0) return 0;
  // Delimit records, then read values at the end
  int64_t records_read = 0;

  // First consume any levels left over in the buffer from a previous call.
  if (has_values_to_process()) {
    records_read += ReadRecordData(num_records);
  }

  int64_t level_batch_size = std::max<int64_t>(kMinLevelBatchSize, num_records);

  // If we are in the middle of a record, we continue until reaching the
  // desired number of records or the end of the current record if we've found
  // enough records
  while (!at_record_start_ || records_read < num_records) {
    // Is there more data to read in this row group?
    if (!this->HasNextInternal()) {
      if (!at_record_start_) {
        // We ended the row group while inside a record that we haven't seen
        // the end of yet. So increment the record count for the last record in
        // the row group
        ++records_read;
        at_record_start_ = true;
      }
      break;
    }

    /// We perform multiple batch reads until we either exhaust the row group
    /// or observe the desired number of records
    int64_t batch_size =
        std::min(level_batch_size, this->available_values_current_page());

    // No more data in column
    if (batch_size == 0) {
      break;
    }

    if (this->max_def_level_ > 0) {
      ReserveLevels(batch_size);

      // Append freshly decoded levels after any already-buffered ones.
      int16_t* def_levels = this->def_levels() + levels_written_;
      int16_t* rep_levels = this->rep_levels() + levels_written_;

      if (ARROW_PREDICT_FALSE(this->ReadDefinitionLevels(batch_size, def_levels) !=
                              batch_size)) {
        throw ParquetException(kErrorRepDefLevelNotMatchesNumValues);
      }
      if (this->max_rep_level_ > 0) {
        int64_t rep_levels_read = this->ReadRepetitionLevels(batch_size, rep_levels);
        if (ARROW_PREDICT_FALSE(rep_levels_read != batch_size)) {
          throw ParquetException(kErrorRepDefLevelNotMatchesNumValues);
        }
      }

      levels_written_ += batch_size;
      records_read += ReadRecordData(num_records - records_read);
    } else {
      // No repetition and definition levels, we can read values directly
      batch_size = std::min(num_records - records_read, batch_size);
      records_read += ReadRecordData(batch_size);
    }
  }

  return records_read;
}
| |
| // Throw away levels from start_levels_position to levels_position_. |
| // Will update levels_position_, levels_written_, and levels_capacity_ |
| // accordingly and move the levels to left to fill in the gap. |
| // It will resize the buffer without releasing the memory allocation. |
| void ThrowAwayLevels(int64_t start_levels_position) { |
| ARROW_DCHECK_LE(levels_position_, levels_written_); |
| ARROW_DCHECK_LE(start_levels_position, levels_position_); |
| ARROW_DCHECK_GT(this->max_def_level_, 0); |
| ARROW_DCHECK_NE(def_levels_, nullptr); |
| |
| int64_t gap = levels_position_ - start_levels_position; |
| if (gap == 0) return; |
| |
| int64_t levels_remaining = levels_written_ - gap; |
| |
| auto left_shift = [&](::arrow::ResizableBuffer* buffer) { |
| auto* data = buffer->mutable_data_as<int16_t>(); |
| std::copy(data + levels_position_, data + levels_written_, |
| data + start_levels_position); |
| PARQUET_THROW_NOT_OK(buffer->Resize(levels_remaining * sizeof(int16_t), |
| /*shrink_to_fit=*/false)); |
| }; |
| |
| left_shift(def_levels_.get()); |
| |
| if (this->max_rep_level_ > 0) { |
| ARROW_DCHECK_NE(rep_levels_, nullptr); |
| left_shift(rep_levels_.get()); |
| } |
| |
| levels_written_ -= gap; |
| levels_position_ -= gap; |
| levels_capacity_ -= gap; |
| } |
| |
// Skip records that we have in our buffer. This function is only for
// non-repeated fields. Returns the number of records skipped.
int64_t SkipRecordsInBufferNonRepeated(int64_t num_records) {
  ARROW_DCHECK_EQ(this->max_rep_level_, 0);
  if (!this->has_values_to_process() || num_records == 0) return 0;

  int64_t remaining_records = levels_written_ - levels_position_;
  int64_t skipped_records = std::min(num_records, remaining_records);
  int64_t start_levels_position = levels_position_;
  // Since there is no repetition, number of levels equals number of records.
  levels_position_ += skipped_records;

  // We skipped the levels by incrementing 'levels_position_'. For values
  // we do not have a buffer, so we need to read them and throw them away.
  // First we need to figure out how many present/not-null values there are.
  int64_t values_to_read =
      std::count(def_levels() + start_levels_position, def_levels() + levels_position_,
                 this->max_def_level_);

  // Now that we have figured out number of values to read, we do not need
  // these levels anymore. We will remove these values from the buffer.
  // This requires shifting the levels in the buffer to left. So this will
  // update levels_position_ and levels_written_.
  ThrowAwayLevels(start_levels_position);
  // For values, we do not have them in buffer, so we will read them and
  // throw them away.
  ReadAndThrowAwayValues(values_to_read);

  // Mark the levels as read in the underlying column reader.
  this->ConsumeBufferedValues(skipped_records);

  return skipped_records;
}
| |
| // Attempts to skip num_records from the buffer. Will throw away levels |
| // and corresponding values for the records it skipped and consumes them from the |
| // underlying decoder. Will advance levels_position_ and update |
| // at_record_start_. |
| // Returns how many records were skipped. |
| int64_t DelimitAndSkipRecordsInBuffer(int64_t num_records) { |
| if (num_records == 0) return 0; |
| // Look at the buffered levels, delimit them based on |
| // (rep_level == 0), report back how many records are in there, and |
| // fill in how many not-null values (def_level == max_def_level_). |
| // DelimitRecords updates levels_position_. |
| int64_t start_levels_position = levels_position_; |
| int64_t values_seen = 0; |
| int64_t skipped_records = DelimitRecords(num_records, &values_seen); |
| ReadAndThrowAwayValues(values_seen); |
| // Mark those levels and values as consumed in the underlying page. |
| // This must be done before we throw away levels since it updates |
| // levels_position_ and levels_written_. |
| this->ConsumeBufferedValues(levels_position_ - start_levels_position); |
| // Updated levels_position_ and levels_written_. |
| ThrowAwayLevels(start_levels_position); |
| return skipped_records; |
| } |
| |
// Skip records for repeated fields. For repeated fields, we are technically
// reading and throwing away the levels and values since we do not know the record
// boundaries in advance. Keep filling the buffer and skipping until we reach the
// desired number of records or we run out of values in the column chunk.
// Returns number of skipped records.
int64_t SkipRecordsRepeated(int64_t num_records) {
  ARROW_DCHECK_GT(this->max_rep_level_, 0);
  int64_t skipped_records = 0;

  // First consume what is in the buffer.
  if (levels_position_ < levels_written_) {
    // This updates at_record_start_.
    skipped_records = DelimitAndSkipRecordsInBuffer(num_records);
  }

  // Decode levels in reasonably sized batches even for small skips.
  int64_t level_batch_size =
      std::max<int64_t>(kMinLevelBatchSize, num_records - skipped_records);

  // If 'at_record_start_' is false, but (skipped_records == num_records), it
  // means that for the last record that was counted, we have not seen all
  // of its values yet.
  while (!at_record_start_ || skipped_records < num_records) {
    // Is there more data to read in this row group?
    // HasNextInternal() will advance to the next page if necessary.
    if (!this->HasNextInternal()) {
      if (!at_record_start_) {
        // We ended the row group while inside a record that we haven't seen
        // the end of yet. So increment the record count for the last record
        // in the row group
        ++skipped_records;
        at_record_start_ = true;
      }
      break;
    }

    // Read some more levels.
    int64_t batch_size =
        std::min(level_batch_size, this->available_values_current_page());
    // No more data in column. This must be an empty page.
    // If we had exhausted the last page, HasNextInternal() must have advanced
    // to the next page. So there must be available values to process.
    if (batch_size == 0) {
      break;
    }

    // For skipping we will read the levels and append them to the end
    // of the def_levels and rep_levels just like for read.
    ReserveLevels(batch_size);

    int16_t* def_levels = this->def_levels() + levels_written_;
    int16_t* rep_levels = this->rep_levels() + levels_written_;

    if (this->ReadDefinitionLevels(batch_size, def_levels) != batch_size) {
      throw ParquetException(kErrorRepDefLevelNotMatchesNumValues);
    }
    if (this->ReadRepetitionLevels(batch_size, rep_levels) != batch_size) {
      throw ParquetException(kErrorRepDefLevelNotMatchesNumValues);
    }

    levels_written_ += batch_size;
    int64_t remaining_records = num_records - skipped_records;
    // This updates at_record_start_.
    skipped_records += DelimitAndSkipRecordsInBuffer(remaining_records);
  }

  return skipped_records;
}
| |
| // Read 'num_values' values and throw them away. |
| // Throws an error if it could not read 'num_values'. |
| void ReadAndThrowAwayValues(int64_t num_values) { |
| int64_t values_left = num_values; |
| int64_t values_read = 0; |
| |
| // Allocate enough scratch space to accommodate 16-bit levels or any |
| // value type |
| this->InitScratchForSkip(); |
| ARROW_DCHECK_NE(this->scratch_for_skip_, nullptr); |
| do { |
| int64_t batch_size = std::min<int64_t>(kSkipScratchBatchSize, values_left); |
| values_read = this->ReadValues( |
| batch_size, this->scratch_for_skip_->template mutable_data_as<T>()); |
| values_left -= values_read; |
| } while (values_read > 0 && values_left > 0); |
| if (values_left > 0) { |
| std::stringstream ss; |
| ss << "Could not read and throw away " << num_values << " values"; |
| throw ParquetException(ss.str()); |
| } |
| } |
| |
| int64_t SkipRecords(int64_t num_records) override { |
| if (num_records == 0) return 0; |
| |
| // Top level required field. Number of records equals to number of levels, |
| // and there is not read-ahead for levels. |
| if (this->max_rep_level_ == 0 && this->max_def_level_ == 0) { |
| return this->Skip(num_records); |
| } |
| int64_t skipped_records = 0; |
| if (this->max_rep_level_ == 0) { |
| // Non-repeated optional field. |
| // First consume whatever is in the buffer. |
| skipped_records = SkipRecordsInBufferNonRepeated(num_records); |
| |
| ARROW_DCHECK_LE(skipped_records, num_records); |
| |
| // For records that we have not buffered, we will use the column |
| // reader's Skip to do the remaining Skip. Since the field is not |
| // repeated number of levels to skip is the same as number of records |
| // to skip. |
| skipped_records += this->Skip(num_records - skipped_records); |
| } else { |
| skipped_records += this->SkipRecordsRepeated(num_records); |
| } |
| return skipped_records; |
| } |
| |
// We may outwardly have the appearance of having exhausted a column chunk
// when in fact we are in the middle of processing the last batch.
// True while buffered levels remain unconsumed (levels are read ahead of
// values).
bool has_values_to_process() const { return levels_position_ < levels_written_; }
| |
| std::shared_ptr<ResizableBuffer> ReleaseValues() override { |
| if (uses_values_) { |
| auto result = values_; |
| PARQUET_THROW_NOT_OK( |
| result->Resize(bytes_for_values(values_written_), /*shrink_to_fit=*/true)); |
| values_ = AllocateBuffer(this->pool_); |
| values_capacity_ = 0; |
| return result; |
| } else { |
| return nullptr; |
| } |
| } |
| |
| std::shared_ptr<ResizableBuffer> ReleaseIsValid() override { |
| if (nullable_values()) { |
| auto result = valid_bits_; |
| PARQUET_THROW_NOT_OK(result->Resize(bit_util::BytesForBits(values_written_), |
| /*shrink_to_fit=*/true)); |
| valid_bits_ = AllocateBuffer(this->pool_); |
| return result; |
| } else { |
| return nullptr; |
| } |
| } |
| |
// Process written repetition/definition levels to reach the end of
// records. Only used for repeated fields.
// Process no more levels than necessary to delimit the indicated
// number of logical records. Updates internal state of RecordReader
// (levels_position_, at_record_start_).
//
// \return Number of records delimited
int64_t DelimitRecords(int64_t num_records, int64_t* values_seen) {
  if (ARROW_PREDICT_FALSE(num_records == 0 || levels_position_ == levels_written_)) {
    *values_seen = 0;
    return 0;
  }
  int64_t records_read = 0;
  const int16_t* const rep_levels = this->rep_levels();
  const int16_t* const def_levels = this->def_levels();
  ARROW_DCHECK_GT(this->max_rep_level_, 0);
  // If at_record_start_ is true, we are seeing the start of a record
  // for the second time, such as after repeated calls to
  // DelimitRecords. In this case we must continue until we find
  // another record start or exhausting the ColumnChunk
  int64_t level = levels_position_;
  if (at_record_start_) {
    if (ARROW_PREDICT_FALSE(rep_levels[levels_position_] != 0)) {
      std::stringstream ss;
      ss << "The repetition level at the start of a record must be 0 but got "
         << rep_levels[levels_position_];
      throw ParquetException(ss.str());
    }
    ++levels_position_;
    // We have decided to consume the level at this position; therefore we
    // must advance until we find another record boundary
    at_record_start_ = false;
  }

  // Count logical records and number of non-null values to read
  ARROW_DCHECK(!at_record_start_);
  // Scan repetition levels to find record end
  while (levels_position_ < levels_written_) {
    // We use an estimated batch size to simplify branching and
    // improve performance in the common case. This might slow
    // things down a bit if a single long record remains, though.
    int64_t stride =
        std::min(levels_written_ - levels_position_, num_records - records_read);
    const int64_t position_end = levels_position_ + stride;
    for (int64_t i = levels_position_; i < position_end; ++i) {
      // A repetition level of 0 marks the start of a new top-level record.
      records_read += rep_levels[i] == 0;
    }
    levels_position_ = position_end;
    if (records_read == num_records) {
      // Check last rep_level reaches the boundary and
      // pop the last level.
      ARROW_CHECK_EQ(rep_levels[levels_position_ - 1], 0);
      --levels_position_;
      // We've found the number of records we were looking for. Set
      // at_record_start_ to true and break
      at_record_start_ = true;
      break;
    }
  }
  // Scan definition levels to find number of physical values
  *values_seen = std::count(def_levels + level, def_levels + levels_position_,
                            this->max_def_level_);
  return records_read;
}
| |
// Ensure both the level buffers and the value buffer have room for
// `capacity` additional elements.
void Reserve(int64_t capacity) override {
  ReserveLevels(capacity);
  ReserveValues(capacity);
}
| |
| int64_t UpdateCapacity(int64_t capacity, int64_t size, int64_t extra_size) { |
| if (extra_size < 0) { |
| throw ParquetException("Negative size (corrupt file?)"); |
| } |
| int64_t target_size = -1; |
| if (AddWithOverflow(size, extra_size, &target_size)) { |
| throw ParquetException("Allocation size too large (corrupt file?)"); |
| } |
| if (target_size >= (1LL << 62)) { |
| throw ParquetException("Allocation size too large (corrupt file?)"); |
| } |
| if (capacity >= target_size) { |
| return capacity; |
| } |
| return bit_util::NextPower2(target_size); |
| } |
| |
| void ReserveLevels(int64_t extra_levels) { |
| if (this->max_def_level_ > 0) { |
| const int64_t new_levels_capacity = |
| UpdateCapacity(levels_capacity_, levels_written_, extra_levels); |
| if (new_levels_capacity > levels_capacity_) { |
| constexpr auto kItemSize = static_cast<int64_t>(sizeof(int16_t)); |
| int64_t capacity_in_bytes = -1; |
| if (MultiplyWithOverflow(new_levels_capacity, kItemSize, &capacity_in_bytes)) { |
| throw ParquetException("Allocation size too large (corrupt file?)"); |
| } |
| PARQUET_THROW_NOT_OK( |
| def_levels_->Resize(capacity_in_bytes, /*shrink_to_fit=*/false)); |
| if (this->max_rep_level_ > 0) { |
| PARQUET_THROW_NOT_OK( |
| rep_levels_->Resize(capacity_in_bytes, /*shrink_to_fit=*/false)); |
| } |
| levels_capacity_ = new_levels_capacity; |
| } |
| } |
| } |
| |
  // Grow the values buffer (and, for spaced nullable reads, the validity
  // bitmap) to hold `extra_values` additional values past values_written_.
  virtual void ReserveValues(int64_t extra_values) {
    const int64_t new_values_capacity =
        UpdateCapacity(values_capacity_, values_written_, extra_values);
    if (new_values_capacity > values_capacity_) {
      // XXX(wesm): A hack to avoid memory allocation when reading directly
      // into builder classes
      if (uses_values_) {
        PARQUET_THROW_NOT_OK(values_->Resize(bytes_for_values(new_values_capacity),
                                             /*shrink_to_fit=*/false));
      }
      values_capacity_ = new_values_capacity;
    }
    // Spaced reads (nullable values, not read densely) need the validity
    // bitmap to cover the full value capacity.
    if (nullable_values() && !read_dense_for_nullable_) {
      int64_t valid_bytes_new = bit_util::BytesForBits(values_capacity_);
      if (valid_bits_->size() < valid_bytes_new) {
        int64_t valid_bytes_old = bit_util::BytesForBits(values_written_);
        PARQUET_THROW_NOT_OK(
            valid_bits_->Resize(valid_bytes_new, /*shrink_to_fit=*/false));

        // Avoid valgrind warnings
        memset(valid_bits_->mutable_data() + valid_bytes_old, 0,
               static_cast<size_t>(valid_bytes_new - valid_bytes_old));
      }
    }
  }
| |
| void Reset() override { |
| ResetValues(); |
| |
| if (levels_written_ > 0) { |
| // Throw away levels from 0 to levels_position_. |
| ThrowAwayLevels(0); |
| } |
| |
| // Call Finish on the binary builders to reset them |
| } |
| |
  // Install a new page reader (e.g. when advancing to the next row group)
  // and drop decoding state tied to the previous one.
  void SetPageReader(std::unique_ptr<PageReader> reader) override {
    at_record_start_ = true;
    this->pager_ = std::move(reader);
    ResetDecoders();
  }
| |
  // True while a page reader is still attached.
  bool HasMoreData() const override { return this->pager_ != nullptr; }

  // Descriptor of the column this reader decodes.
  const ColumnDescriptor* descr() const override { return this->descr_; }

  // Dictionary decoders must be reset when advancing row groups
  void ResetDecoders() { this->decoders_.clear(); }
| |
| virtual void ReadValuesSpaced(int64_t values_with_nulls, int64_t null_count) { |
| uint8_t* valid_bits = valid_bits_->mutable_data(); |
| const int64_t valid_bits_offset = values_written_; |
| |
| int64_t num_decoded = this->current_decoder_->DecodeSpaced( |
| ValuesHead<T>(), static_cast<int>(values_with_nulls), |
| static_cast<int>(null_count), valid_bits, valid_bits_offset); |
| CheckNumberDecoded(num_decoded, values_with_nulls); |
| } |
| |
| virtual void ReadValuesDense(int64_t values_to_read) { |
| int64_t num_decoded = |
| this->current_decoder_->Decode(ValuesHead<T>(), static_cast<int>(values_to_read)); |
| CheckNumberDecoded(num_decoded, values_to_read); |
| } |
| |
  // Reads repeated records and returns number of records read. Fills in
  // values_to_read and null_count.
  int64_t ReadRepeatedRecords(int64_t num_records, int64_t* values_to_read,
                              int64_t* null_count) {
    const int64_t start_levels_position = levels_position_;
    // Note that repeated records may be required or nullable. If they have
    // an optional parent in the path, they will be nullable, otherwise,
    // they are required. We use leaf_info_->HasNullableValues() that looks
    // at repeated_ancestor_def_level to determine if it is required or
    // nullable. Even if they are required, we may have to read ahead and
    // delimit the records to get the right number of values and they will
    // have associated levels.
    // DelimitRecords advances levels_position_ and fills *values_to_read.
    int64_t records_read = DelimitRecords(num_records, values_to_read);
    if (!nullable_values() || read_dense_for_nullable_) {
      ReadValuesDense(*values_to_read);
      // null_count is always 0 for required.
      ARROW_DCHECK_EQ(*null_count, 0);
    } else {
      // Nullable and reading spaced: build the validity bitmap from the
      // definition levels consumed since start_levels_position.
      ReadSpacedForOptionalOrRepeated(start_levels_position, values_to_read, null_count);
    }
    return records_read;
  }
| |
| // Reads optional records and returns number of records read. Fills in |
| // values_to_read and null_count. |
| int64_t ReadOptionalRecords(int64_t num_records, int64_t* values_to_read, |
| int64_t* null_count) { |
| const int64_t start_levels_position = levels_position_; |
| // No repetition levels, skip delimiting logic. Each level represents a |
| // null or not null entry |
| int64_t records_read = |
| std::min<int64_t>(levels_written_ - levels_position_, num_records); |
| // This is advanced by DelimitRecords for the repeated field case above. |
| levels_position_ += records_read; |
| |
| // Optional fields are always nullable. |
| if (read_dense_for_nullable_) { |
| ReadDenseForOptional(start_levels_position, values_to_read); |
| // We don't need to update null_count when reading dense. It should be |
| // already set to 0. |
| ARROW_DCHECK_EQ(*null_count, 0); |
| } else { |
| ReadSpacedForOptionalOrRepeated(start_levels_position, values_to_read, null_count); |
| } |
| return records_read; |
| } |
| |
| // Reads required records and returns number of records read. Fills in |
| // values_to_read. |
| int64_t ReadRequiredRecords(int64_t num_records, int64_t* values_to_read) { |
| *values_to_read = num_records; |
| ReadValuesDense(*values_to_read); |
| return num_records; |
| } |
| |
  // Reads dense for optional records. First it figures out how many values to
  // read.
  void ReadDenseForOptional(int64_t start_levels_position, int64_t* values_to_read) {
    // levels_position_ must already be incremented based on number of records
    // read.
    ARROW_DCHECK_GE(levels_position_, start_levels_position);

    // When reading dense we need to figure out number of values to read.
    // Each definition level equal to max_def_level_ is one non-null value.
    // Note the +=: the count accumulates into the caller-provided total.
    const int16_t* def_levels = this->def_levels();
    *values_to_read += std::count(def_levels + start_levels_position,
                                  def_levels + levels_position_, this->max_def_level_);
    ReadValuesDense(*values_to_read);
  }
| |
  // Reads spaced for optional or repeated fields.
  // Converts the definition levels in [start_levels_position, levels_position_)
  // into validity bits appended at bit offset values_written_, then decodes the
  // corresponding values with null slots left in place.
  void ReadSpacedForOptionalOrRepeated(int64_t start_levels_position,
                                       int64_t* values_to_read, int64_t* null_count) {
    // levels_position_ must already be incremented based on number of records
    // read.
    ARROW_DCHECK_GE(levels_position_, start_levels_position);
    ValidityBitmapInputOutput validity_io;
    validity_io.values_read_upper_bound = levels_position_ - start_levels_position;
    validity_io.valid_bits = valid_bits_->mutable_data();
    validity_io.valid_bits_offset = values_written_;

    DefLevelsToBitmap(def_levels() + start_levels_position,
                      levels_position_ - start_levels_position, leaf_info_, &validity_io);
    // values_read counts both null and non-null slots; subtract the nulls to
    // get the number of physical values the decoder must produce.
    *values_to_read = validity_io.values_read - validity_io.null_count;
    *null_count = validity_io.null_count;
    ARROW_DCHECK_GE(*values_to_read, 0);
    ARROW_DCHECK_GE(*null_count, 0);
    ReadValuesSpaced(validity_io.values_read, *null_count);
  }
| |
| // Return number of logical records read. |
| // Updates levels_position_, values_written_, and null_count_. |
| int64_t ReadRecordData(int64_t num_records) { |
| // Conservative upper bound |
| const int64_t possible_num_values = |
| std::max<int64_t>(num_records, levels_written_ - levels_position_); |
| ReserveValues(static_cast<size_t>(possible_num_values)); |
| |
| const int64_t start_levels_position = levels_position_; |
| |
| // To be updated by the function calls below for each of the repetition |
| // types. |
| int64_t records_read = 0; |
| int64_t values_to_read = 0; |
| int64_t null_count = 0; |
| if (this->max_rep_level_ > 0) { |
| // Repeated fields may be nullable or not. |
| // This call updates levels_position_. |
| records_read = ReadRepeatedRecords(num_records, &values_to_read, &null_count); |
| } else if (this->max_def_level_ > 0) { |
| // Non-repeated optional values are always nullable. |
| // This call updates levels_position_. |
| ARROW_DCHECK(nullable_values()); |
| records_read = ReadOptionalRecords(num_records, &values_to_read, &null_count); |
| } else { |
| ARROW_DCHECK(!nullable_values()); |
| records_read = ReadRequiredRecords(num_records, &values_to_read); |
| // We don't need to update null_count, since it is 0. |
| } |
| |
| ARROW_DCHECK_GE(records_read, 0); |
| ARROW_DCHECK_GE(values_to_read, 0); |
| ARROW_DCHECK_GE(null_count, 0); |
| |
| if (read_dense_for_nullable_) { |
| values_written_ += values_to_read; |
| ARROW_DCHECK_EQ(null_count, 0); |
| } else { |
| values_written_ += values_to_read + null_count; |
| null_count_ += null_count; |
| } |
| // Total values, including null spaces, if any |
| if (this->max_def_level_ > 0) { |
| // Optional, repeated, or some mix thereof |
| this->ConsumeBufferedValues(levels_position_ - start_levels_position); |
| } else { |
| // Flat, non-repeated |
| this->ConsumeBufferedValues(values_to_read); |
| } |
| |
| return records_read; |
| } |
| |
| void DebugPrintState() override { |
| const int16_t* def_levels = this->def_levels(); |
| const int16_t* rep_levels = this->rep_levels(); |
| const int64_t total_levels_read = levels_position_; |
| |
| const T* vals = reinterpret_cast<const T*>(this->values()); |
| |
| if (leaf_info_.def_level > 0) { |
| std::cout << "def levels: "; |
| for (int64_t i = 0; i < total_levels_read; ++i) { |
| std::cout << def_levels[i] << " "; |
| } |
| std::cout << std::endl; |
| } |
| |
| if (leaf_info_.rep_level > 0) { |
| std::cout << "rep levels: "; |
| for (int64_t i = 0; i < total_levels_read; ++i) { |
| std::cout << rep_levels[i] << " "; |
| } |
| std::cout << std::endl; |
| } |
| |
| std::cout << "values: "; |
| for (int64_t i = 0; i < this->values_written(); ++i) { |
| std::cout << vals[i] << " "; |
| } |
| std::cout << std::endl; |
| } |
| |
| void ResetValues() { |
| if (values_written_ > 0) { |
| // Resize to 0, but do not shrink to fit |
| if (uses_values_) { |
| PARQUET_THROW_NOT_OK(values_->Resize(0, /*shrink_to_fit=*/false)); |
| } |
| PARQUET_THROW_NOT_OK(valid_bits_->Resize(0, /*shrink_to_fit=*/false)); |
| values_written_ = 0; |
| values_capacity_ = 0; |
| null_count_ = 0; |
| } |
| } |
| |
| protected: |
  // Pointer to the first unwritten slot in the values buffer.
  template <typename T>
  T* ValuesHead() {
    return values_->mutable_data_as<T>() + values_written_;
  }
| LevelInfo leaf_info_; |
| }; |
| |
| /// In FLBARecordReader, we read fixed length byte array values. |
| /// |
| /// Unlike other fixed length types, the `values_` buffer is not used to store |
| /// values, instead we use `data_builder_` to store the values, and `null_bitmap_builder_` |
| /// is used to store the null bitmap. |
| /// |
| /// The `values_` buffer is used to store the temporary values for `Decode`, and it would |
| /// be Reset after each `Decode` call. The `valid_bits_` buffer is never used. |
| class FLBARecordReader final : public TypedRecordReader<FLBAType>, |
| virtual public BinaryRecordReader { |
| public: |
| FLBARecordReader(const ColumnDescriptor* descr, LevelInfo leaf_info, |
| ::arrow::MemoryPool* pool, bool read_dense_for_nullable) |
| : TypedRecordReader<FLBAType>(descr, leaf_info, pool, read_dense_for_nullable), |
| byte_width_(descr_->type_length()), |
| type_(::arrow::fixed_size_binary(byte_width_)), |
| array_builder_(type_, pool) { |
| ARROW_DCHECK_EQ(descr_->physical_type(), Type::FIXED_LEN_BYTE_ARRAY); |
| } |
| |
| ::arrow::ArrayVector GetBuilderChunks() override { |
| PARQUET_ASSIGN_OR_THROW(auto chunk, array_builder_.Finish()); |
| return ::arrow::ArrayVector{std::move(chunk)}; |
| } |
| |
| void ReserveValues(int64_t extra_values) override { |
| ARROW_DCHECK(!uses_values_); |
| TypedRecordReader::ReserveValues(extra_values); |
| PARQUET_THROW_NOT_OK(array_builder_.Reserve(extra_values)); |
| } |
| |
| void ReadValuesDense(int64_t values_to_read) override { |
| int64_t num_decoded = this->current_decoder_->DecodeArrowNonNull( |
| static_cast<int>(values_to_read), &array_builder_); |
| CheckNumberDecoded(num_decoded, values_to_read); |
| ResetValues(); |
| } |
| |
| void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override { |
| int64_t num_decoded = this->current_decoder_->DecodeArrow( |
| static_cast<int>(values_to_read), static_cast<int>(null_count), |
| valid_bits_->mutable_data(), values_written_, &array_builder_); |
| CheckNumberDecoded(num_decoded, values_to_read - null_count); |
| ResetValues(); |
| } |
| |
| private: |
| const int byte_width_; |
| std::shared_ptr<::arrow::DataType> type_; |
| ::arrow::FixedSizeBinaryBuilder array_builder_; |
| }; |
| |
| /// ByteArrayRecordReader reads variable length byte array values. |
| /// |
| /// It only calls `DecodeArrowNonNull` and `DecodeArrow` to read values, and |
| /// `Decode` and `DecodeSpaced` are not used. |
| /// |
| /// The `values_` buffers are never used, and the `accumulator_` |
| /// is used to store the values. |
| class ByteArrayChunkedRecordReader final : public TypedRecordReader<ByteArrayType>, |
| virtual public BinaryRecordReader { |
| public: |
| ByteArrayChunkedRecordReader(const ColumnDescriptor* descr, LevelInfo leaf_info, |
| ::arrow::MemoryPool* pool, bool read_dense_for_nullable, |
| const std::shared_ptr<::arrow::DataType>& arrow_type) |
| : TypedRecordReader<ByteArrayType>(descr, leaf_info, pool, |
| read_dense_for_nullable) { |
| ARROW_DCHECK_EQ(descr_->physical_type(), Type::BYTE_ARRAY); |
| auto arrow_binary_type = arrow_type ? arrow_type->id() : ::arrow::Type::BINARY; |
| switch (arrow_binary_type) { |
| case ::arrow::Type::BINARY: |
| accumulator_.builder = std::make_unique<::arrow::BinaryBuilder>(pool); |
| break; |
| case ::arrow::Type::STRING: |
| accumulator_.builder = std::make_unique<::arrow::StringBuilder>(pool); |
| break; |
| case ::arrow::Type::LARGE_BINARY: |
| accumulator_.builder = std::make_unique<::arrow::LargeBinaryBuilder>(pool); |
| break; |
| case ::arrow::Type::LARGE_STRING: |
| accumulator_.builder = std::make_unique<::arrow::LargeStringBuilder>(pool); |
| break; |
| case ::arrow::Type::BINARY_VIEW: |
| accumulator_.builder = std::make_unique<::arrow::BinaryViewBuilder>(pool); |
| break; |
| case ::arrow::Type::STRING_VIEW: |
| accumulator_.builder = std::make_unique<::arrow::StringViewBuilder>(pool); |
| break; |
| default: |
| throw ParquetException("cannot read Parquet BYTE_ARRAY as Arrow " + |
| arrow_type->ToString()); |
| } |
| } |
| |
| ::arrow::ArrayVector GetBuilderChunks() override { |
| ::arrow::ArrayVector result = accumulator_.chunks; |
| if (result.empty() || accumulator_.builder->length() > 0) { |
| std::shared_ptr<::arrow::Array> last_chunk; |
| PARQUET_THROW_NOT_OK(accumulator_.builder->Finish(&last_chunk)); |
| result.push_back(std::move(last_chunk)); |
| } |
| accumulator_.chunks = {}; |
| return result; |
| } |
| |
| void ReserveValues(int64_t extra_values) override { |
| ARROW_DCHECK(!uses_values_); |
| TypedRecordReader::ReserveValues(extra_values); |
| PARQUET_THROW_NOT_OK(accumulator_.builder->Reserve(extra_values)); |
| } |
| |
| void ReadValuesDense(int64_t values_to_read) override { |
| int64_t num_decoded = this->current_decoder_->DecodeArrowNonNull( |
| static_cast<int>(values_to_read), &accumulator_); |
| CheckNumberDecoded(num_decoded, values_to_read); |
| ResetValues(); |
| } |
| |
| void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override { |
| int64_t num_decoded = this->current_decoder_->DecodeArrow( |
| static_cast<int>(values_to_read), static_cast<int>(null_count), |
| valid_bits_->mutable_data(), values_written_, &accumulator_); |
| CheckNumberDecoded(num_decoded, values_to_read - null_count); |
| ResetValues(); |
| } |
| |
| private: |
| // Helper data structure for accumulating builder chunks |
| typename EncodingTraits<ByteArrayType>::Accumulator accumulator_; |
| }; |
| |
| /// ByteArrayDictionaryRecordReader reads into ::arrow::dictionary(index: int32, |
| /// values: binary). |
| /// |
| /// If underlying column is dictionary encoded, it will call `DecodeIndices` to read, |
| /// otherwise it will call `DecodeArrowNonNull` to read. |
class ByteArrayDictionaryRecordReader final : public TypedRecordReader<ByteArrayType>,
                                              virtual public DictionaryRecordReader {
 public:
  ByteArrayDictionaryRecordReader(const ColumnDescriptor* descr, LevelInfo leaf_info,
                                  ::arrow::MemoryPool* pool, bool read_dense_for_nullable)
      : TypedRecordReader<ByteArrayType>(descr, leaf_info, pool, read_dense_for_nullable),
        builder_(pool) {
    this->read_dictionary_ = true;
  }

  // Flush the builder and return all accumulated chunks as one ChunkedArray.
  std::shared_ptr<::arrow::ChunkedArray> GetResult() override {
    FlushBuilder();
    std::vector<std::shared_ptr<::arrow::Array>> result;
    std::swap(result, result_chunks_);
    return std::make_shared<::arrow::ChunkedArray>(std::move(result), builder_.type());
  }

  // Finish the current builder contents into a chunk (if non-empty) and
  // reset the builder for subsequent decoding.
  void FlushBuilder() {
    if (builder_.length() > 0) {
      std::shared_ptr<::arrow::Array> chunk;
      PARQUET_THROW_NOT_OK(builder_.Finish(&chunk));
      result_chunks_.emplace_back(std::move(chunk));

      // Also clears the dictionary memo table
      builder_.Reset();
    }
  }

  // If the current decoder carries a new dictionary page, flush any values
  // built against the old dictionary and load the new one into the builder.
  void MaybeWriteNewDictionary() {
    if (this->new_dictionary_) {
      /// If there is a new dictionary, we may need to flush the builder, then
      /// insert the new dictionary values
      FlushBuilder();
      builder_.ResetFull();
      auto decoder = dynamic_cast<BinaryDictDecoder*>(this->current_decoder_);
      decoder->InsertDictionary(&builder_);
      this->new_dictionary_ = false;
    }
  }

  // Decode non-null values. Dictionary-encoded pages append raw indices;
  // other encodings append the values themselves.
  void ReadValuesDense(int64_t values_to_read) override {
    int64_t num_decoded = 0;
    if (current_encoding_ == Encoding::RLE_DICTIONARY) {
      MaybeWriteNewDictionary();
      auto decoder = dynamic_cast<BinaryDictDecoder*>(this->current_decoder_);
      num_decoded = decoder->DecodeIndices(static_cast<int>(values_to_read), &builder_);
    } else {
      num_decoded = this->current_decoder_->DecodeArrowNonNull(
          static_cast<int>(values_to_read), &builder_);
    }
    // Flush values since they have been copied into the builder
    ResetValues();
    CheckNumberDecoded(num_decoded, values_to_read);
  }

  // Decode values with nulls (per the validity bitmap), dictionary-aware as
  // in ReadValuesDense.
  void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override {
    int64_t num_decoded = 0;
    if (current_encoding_ == Encoding::RLE_DICTIONARY) {
      MaybeWriteNewDictionary();
      auto decoder = dynamic_cast<BinaryDictDecoder*>(this->current_decoder_);
      num_decoded = decoder->DecodeIndicesSpaced(
          static_cast<int>(values_to_read), static_cast<int>(null_count),
          valid_bits_->mutable_data(), values_written_, &builder_);
    } else {
      num_decoded = this->current_decoder_->DecodeArrow(
          static_cast<int>(values_to_read), static_cast<int>(null_count),
          valid_bits_->mutable_data(), values_written_, &builder_);
    }
    ARROW_DCHECK_EQ(num_decoded, values_to_read - null_count);
    // Flush values since they have been copied into the builder
    ResetValues();
  }

 private:
  using BinaryDictDecoder = DictDecoder<ByteArrayType>;

  ::arrow::BinaryDictionary32Builder builder_;
  std::vector<std::shared_ptr<::arrow::Array>> result_chunks_;
};
| |
// TODO(wesm): Implement these to some satisfaction
// NOTE(review): presumably stubbed because these value types have no
// straightforward operator<< for the generic DebugPrintState — confirm.
template <>
void TypedRecordReader<Int96Type>::DebugPrintState() {}

template <>
void TypedRecordReader<ByteArrayType>::DebugPrintState() {}

template <>
void TypedRecordReader<FLBAType>::DebugPrintState() {}
| |
| std::shared_ptr<RecordReader> MakeByteArrayRecordReader( |
| const ColumnDescriptor* descr, LevelInfo leaf_info, ::arrow::MemoryPool* pool, |
| bool read_dictionary, bool read_dense_for_nullable, |
| const std::shared_ptr<::arrow::DataType>& arrow_type) { |
| if (read_dictionary) { |
| return std::make_shared<ByteArrayDictionaryRecordReader>(descr, leaf_info, pool, |
| read_dense_for_nullable); |
| } else { |
| return std::make_shared<ByteArrayChunkedRecordReader>( |
| descr, leaf_info, pool, read_dense_for_nullable, arrow_type); |
| } |
| } |
| |
| } // namespace |
| |
| std::shared_ptr<RecordReader> RecordReader::Make( |
| const ColumnDescriptor* descr, LevelInfo leaf_info, MemoryPool* pool, |
| bool read_dictionary, bool read_dense_for_nullable, |
| const std::shared_ptr<::arrow::DataType>& arrow_type) { |
| switch (descr->physical_type()) { |
| case Type::BOOLEAN: |
| return std::make_shared<TypedRecordReader<BooleanType>>(descr, leaf_info, pool, |
| read_dense_for_nullable); |
| case Type::INT32: |
| return std::make_shared<TypedRecordReader<Int32Type>>(descr, leaf_info, pool, |
| read_dense_for_nullable); |
| case Type::INT64: |
| return std::make_shared<TypedRecordReader<Int64Type>>(descr, leaf_info, pool, |
| read_dense_for_nullable); |
| case Type::INT96: |
| return std::make_shared<TypedRecordReader<Int96Type>>(descr, leaf_info, pool, |
| read_dense_for_nullable); |
| case Type::FLOAT: |
| return std::make_shared<TypedRecordReader<FloatType>>(descr, leaf_info, pool, |
| read_dense_for_nullable); |
| case Type::DOUBLE: |
| return std::make_shared<TypedRecordReader<DoubleType>>(descr, leaf_info, pool, |
| read_dense_for_nullable); |
| case Type::BYTE_ARRAY: { |
| return MakeByteArrayRecordReader(descr, leaf_info, pool, read_dictionary, |
| read_dense_for_nullable, arrow_type); |
| } |
| case Type::FIXED_LEN_BYTE_ARRAY: |
| return std::make_shared<FLBARecordReader>(descr, leaf_info, pool, |
| read_dense_for_nullable); |
| default: { |
| // PARQUET-1481: This can occur if the file is corrupt |
| std::stringstream ss; |
| ss << "Invalid physical column type: " << static_cast<int>(descr->physical_type()); |
| throw ParquetException(ss.str()); |
| } |
| } |
| // Unreachable code, but suppress compiler warning |
| return nullptr; |
| } |
| |
| } // namespace internal |
| } // namespace parquet |