| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "parquet/file_reader.h" |
| |
| #include <algorithm> |
| #include <cstdint> |
| #include <cstring> |
| #include <memory> |
| #include <ostream> |
| #include <string> |
| #include <unordered_map> |
| #include <utility> |
| |
| #include "arrow/io/caching.h" |
| #include "arrow/io/file.h" |
| #include "arrow/io/memory.h" |
| #include "arrow/io/util_internal.h" |
| #include "arrow/util/bit_util.h" |
| #include "arrow/util/checked_cast.h" |
| #include "arrow/util/future.h" |
| #include "arrow/util/int_util_overflow.h" |
| #include "arrow/util/logging.h" |
| #include "arrow/util/ubsan.h" |
| #include "parquet/bloom_filter.h" |
| #include "parquet/bloom_filter_reader.h" |
| #include "parquet/column_reader.h" |
| #include "parquet/column_scanner.h" |
| #include "parquet/encryption/encryption_internal.h" |
| #include "parquet/encryption/internal_file_decryptor.h" |
| #include "parquet/exception.h" |
| #include "parquet/file_writer.h" |
| #include "parquet/metadata.h" |
| #include "parquet/page_index.h" |
| #include "parquet/platform.h" |
| #include "parquet/properties.h" |
| #include "parquet/schema.h" |
| #include "parquet/types.h" |
| |
| using arrow::internal::AddWithOverflow; |
| |
| namespace parquet { |
| |
| using ::arrow::Future; |
| using ::arrow::Result; |
| using ::arrow::Status; |
| |
| namespace { |
| bool IsColumnChunkFullyDictionaryEncoded(const ColumnChunkMetaData& col) { |
| // Check the encoding_stats to see if all data pages are dictionary encoded. |
| const std::vector<PageEncodingStats>& encoding_stats = col.encoding_stats(); |
| if (encoding_stats.empty()) { |
    // Some Parquet files have empty encoding_stats; in that case we cannot
    // be sure whether all data pages are dictionary encoded.
| return false; |
| } |
  // The first page should be the dictionary page, encoded as PLAIN or the
  // legacy PLAIN_DICTIONARY.
| if (encoding_stats[0].page_type != PageType::DICTIONARY_PAGE || |
| (encoding_stats[0].encoding != Encoding::PLAIN && |
| encoding_stats[0].encoding != Encoding::PLAIN_DICTIONARY)) { |
| return false; |
| } |
| // The following pages should be dictionary encoded data pages. |
| for (size_t idx = 1; idx < encoding_stats.size(); ++idx) { |
| if (!IsDictionaryIndexEncoding(encoding_stats[idx].encoding) || |
| (encoding_stats[idx].page_type != PageType::DATA_PAGE && |
| encoding_stats[idx].page_type != PageType::DATA_PAGE_V2)) { |
| // Return false if any following page is not a dictionary encoded data |
| // page. |
| return false; |
| } |
| } |
| return true; |
| } |
| } // namespace |
| |
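// The footer ends with a 4-byte little-endian metadata length followed by the
// 4-byte magic: "PAR1" for plaintext footers, "PARE" for encrypted footers.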
| static constexpr uint32_t kFooterSize = 8; |
| |
// Padding (in bytes) used to work around PARQUET-816; see
// ComputeColumnChunkRange() below.
| static constexpr int64_t kMaxDictHeaderSize = 100; |
| |
| // ---------------------------------------------------------------------- |
| // RowGroupReader public API |
| |
| RowGroupReader::RowGroupReader(std::unique_ptr<Contents> contents) |
| : contents_(std::move(contents)) {} |
| |
| std::shared_ptr<ColumnReader> RowGroupReader::Column(int i) { |
  if (i < 0 || i >= metadata()->num_columns()) {
| std::stringstream ss; |
| ss << "Trying to read column index " << i << " but row group metadata has only " |
| << metadata()->num_columns() << " columns"; |
| throw ParquetException(ss.str()); |
| } |
| const ColumnDescriptor* descr = metadata()->schema()->Column(i); |
| |
| std::unique_ptr<PageReader> page_reader = contents_->GetColumnPageReader(i); |
  return ColumnReader::Make(descr, std::move(page_reader),
                            contents_->properties()->memory_pool());
| } |
| |
| std::shared_ptr<internal::RecordReader> RowGroupReader::RecordReader( |
| int i, bool read_dictionary) { |
  if (i < 0 || i >= metadata()->num_columns()) {
| std::stringstream ss; |
| ss << "Trying to read column index " << i << " but row group metadata has only " |
| << metadata()->num_columns() << " columns"; |
| throw ParquetException(ss.str()); |
| } |
| const ColumnDescriptor* descr = metadata()->schema()->Column(i); |
| |
| std::unique_ptr<PageReader> page_reader = contents_->GetColumnPageReader(i); |
| |
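  // Compute the definition/repetition level information required to decode
  // this column's (possibly nested) data.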
| internal::LevelInfo level_info = internal::LevelInfo::ComputeLevelInfo(descr); |
| |
| auto reader = internal::RecordReader::Make( |
| descr, level_info, contents_->properties()->memory_pool(), read_dictionary, |
| contents_->properties()->read_dense_for_nullable()); |
| reader->SetPageReader(std::move(page_reader)); |
| return reader; |
| } |
| |
| std::shared_ptr<ColumnReader> RowGroupReader::ColumnWithExposeEncoding( |
| int i, ExposedEncoding encoding_to_expose) { |
| std::shared_ptr<ColumnReader> reader = Column(i); |
| |
| if (encoding_to_expose == ExposedEncoding::DICTIONARY && |
| IsColumnChunkFullyDictionaryEncoded(*metadata()->ColumnChunk(i))) { |
| // Set exposed encoding. |
| reader->SetExposedEncoding(encoding_to_expose); |
| } |
| |
| return reader; |
| } |
| |
| std::shared_ptr<internal::RecordReader> RowGroupReader::RecordReaderWithExposeEncoding( |
| int i, ExposedEncoding encoding_to_expose) { |
| return RecordReader( |
| i, |
| /*read_dictionary=*/encoding_to_expose == ExposedEncoding::DICTIONARY && |
| IsColumnChunkFullyDictionaryEncoded(*metadata()->ColumnChunk(i))); |
| } |
| |
| std::unique_ptr<PageReader> RowGroupReader::GetColumnPageReader(int i) { |
  if (i < 0 || i >= metadata()->num_columns()) {
| std::stringstream ss; |
| ss << "Trying to read column index " << i << " but row group metadata has only " |
| << metadata()->num_columns() << " columns"; |
| throw ParquetException(ss.str()); |
| } |
| return contents_->GetColumnPageReader(i); |
| } |
| |
// Returns the row group metadata
| const RowGroupMetaData* RowGroupReader::metadata() const { return contents_->metadata(); } |
| |
| namespace { |
| |
| /// Compute the section of the file that should be read for the given |
| /// row group and column chunk. |
| ::arrow::io::ReadRange ComputeColumnChunkRange(FileMetaData* file_metadata, |
| int64_t source_size, int row_group_index, |
| int column_index) { |
| std::unique_ptr<RowGroupMetaData> row_group_metadata = |
| file_metadata->RowGroup(row_group_index); |
| std::unique_ptr<ColumnChunkMetaData> column_metadata = |
| row_group_metadata->ColumnChunk(column_index); |
| |
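  // The column chunk starts at the dictionary page when one is present; guard
  // against files that record an invalid (e.g. zero) dictionary_page_offset.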
| int64_t col_start = column_metadata->data_page_offset(); |
| if (column_metadata->has_dictionary_page() && |
| column_metadata->dictionary_page_offset() > 0 && |
| col_start > column_metadata->dictionary_page_offset()) { |
| col_start = column_metadata->dictionary_page_offset(); |
| } |
| |
| int64_t col_length = column_metadata->total_compressed_size(); |
| int64_t col_end; |
| if (col_start < 0 || col_length < 0) { |
| throw ParquetException("Invalid column metadata (corrupt file?)"); |
| } |
| |
| if (AddWithOverflow(col_start, col_length, &col_end) || col_end > source_size) { |
| throw ParquetException("Invalid column metadata (corrupt file?)"); |
| } |
| |
  // PARQUET-816 workaround for files created by old versions of parquet-mr
| const ApplicationVersion& version = file_metadata->writer_version(); |
| if (version.VersionLt(ApplicationVersion::PARQUET_816_FIXED_VERSION())) { |
| // The Parquet MR writer had a bug in 1.2.8 and below where it didn't include the |
| // dictionary page header size in total_compressed_size and total_uncompressed_size |
| // (see IMPALA-694). We add padding to compensate. |
| int64_t bytes_remaining = source_size - col_end; |
| int64_t padding = std::min<int64_t>(kMaxDictHeaderSize, bytes_remaining); |
| col_length += padding; |
| } |
| |
| return {col_start, col_length}; |
| } |
| |
| } // namespace |
| |
| // RowGroupReader::Contents implementation for the Parquet file specification |
| class SerializedRowGroup : public RowGroupReader::Contents { |
| public: |
| SerializedRowGroup(std::shared_ptr<ArrowInputFile> source, |
| std::shared_ptr<::arrow::io::internal::ReadRangeCache> cached_source, |
| int64_t source_size, FileMetaData* file_metadata, |
| int row_group_number, ReaderProperties props, |
| std::shared_ptr<Buffer> prebuffered_column_chunks_bitmap) |
| : source_(std::move(source)), |
| cached_source_(std::move(cached_source)), |
| source_size_(source_size), |
| file_metadata_(file_metadata), |
| properties_(std::move(props)), |
| row_group_ordinal_(row_group_number), |
| prebuffered_column_chunks_bitmap_(std::move(prebuffered_column_chunks_bitmap)) { |
| row_group_metadata_ = file_metadata->RowGroup(row_group_number); |
| } |
| |
| const RowGroupMetaData* metadata() const override { return row_group_metadata_.get(); } |
| |
| const ReaderProperties* properties() const override { return &properties_; } |
| |
| std::unique_ptr<PageReader> GetColumnPageReader(int i) override { |
| // Read column chunk from the file |
| auto col = row_group_metadata_->ColumnChunk(i); |
| |
| ::arrow::io::ReadRange col_range = |
| ComputeColumnChunkRange(file_metadata_, source_size_, row_group_ordinal_, i); |
| std::shared_ptr<ArrowInputStream> stream; |
| if (cached_source_ && prebuffered_column_chunks_bitmap_ != nullptr && |
| ::arrow::bit_util::GetBit(prebuffered_column_chunks_bitmap_->data(), i)) { |
| // PARQUET-1698: if read coalescing is enabled, read from pre-buffered |
| // segments. |
| PARQUET_ASSIGN_OR_THROW(auto buffer, cached_source_->Read(col_range)); |
| stream = std::make_shared<::arrow::io::BufferReader>(buffer); |
| } else { |
| stream = properties_.GetStream(source_, col_range.offset, col_range.length); |
| } |
| |
| std::unique_ptr<ColumnCryptoMetaData> crypto_metadata = col->crypto_metadata(); |
| |
| // Prior to Arrow 3.0.0, is_compressed was always set to false in column headers, |
| // even if compression was used. See ARROW-17100. |
| bool always_compressed = file_metadata_->writer_version().VersionLt( |
| ApplicationVersion::PARQUET_CPP_10353_FIXED_VERSION()); |
| |
| // Column is encrypted only if crypto_metadata exists. |
| if (!crypto_metadata) { |
| return PageReader::Open(stream, col->num_values(), col->compression(), properties_, |
| always_compressed); |
| } |
| |
| // The column is encrypted |
| auto* file_decryptor = file_metadata_->file_decryptor().get(); |
| auto meta_decryptor_factory = InternalFileDecryptor::GetColumnMetaDecryptorFactory( |
| file_decryptor, crypto_metadata.get()); |
| auto data_decryptor_factory = InternalFileDecryptor::GetColumnDataDecryptorFactory( |
| file_decryptor, crypto_metadata.get()); |
| |
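    // The encryption spec encodes row group and column ordinals as 16-bit
    // integers in the module AAD, hence the limit below.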
| constexpr auto kEncryptedOrdinalLimit = 32767; |
| if (ARROW_PREDICT_FALSE(row_group_ordinal_ > kEncryptedOrdinalLimit)) { |
| throw ParquetException("Encrypted files cannot contain more than 32767 row groups"); |
| } |
| if (ARROW_PREDICT_FALSE(i > kEncryptedOrdinalLimit)) { |
| throw ParquetException("Encrypted files cannot contain more than 32767 columns"); |
| } |
| |
| CryptoContext ctx{col->has_dictionary_page(), |
| static_cast<int16_t>(row_group_ordinal_), static_cast<int16_t>(i), |
| std::move(meta_decryptor_factory), |
| std::move(data_decryptor_factory)}; |
| return PageReader::Open(stream, col->num_values(), col->compression(), properties_, |
| always_compressed, &ctx); |
| } |
| |
| private: |
| std::shared_ptr<ArrowInputFile> source_; |
| // Will be nullptr if PreBuffer() is not called. |
| std::shared_ptr<::arrow::io::internal::ReadRangeCache> cached_source_; |
| int64_t source_size_; |
| FileMetaData* file_metadata_; |
| std::unique_ptr<RowGroupMetaData> row_group_metadata_; |
| ReaderProperties properties_; |
| int row_group_ordinal_; |
| const std::shared_ptr<const Buffer> prebuffered_column_chunks_bitmap_; |
| }; |
| |
| // ---------------------------------------------------------------------- |
| // SerializedFile: An implementation of ParquetFileReader::Contents that deals |
| // with the Parquet file structure, Thrift deserialization, and other internal |
| // matters |
| |
// This class shares ownership of the provided data source
| class SerializedFile : public ParquetFileReader::Contents { |
| public: |
| SerializedFile(std::shared_ptr<ArrowInputFile> source, |
| const ReaderProperties& props = default_reader_properties()) |
| : source_(std::move(source)), properties_(props) { |
| PARQUET_ASSIGN_OR_THROW(source_size_, source_->GetSize()); |
| } |
| |
| void Close() override {} |
| |
| std::shared_ptr<RowGroupReader> GetRowGroup(int i) override { |
| std::shared_ptr<Buffer> prebuffered_column_chunks_bitmap; |
    // Avoid updating the bitmap here, as this function can be called
    // concurrently; the bitmap is only updated within PreBuffer().
| auto prebuffered_column_chunks_iter = prebuffered_column_chunks_.find(i); |
| if (prebuffered_column_chunks_iter != prebuffered_column_chunks_.end()) { |
| prebuffered_column_chunks_bitmap = prebuffered_column_chunks_iter->second; |
| } |
| |
| std::unique_ptr<SerializedRowGroup> contents = std::make_unique<SerializedRowGroup>( |
| source_, cached_source_, source_size_, file_metadata_.get(), i, properties_, |
| std::move(prebuffered_column_chunks_bitmap)); |
| return std::make_shared<RowGroupReader>(std::move(contents)); |
| } |
| |
| std::shared_ptr<FileMetaData> metadata() const override { return file_metadata_; } |
| |
| std::shared_ptr<PageIndexReader> GetPageIndexReader() override { |
| if (!file_metadata_) { |
      // Usually this won't happen if the user calls one of the static Open()
      // functions to create a ParquetFileReader. But if the user calls the
      // constructor directly and calls GetPageIndexReader() before Open(),
      // this can happen.
| throw ParquetException( |
| "Cannot call GetPageIndexReader() due to missing file metadata. Did you " |
| "forget to call ParquetFileReader::Open() first?"); |
| } |
| if (!page_index_reader_) { |
| page_index_reader_ = |
| PageIndexReader::Make(source_.get(), file_metadata_, properties_, |
| file_metadata_->file_decryptor().get()); |
| } |
| return page_index_reader_; |
| } |
| |
| BloomFilterReader& GetBloomFilterReader() override { |
| if (!file_metadata_) { |
      // Usually this won't happen if the user calls one of the static Open()
      // functions to create a ParquetFileReader. But if the user calls the
      // constructor directly and calls GetBloomFilterReader() before Open(),
      // this can happen.
| throw ParquetException( |
| "Cannot call GetBloomFilterReader() due to missing file metadata. Did you " |
| "forget to call ParquetFileReader::Open() first?"); |
| } |
| if (!bloom_filter_reader_) { |
| bloom_filter_reader_ = BloomFilterReader::Make(source_, file_metadata_, properties_, |
| file_metadata_->file_decryptor()); |
| if (bloom_filter_reader_ == nullptr) { |
| throw ParquetException("Cannot create BloomFilterReader"); |
| } |
| } |
| return *bloom_filter_reader_; |
| } |
| |
| void set_metadata(std::shared_ptr<FileMetaData> metadata) { |
| file_metadata_ = std::move(metadata); |
| } |
| |
| void PreBuffer(const std::vector<int>& row_groups, |
| const std::vector<int>& column_indices, |
| const ::arrow::io::IOContext& ctx, |
| const ::arrow::io::CacheOptions& options) { |
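    // Note: each call replaces the previous cache, so ranges buffered by
    // earlier calls are discarded.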
| cached_source_ = |
| std::make_shared<::arrow::io::internal::ReadRangeCache>(source_, ctx, options); |
| std::vector<::arrow::io::ReadRange> ranges; |
| prebuffered_column_chunks_.clear(); |
| int num_cols = file_metadata_->num_columns(); |
    // A bitmap of the columns to buffer, shared by all requested row groups.
| std::shared_ptr<Buffer> buffer_columns; |
| if (!row_groups.empty()) { |
| PARQUET_THROW_NOT_OK(AllocateEmptyBitmap(num_cols, properties_.memory_pool()) |
| .Value(&buffer_columns)); |
| for (int col : column_indices) { |
| ::arrow::bit_util::SetBit(buffer_columns->mutable_data(), col); |
| } |
| } |
| for (int row : row_groups) { |
| prebuffered_column_chunks_[row] = buffer_columns; |
| for (int col : column_indices) { |
| ranges.push_back( |
| ComputeColumnChunkRange(file_metadata_.get(), source_size_, row, col)); |
| } |
| } |
| PARQUET_THROW_NOT_OK(cached_source_->Cache(ranges)); |
| } |
| |
| Result<std::vector<::arrow::io::ReadRange>> GetReadRanges( |
| const std::vector<int>& row_groups, const std::vector<int>& column_indices, |
| int64_t hole_size_limit, int64_t range_size_limit) { |
| std::vector<::arrow::io::ReadRange> ranges; |
| for (int row_group : row_groups) { |
| for (int col : column_indices) { |
| ranges.push_back( |
| ComputeColumnChunkRange(file_metadata_.get(), source_size_, row_group, col)); |
| } |
| } |
| |
| return ::arrow::io::internal::CoalesceReadRanges(std::move(ranges), hole_size_limit, |
| range_size_limit); |
| } |
| |
| Future<> WhenBuffered(const std::vector<int>& row_groups, |
| const std::vector<int>& column_indices) const { |
| if (!cached_source_) { |
| return Status::Invalid("Must call PreBuffer before WhenBuffered"); |
| } |
| std::vector<::arrow::io::ReadRange> ranges; |
| for (int row : row_groups) { |
| for (int col : column_indices) { |
| ranges.push_back( |
| ComputeColumnChunkRange(file_metadata_.get(), source_size_, row, col)); |
| } |
| } |
| return cached_source_->WaitFor(ranges); |
| } |
| |
| // Metadata/footer parsing. Divided up to separate sync/async paths, and to use |
| // exceptions for error handling (with the async path converting to Future/Status). |
| |
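  // Footer layout: [metadata] [4-byte metadata length] [4-byte magic]. With an
  // encrypted footer, the crypto metadata precedes the encrypted metadata and
  // the length field covers both.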
| void ParseMetaData() { |
| int64_t footer_read_size = GetFooterReadSize(); |
| PARQUET_ASSIGN_OR_THROW( |
| auto footer_buffer, |
| source_->ReadAt(source_size_ - footer_read_size, footer_read_size)); |
| uint32_t metadata_len = ParseFooterLength(footer_buffer, footer_read_size); |
| int64_t metadata_start = source_size_ - kFooterSize - metadata_len; |
| |
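    // Reuse the initial footer read if it already covers the metadata;
    // otherwise issue a second read for the metadata section.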
| std::shared_ptr<::arrow::Buffer> metadata_buffer; |
| if (footer_read_size >= (metadata_len + kFooterSize)) { |
| metadata_buffer = SliceBuffer( |
| footer_buffer, footer_read_size - metadata_len - kFooterSize, metadata_len); |
| } else { |
| PARQUET_ASSIGN_OR_THROW(metadata_buffer, |
| source_->ReadAt(metadata_start, metadata_len)); |
| } |
| |
| // Parse the footer depending on encryption type |
| const bool is_encrypted_footer = |
| memcmp(footer_buffer->data() + footer_read_size - 4, kParquetEMagic, 4) == 0; |
| std::shared_ptr<InternalFileDecryptor> file_decryptor; |
| if (is_encrypted_footer) { |
      // Encrypted file with encrypted footer.
| const std::pair<int64_t, uint32_t> read_size = |
| ParseMetaDataOfEncryptedFileWithEncryptedFooter(metadata_buffer, metadata_len, |
| &file_decryptor); |
| // Read the actual footer |
| metadata_start = read_size.first; |
| metadata_len = read_size.second; |
| PARQUET_ASSIGN_OR_THROW(metadata_buffer, |
| source_->ReadAt(metadata_start, metadata_len)); |
| // Fall through |
| } |
| |
| ParseMetaDataFinal(std::move(metadata_buffer), metadata_len, is_encrypted_footer, |
| std::move(file_decryptor)); |
| } |
| |
| // Validate the source size and get the initial read size. |
| int64_t GetFooterReadSize() { |
| if (source_size_ == 0) { |
| throw ParquetInvalidOrCorruptedFileException("Parquet file size is 0 bytes"); |
| } else if (source_size_ < kFooterSize) { |
| throw ParquetInvalidOrCorruptedFileException( |
| "Parquet file size is ", source_size_, |
| " bytes, smaller than the minimum file footer (", kFooterSize, " bytes)"); |
| } |
| |
    return static_cast<int64_t>(
        std::min(static_cast<size_t>(source_size_), properties_.footer_read_size()));
| } |
| |
| // Validate the magic bytes and get the length of the full footer. |
| uint32_t ParseFooterLength(const std::shared_ptr<::arrow::Buffer>& footer_buffer, |
| const int64_t footer_read_size) { |
    // Check that the whole footer was read and that the last 4 bytes contain
    // the magic bytes.
| if (footer_buffer->size() != footer_read_size || |
| (memcmp(footer_buffer->data() + footer_read_size - 4, kParquetMagic, 4) != 0 && |
| memcmp(footer_buffer->data() + footer_read_size - 4, kParquetEMagic, 4) != 0)) { |
| throw ParquetInvalidOrCorruptedFileException( |
| "Parquet magic bytes not found in footer. Either the file is corrupted or this " |
| "is not a parquet file."); |
| } |
    // Encrypted and unencrypted footers store the metadata length in the same
    // place, so the same check applies to both.
| uint32_t metadata_len = |
| ::arrow::bit_util::FromLittleEndian(::arrow::util::SafeLoadAs<uint32_t>( |
| reinterpret_cast<const uint8_t*>(footer_buffer->data()) + footer_read_size - |
| kFooterSize)); |
| if (metadata_len > source_size_ - kFooterSize) { |
| throw ParquetInvalidOrCorruptedFileException( |
| "Parquet file size is ", source_size_, |
| " bytes, smaller than the size reported by footer's (", metadata_len, "bytes)"); |
| } |
| return metadata_len; |
| } |
| |
| // Does not throw. |
| Future<> ParseMetaDataAsync() { |
| int64_t footer_read_size; |
| BEGIN_PARQUET_CATCH_EXCEPTIONS |
| footer_read_size = GetFooterReadSize(); |
| END_PARQUET_CATCH_EXCEPTIONS |
    // Assumes `this` is kept alive externally until the returned future
    // completes.
| return source_->ReadAsync(source_size_ - footer_read_size, footer_read_size) |
| .Then([this, footer_read_size]( |
| const std::shared_ptr<::arrow::Buffer>& footer_buffer) -> Future<> { |
| uint32_t metadata_len; |
| BEGIN_PARQUET_CATCH_EXCEPTIONS |
| metadata_len = ParseFooterLength(footer_buffer, footer_read_size); |
| END_PARQUET_CATCH_EXCEPTIONS |
| int64_t metadata_start = source_size_ - kFooterSize - metadata_len; |
| |
| std::shared_ptr<::arrow::Buffer> metadata_buffer; |
| if (footer_read_size >= (metadata_len + kFooterSize)) { |
| metadata_buffer = |
| SliceBuffer(footer_buffer, footer_read_size - metadata_len - kFooterSize, |
| metadata_len); |
| return ParseMaybeEncryptedMetaDataAsync(footer_buffer, |
| std::move(metadata_buffer), |
| footer_read_size, metadata_len); |
| } |
| return source_->ReadAsync(metadata_start, metadata_len) |
| .Then([this, footer_buffer, footer_read_size, metadata_len]( |
| const std::shared_ptr<::arrow::Buffer>& metadata_buffer) { |
| return ParseMaybeEncryptedMetaDataAsync(footer_buffer, metadata_buffer, |
| footer_read_size, metadata_len); |
| }); |
| }); |
| } |
| |
| // Continuation |
| Future<> ParseMaybeEncryptedMetaDataAsync( |
| std::shared_ptr<::arrow::Buffer> footer_buffer, |
| std::shared_ptr<::arrow::Buffer> metadata_buffer, int64_t footer_read_size, |
| uint32_t metadata_len) { |
| // Parse the footer depending on encryption type |
| const bool is_encrypted_footer = |
| memcmp(footer_buffer->data() + footer_read_size - 4, kParquetEMagic, 4) == 0; |
| std::shared_ptr<InternalFileDecryptor> file_decryptor; |
| if (is_encrypted_footer) { |
      // Encrypted file with encrypted footer.
| std::pair<int64_t, uint32_t> read_size; |
| BEGIN_PARQUET_CATCH_EXCEPTIONS |
| read_size = ParseMetaDataOfEncryptedFileWithEncryptedFooter( |
| metadata_buffer, metadata_len, &file_decryptor); |
| END_PARQUET_CATCH_EXCEPTIONS |
| // Read the actual footer |
| int64_t metadata_start = read_size.first; |
| metadata_len = read_size.second; |
| return source_->ReadAsync(metadata_start, metadata_len) |
| .Then([this, metadata_len, is_encrypted_footer, |
| file_decryptor = std::move(file_decryptor)]( |
| const std::shared_ptr<::arrow::Buffer>& metadata_buffer) { |
| // Continue and read the file footer |
| BEGIN_PARQUET_CATCH_EXCEPTIONS |
| ParseMetaDataFinal(metadata_buffer, metadata_len, is_encrypted_footer, |
| file_decryptor); |
| END_PARQUET_CATCH_EXCEPTIONS |
| return Status::OK(); |
| }); |
| } |
| BEGIN_PARQUET_CATCH_EXCEPTIONS |
| ParseMetaDataFinal(std::move(metadata_buffer), metadata_len, is_encrypted_footer, |
| std::move(file_decryptor)); |
| END_PARQUET_CATCH_EXCEPTIONS |
| return Status::OK(); |
| } |
| |
| // Continuation |
| void ParseMetaDataFinal(std::shared_ptr<::arrow::Buffer> metadata_buffer, |
| uint32_t metadata_len, const bool is_encrypted_footer, |
| std::shared_ptr<InternalFileDecryptor> file_decryptor) { |
| const uint32_t read_metadata_len = ParseUnencryptedFileMetadata( |
| metadata_buffer, metadata_len, std::move(file_decryptor)); |
| auto file_decryption_properties = properties_.file_decryption_properties(); |
| if (is_encrypted_footer) { |
| // Nothing else to do here. |
| return; |
| } else if (!file_metadata_->is_encryption_algorithm_set()) { // Non encrypted file. |
| if (file_decryption_properties != nullptr) { |
| if (!file_decryption_properties->plaintext_files_allowed()) { |
| throw ParquetException("Applying decryption properties on plaintext file"); |
| } |
| } |
| } else { |
| // Encrypted file with plaintext footer mode. |
| ParseMetaDataOfEncryptedFileWithPlaintextFooter( |
| file_decryption_properties, metadata_buffer, metadata_len, read_metadata_len); |
| } |
| } |
| |
| private: |
| std::shared_ptr<ArrowInputFile> source_; |
| std::shared_ptr<::arrow::io::internal::ReadRangeCache> cached_source_; |
| int64_t source_size_; |
| std::shared_ptr<FileMetaData> file_metadata_; |
| ReaderProperties properties_; |
| std::shared_ptr<PageIndexReader> page_index_reader_; |
| std::unique_ptr<BloomFilterReader> bloom_filter_reader_; |
  // Maps each row group ordinal to the prebuffer status of its column chunks,
  // stored as a bitmap buffer.
| std::unordered_map<int, std::shared_ptr<Buffer>> prebuffered_column_chunks_; |
| |
| // \return The true length of the metadata in bytes |
  uint32_t ParseUnencryptedFileMetadata(
      const std::shared_ptr<Buffer>& metadata_buffer, const uint32_t metadata_len,
      std::shared_ptr<InternalFileDecryptor> file_decryptor);
| |
| std::string HandleAadPrefix( |
| const std::shared_ptr<FileDecryptionProperties>& file_decryption_properties, |
| const EncryptionAlgorithm& algo); |
| |
| void ParseMetaDataOfEncryptedFileWithPlaintextFooter( |
| const std::shared_ptr<FileDecryptionProperties>& file_decryption_properties, |
| const std::shared_ptr<Buffer>& metadata_buffer, uint32_t metadata_len, |
| uint32_t read_metadata_len); |
| |
| // \return The position and size of the actual footer |
| std::pair<int64_t, uint32_t> ParseMetaDataOfEncryptedFileWithEncryptedFooter( |
| const std::shared_ptr<Buffer>& crypto_metadata_buffer, uint32_t footer_len, |
| std::shared_ptr<InternalFileDecryptor>* file_decryptor); |
| }; |
| |
| uint32_t SerializedFile::ParseUnencryptedFileMetadata( |
| const std::shared_ptr<Buffer>& metadata_buffer, const uint32_t metadata_len, |
| std::shared_ptr<InternalFileDecryptor> file_decryptor) { |
| if (metadata_buffer->size() != metadata_len) { |
| throw ParquetException("Failed reading metadata buffer (requested " + |
| std::to_string(metadata_len) + " bytes but got " + |
| std::to_string(metadata_buffer->size()) + " bytes)"); |
| } |
| uint32_t read_metadata_len = metadata_len; |
| // The encrypted read path falls through to here, so pass in the decryptor |
| file_metadata_ = FileMetaData::Make(metadata_buffer->data(), &read_metadata_len, |
| properties_, std::move(file_decryptor)); |
| return read_metadata_len; |
| } |
| |
| std::pair<int64_t, uint32_t> |
| SerializedFile::ParseMetaDataOfEncryptedFileWithEncryptedFooter( |
| const std::shared_ptr<::arrow::Buffer>& crypto_metadata_buffer, |
| // both metadata & crypto metadata length |
| const uint32_t footer_len, std::shared_ptr<InternalFileDecryptor>* file_decryptor) { |
  // Encrypted file with encrypted footer.
  // Check that crypto_metadata_buffer contains the entire footer.
| if (crypto_metadata_buffer->size() != footer_len) { |
| throw ParquetException("Failed reading encrypted metadata buffer (requested " + |
| std::to_string(footer_len) + " bytes but got " + |
| std::to_string(crypto_metadata_buffer->size()) + " bytes)"); |
| } |
| auto file_decryption_properties = properties_.file_decryption_properties(); |
| if (file_decryption_properties == nullptr) { |
    throw ParquetException(
        "Could not read encrypted metadata, no file decryption properties are set in "
        "the reader's properties");
| } |
| uint32_t crypto_metadata_len = footer_len; |
| std::shared_ptr<FileCryptoMetaData> file_crypto_metadata = |
| FileCryptoMetaData::Make(crypto_metadata_buffer->data(), &crypto_metadata_len); |
| // Handle AAD prefix |
| EncryptionAlgorithm algo = file_crypto_metadata->encryption_algorithm(); |
| std::string file_aad = HandleAadPrefix(file_decryption_properties, algo); |
| *file_decryptor = std::make_shared<InternalFileDecryptor>( |
| file_decryption_properties, file_aad, algo.algorithm, |
| file_crypto_metadata->key_metadata(), properties_.memory_pool()); |
| |
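  // The encrypted metadata immediately follows the crypto metadata in the
  // footer.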
| int64_t metadata_offset = source_size_ - kFooterSize - footer_len + crypto_metadata_len; |
| uint32_t metadata_len = footer_len - crypto_metadata_len; |
| return std::make_pair(metadata_offset, metadata_len); |
| } |
| |
| void SerializedFile::ParseMetaDataOfEncryptedFileWithPlaintextFooter( |
| const std::shared_ptr<FileDecryptionProperties>& file_decryption_properties, |
| const std::shared_ptr<Buffer>& metadata_buffer, uint32_t metadata_len, |
| uint32_t read_metadata_len) { |
  // Providing decryption properties in plaintext footer mode is not mandatory;
  // for example, a legacy reader can read the plaintext footer without them.
| if (file_decryption_properties != nullptr) { |
| EncryptionAlgorithm algo = file_metadata_->encryption_algorithm(); |
| // Handle AAD prefix |
| std::string file_aad = HandleAadPrefix(file_decryption_properties, algo); |
| auto file_decryptor = std::make_shared<InternalFileDecryptor>( |
| file_decryption_properties, file_aad, algo.algorithm, |
| file_metadata_->footer_signing_key_metadata(), properties_.memory_pool()); |
| // Set the InternalFileDecryptor in the metadata as well, as it's used |
| // for ColumnChunkMetaData creation. |
| file_metadata_->set_file_decryptor(file_decryptor); |
| |
| if (file_decryption_properties->check_plaintext_footer_integrity()) { |
| auto serialized_metadata = |
| metadata_buffer->span_as<uint8_t>().subspan(0, read_metadata_len); |
| auto signature = metadata_buffer->span_as<uint8_t>().subspan(read_metadata_len); |
| if (!FileMetaData::VerifySignature(serialized_metadata, signature, |
| file_decryptor.get())) { |
| throw ParquetInvalidOrCorruptedFileException( |
| "Parquet crypto signature verification failed"); |
| } |
| } |
| } |
| } |
| |
| std::string SerializedFile::HandleAadPrefix( |
| const std::shared_ptr<FileDecryptionProperties>& file_decryption_properties, |
| const EncryptionAlgorithm& algo) { |
| std::string aad_prefix_in_properties = file_decryption_properties->aad_prefix(); |
| std::string aad_prefix = aad_prefix_in_properties; |
  const bool file_has_aad_prefix = !algo.aad.aad_prefix.empty();
| std::string aad_prefix_in_file = algo.aad.aad_prefix; |
| |
| if (algo.aad.supply_aad_prefix && aad_prefix_in_properties.empty()) { |
| throw ParquetException( |
| "AAD prefix used for file encryption, " |
| "but not stored in file and not supplied " |
| "in decryption properties"); |
| } |
| |
| if (file_has_aad_prefix) { |
| if (!aad_prefix_in_properties.empty()) { |
      if (aad_prefix_in_properties != aad_prefix_in_file) {
| throw ParquetException( |
| "AAD Prefix in file and in properties " |
| "is not the same"); |
| } |
| } |
| aad_prefix = aad_prefix_in_file; |
| std::shared_ptr<AADPrefixVerifier> aad_prefix_verifier = |
| file_decryption_properties->aad_prefix_verifier(); |
| if (aad_prefix_verifier != nullptr) aad_prefix_verifier->Verify(aad_prefix); |
| } else { |
| if (!algo.aad.supply_aad_prefix && !aad_prefix_in_properties.empty()) { |
| throw ParquetException( |
| "AAD Prefix set in decryption properties, but was not used " |
| "for file encryption"); |
| } |
| std::shared_ptr<AADPrefixVerifier> aad_prefix_verifier = |
| file_decryption_properties->aad_prefix_verifier(); |
| if (aad_prefix_verifier != nullptr) { |
| throw ParquetException( |
| "AAD Prefix Verifier is set, but AAD Prefix not found in file"); |
| } |
| } |
| return aad_prefix + algo.aad.aad_file_unique; |
| } |
| |
| // ---------------------------------------------------------------------- |
| // ParquetFileReader public API |
| |
ParquetFileReader::ParquetFileReader() = default;
| |
| ParquetFileReader::~ParquetFileReader() { |
| try { |
| Close(); |
| } catch (...) { |
| } |
| } |
| |
| // Open the file. If no metadata is passed, it is parsed from the footer of |
| // the file |
| std::unique_ptr<ParquetFileReader::Contents> ParquetFileReader::Contents::Open( |
| std::shared_ptr<ArrowInputFile> source, const ReaderProperties& props, |
| std::shared_ptr<FileMetaData> metadata) { |
| std::unique_ptr<ParquetFileReader::Contents> result( |
| new SerializedFile(std::move(source), props)); |
| |
  // Access SerializedFile's private methods, which are otherwise unavailable
  // through the Contents interface.
| SerializedFile* file = static_cast<SerializedFile*>(result.get()); |
| |
| if (metadata == nullptr) { |
| // Validates magic bytes, parses metadata, and initializes the SchemaDescriptor |
| file->ParseMetaData(); |
| } else { |
| file->set_metadata(std::move(metadata)); |
| } |
| |
| return result; |
| } |
| |
| Future<std::unique_ptr<ParquetFileReader::Contents>> |
| ParquetFileReader::Contents::OpenAsync(std::shared_ptr<ArrowInputFile> source, |
| const ReaderProperties& props, |
| std::shared_ptr<FileMetaData> metadata) { |
| BEGIN_PARQUET_CATCH_EXCEPTIONS |
| std::unique_ptr<ParquetFileReader::Contents> result( |
| new SerializedFile(std::move(source), props)); |
| SerializedFile* file = static_cast<SerializedFile*>(result.get()); |
| if (metadata == nullptr) { |
| // TODO(ARROW-12259): workaround since we have Future<(move-only type)> |
    struct {
      Result<std::unique_ptr<ParquetFileReader::Contents>> operator()() {
        return std::move(result);
      }

      std::unique_ptr<ParquetFileReader::Contents> result;
    } continuation;
    continuation.result = std::move(result);
    return file->ParseMetaDataAsync().Then(std::move(continuation));
| } else { |
| file->set_metadata(std::move(metadata)); |
| return Future<std::unique_ptr<ParquetFileReader::Contents>>::MakeFinished( |
| std::move(result)); |
| } |
| END_PARQUET_CATCH_EXCEPTIONS |
| } |
| |
| std::unique_ptr<ParquetFileReader> ParquetFileReader::Open( |
| std::shared_ptr<::arrow::io::RandomAccessFile> source, const ReaderProperties& props, |
| std::shared_ptr<FileMetaData> metadata) { |
| auto contents = SerializedFile::Open(std::move(source), props, std::move(metadata)); |
| std::unique_ptr<ParquetFileReader> result = std::make_unique<ParquetFileReader>(); |
| result->Open(std::move(contents)); |
| return result; |
| } |
| |
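// A minimal usage sketch (illustrative only; the file name is hypothetical and
// error handling is elided):
//
//   auto reader = parquet::ParquetFileReader::OpenFile("example.parquet");
//   std::shared_ptr<parquet::FileMetaData> file_metadata = reader->metadata();
//   auto row_group = reader->RowGroup(0);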
| std::unique_ptr<ParquetFileReader> ParquetFileReader::OpenFile( |
| const std::string& path, bool memory_map, const ReaderProperties& props, |
| std::shared_ptr<FileMetaData> metadata) { |
| std::shared_ptr<::arrow::io::RandomAccessFile> source; |
| if (memory_map) { |
| PARQUET_ASSIGN_OR_THROW( |
| source, ::arrow::io::MemoryMappedFile::Open(path, ::arrow::io::FileMode::READ)); |
| } else { |
| PARQUET_ASSIGN_OR_THROW(source, |
| ::arrow::io::ReadableFile::Open(path, props.memory_pool())); |
| } |
| |
| return Open(std::move(source), props, std::move(metadata)); |
| } |
| |
| Future<std::unique_ptr<ParquetFileReader>> ParquetFileReader::OpenAsync( |
| std::shared_ptr<::arrow::io::RandomAccessFile> source, const ReaderProperties& props, |
| std::shared_ptr<FileMetaData> metadata) { |
| BEGIN_PARQUET_CATCH_EXCEPTIONS |
| auto fut = SerializedFile::OpenAsync(std::move(source), props, std::move(metadata)); |
| // TODO(ARROW-12259): workaround since we have Future<(move-only type)> |
| auto completed = Future<std::unique_ptr<ParquetFileReader>>::Make(); |
| fut.AddCallback( |
| [fut, completed]( |
| const Result<std::unique_ptr<ParquetFileReader::Contents>>& contents) mutable { |
| if (!contents.ok()) { |
| completed.MarkFinished(contents.status()); |
| return; |
| } |
| std::unique_ptr<ParquetFileReader> result = std::make_unique<ParquetFileReader>(); |
| result->Open(fut.MoveResult().MoveValueUnsafe()); |
| completed.MarkFinished(std::move(result)); |
| }); |
| return completed; |
| END_PARQUET_CATCH_EXCEPTIONS |
| } |
| |
| void ParquetFileReader::Open(std::unique_ptr<ParquetFileReader::Contents> contents) { |
| contents_ = std::move(contents); |
| } |
| |
| void ParquetFileReader::Close() { |
| if (contents_) { |
| contents_->Close(); |
| } |
| } |
| |
| std::shared_ptr<FileMetaData> ParquetFileReader::metadata() const { |
| return contents_->metadata(); |
| } |
| |
| std::shared_ptr<PageIndexReader> ParquetFileReader::GetPageIndexReader() { |
| return contents_->GetPageIndexReader(); |
| } |
| |
| BloomFilterReader& ParquetFileReader::GetBloomFilterReader() { |
| return contents_->GetBloomFilterReader(); |
| } |
| |
| std::shared_ptr<RowGroupReader> ParquetFileReader::RowGroup(int i) { |
  if (i < 0 || i >= metadata()->num_row_groups()) {
| std::stringstream ss; |
| ss << "Trying to read row group " << i << " but file only has " |
| << metadata()->num_row_groups() << " row groups"; |
| throw ParquetException(ss.str()); |
| } |
| return contents_->GetRowGroup(i); |
| } |
| |
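// A minimal usage sketch (illustrative only): prebuffer columns 0 and 1 of row
// group 0, then wait for the reads to finish before scanning:
//
//   reader->PreBuffer({0}, {0, 1}, ::arrow::io::IOContext(),
//                     ::arrow::io::CacheOptions::Defaults());
//   reader->WhenBuffered({0}, {0, 1}).Wait();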
| void ParquetFileReader::PreBuffer(const std::vector<int>& row_groups, |
| const std::vector<int>& column_indices, |
| const ::arrow::io::IOContext& ctx, |
| const ::arrow::io::CacheOptions& options) { |
| // Access private methods here |
| SerializedFile* file = |
| ::arrow::internal::checked_cast<SerializedFile*>(contents_.get()); |
| file->PreBuffer(row_groups, column_indices, ctx, options); |
| } |
| |
| Result<std::vector<::arrow::io::ReadRange>> ParquetFileReader::GetReadRanges( |
| const std::vector<int>& row_groups, const std::vector<int>& column_indices, |
| int64_t hole_size_limit, int64_t range_size_limit) { |
| // Access private methods here |
| SerializedFile* file = |
| ::arrow::internal::checked_cast<SerializedFile*>(contents_.get()); |
| return file->GetReadRanges(row_groups, column_indices, hole_size_limit, |
| range_size_limit); |
| } |
| |
| Future<> ParquetFileReader::WhenBuffered(const std::vector<int>& row_groups, |
| const std::vector<int>& column_indices) const { |
| // Access private methods here |
| SerializedFile* file = |
| ::arrow::internal::checked_cast<SerializedFile*>(contents_.get()); |
| return file->WhenBuffered(row_groups, column_indices); |
| } |
| |
| // ---------------------------------------------------------------------- |
| // File metadata helpers |
| |
| std::shared_ptr<FileMetaData> ReadMetaData( |
| const std::shared_ptr<::arrow::io::RandomAccessFile>& source) { |
| return ParquetFileReader::Open(source)->metadata(); |
| } |
| |
| // ---------------------------------------------------------------------- |
| // File scanner for performance testing |
| |
| int64_t ScanFileContents(std::vector<int> columns, const int32_t column_batch_size, |
| ParquetFileReader* reader) { |
| std::vector<int16_t> rep_levels(column_batch_size); |
| std::vector<int16_t> def_levels(column_batch_size); |
| |
| int num_columns = static_cast<int>(columns.size()); |
| |
  // If columns are not specified explicitly, scan all columns in the file.
  if (columns.empty()) {
| num_columns = reader->metadata()->num_columns(); |
| columns.resize(num_columns); |
| for (int i = 0; i < num_columns; i++) { |
| columns[i] = i; |
| } |
| } |
| if (num_columns == 0) { |
    // If we still have no columns (the file has none), return early; the
    // remainder of the function expects at least one column.
| return 0; |
| } |
| |
| std::vector<int64_t> total_rows(num_columns, 0); |
| |
| for (int r = 0; r < reader->metadata()->num_row_groups(); ++r) { |
| auto group_reader = reader->RowGroup(r); |
| int col = 0; |
| for (auto i : columns) { |
| std::shared_ptr<ColumnReader> col_reader = group_reader->Column(i); |
| size_t value_byte_size = GetTypeByteSize(col_reader->descr()->physical_type()); |
| std::vector<uint8_t> values(column_batch_size * value_byte_size); |
| |
| int64_t values_read = 0; |
| while (col_reader->HasNext()) { |
| int64_t levels_read = |
| ScanAllValues(column_batch_size, def_levels.data(), rep_levels.data(), |
| values.data(), &values_read, col_reader.get()); |
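        // For repeated fields, a repetition level of 0 marks the start of a
        // new top-level row; otherwise each level read corresponds to a row.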
| if (col_reader->descr()->max_repetition_level() > 0) { |
          for (int64_t lvl = 0; lvl < levels_read; lvl++) {
            if (rep_levels[lvl] == 0) {
| total_rows[col]++; |
| } |
| } |
| } else { |
| total_rows[col] += levels_read; |
| } |
| } |
| col++; |
| } |
| } |
| |
| for (int i = 1; i < num_columns; ++i) { |
| if (total_rows[0] != total_rows[i]) { |
| throw ParquetException("Parquet error: Total rows among columns do not match"); |
| } |
| } |
| |
| return total_rows[0]; |
| } |
| |
| } // namespace parquet |