| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "parquet/metadata.h" |
| |
| #include <algorithm> |
| #include <cinttypes> |
| #include <ostream> |
| #include <string> |
| #include <string_view> |
| #include <utility> |
| #include <vector> |
| |
| #include "arrow/io/memory.h" |
| #include "arrow/util/key_value_metadata.h" |
| #include "arrow/util/logging.h" |
| #include "parquet/encryption/encryption_internal.h" |
| #include "parquet/encryption/internal_file_decryptor.h" |
| #include "parquet/exception.h" |
| #include "parquet/schema.h" |
| #include "parquet/schema_internal.h" |
| #include "parquet/thrift_internal.h" |
| |
| namespace parquet { |
| |
| const ApplicationVersion& ApplicationVersion::PARQUET_251_FIXED_VERSION() { |
| static ApplicationVersion version("parquet-mr", 1, 8, 0); |
| return version; |
| } |
| |
| const ApplicationVersion& ApplicationVersion::PARQUET_816_FIXED_VERSION() { |
| static ApplicationVersion version("parquet-mr", 1, 2, 9); |
| return version; |
| } |
| |
| const ApplicationVersion& ApplicationVersion::PARQUET_CPP_FIXED_STATS_VERSION() { |
| static ApplicationVersion version("parquet-cpp", 1, 3, 0); |
| return version; |
| } |
| |
| const ApplicationVersion& ApplicationVersion::PARQUET_MR_FIXED_STATS_VERSION() { |
| static ApplicationVersion version("parquet-mr", 1, 10, 0); |
| return version; |
| } |
| |
| const ApplicationVersion& ApplicationVersion::PARQUET_CPP_10353_FIXED_VERSION() { |
| // parquet-cpp versions released prior to Arrow 3.0 would write DataPageV2 pages |
| // with is_compressed==0 but still write compressed data. (See: ARROW-10353). |
| // Parquet 1.5.1 had this problem, and after that we switched to the |
| // application name "parquet-cpp-arrow", so this version is fake. |
| static ApplicationVersion version("parquet-cpp", 2, 0, 0); |
| return version; |
| } |
| |
| std::string ParquetVersionToString(ParquetVersion::type ver) { |
| switch (ver) { |
| case ParquetVersion::PARQUET_1_0: |
| return "1.0"; |
| ARROW_SUPPRESS_DEPRECATION_WARNING |
| case ParquetVersion::PARQUET_2_0: |
| return "pseudo-2.0"; |
| ARROW_UNSUPPRESS_DEPRECATION_WARNING |
| case ParquetVersion::PARQUET_2_4: |
| return "2.4"; |
| case ParquetVersion::PARQUET_2_6: |
| return "2.6"; |
| } |
| |
| // This should be unreachable |
| return "UNKNOWN"; |
| } |
| |
| template <typename DType> |
| static std::shared_ptr<Statistics> MakeTypedColumnStats( |
| const format::ColumnMetaData& metadata, const ColumnDescriptor* descr) { |
| // If ColumnOrder is defined, return max_value and min_value |
| if (descr->column_order().get_order() == ColumnOrder::TYPE_DEFINED_ORDER) { |
| return MakeStatistics<DType>( |
| descr, metadata.statistics.min_value, metadata.statistics.max_value, |
| metadata.num_values - metadata.statistics.null_count, |
| metadata.statistics.null_count, metadata.statistics.distinct_count, |
| metadata.statistics.__isset.max_value || metadata.statistics.__isset.min_value, |
| metadata.statistics.__isset.null_count, |
| metadata.statistics.__isset.distinct_count); |
| } |
| // Default behavior |
| return MakeStatistics<DType>( |
| descr, metadata.statistics.min, metadata.statistics.max, |
| metadata.num_values - metadata.statistics.null_count, |
| metadata.statistics.null_count, metadata.statistics.distinct_count, |
| metadata.statistics.__isset.max || metadata.statistics.__isset.min, |
| metadata.statistics.__isset.null_count, metadata.statistics.__isset.distinct_count); |
| } |
| |
| std::shared_ptr<Statistics> MakeColumnStats(const format::ColumnMetaData& meta_data, |
| const ColumnDescriptor* descr) { |
| switch (static_cast<Type::type>(meta_data.type)) { |
| case Type::BOOLEAN: |
| return MakeTypedColumnStats<BooleanType>(meta_data, descr); |
| case Type::INT32: |
| return MakeTypedColumnStats<Int32Type>(meta_data, descr); |
| case Type::INT64: |
| return MakeTypedColumnStats<Int64Type>(meta_data, descr); |
| case Type::INT96: |
| return MakeTypedColumnStats<Int96Type>(meta_data, descr); |
| case Type::DOUBLE: |
| return MakeTypedColumnStats<DoubleType>(meta_data, descr); |
| case Type::FLOAT: |
| return MakeTypedColumnStats<FloatType>(meta_data, descr); |
| case Type::BYTE_ARRAY: |
| return MakeTypedColumnStats<ByteArrayType>(meta_data, descr); |
| case Type::FIXED_LEN_BYTE_ARRAY: |
| return MakeTypedColumnStats<FLBAType>(meta_data, descr); |
| case Type::UNDEFINED: |
| break; |
| } |
| throw ParquetException("Can't decode page statistics for selected column type"); |
| } |
| |
| // MetaData Accessor |
| |
| // ColumnCryptoMetaData |
| class ColumnCryptoMetaData::ColumnCryptoMetaDataImpl { |
| public: |
| explicit ColumnCryptoMetaDataImpl(const format::ColumnCryptoMetaData* crypto_metadata) |
| : crypto_metadata_(crypto_metadata) {} |
| |
| bool encrypted_with_footer_key() const { |
| return crypto_metadata_->__isset.ENCRYPTION_WITH_FOOTER_KEY; |
| } |
| bool encrypted_with_column_key() const { |
| return crypto_metadata_->__isset.ENCRYPTION_WITH_COLUMN_KEY; |
| } |
| std::shared_ptr<schema::ColumnPath> path_in_schema() const { |
| return std::make_shared<schema::ColumnPath>( |
| crypto_metadata_->ENCRYPTION_WITH_COLUMN_KEY.path_in_schema); |
| } |
| const std::string& key_metadata() const { |
| return crypto_metadata_->ENCRYPTION_WITH_COLUMN_KEY.key_metadata; |
| } |
| |
| private: |
| const format::ColumnCryptoMetaData* crypto_metadata_; |
| }; |
| |
| std::unique_ptr<ColumnCryptoMetaData> ColumnCryptoMetaData::Make( |
| const uint8_t* metadata) { |
| return std::unique_ptr<ColumnCryptoMetaData>(new ColumnCryptoMetaData(metadata)); |
| } |
| |
| ColumnCryptoMetaData::ColumnCryptoMetaData(const uint8_t* metadata) |
| : impl_(std::make_unique<ColumnCryptoMetaDataImpl>( |
| reinterpret_cast<const format::ColumnCryptoMetaData*>(metadata))) {} |
| |
| ColumnCryptoMetaData::~ColumnCryptoMetaData() = default; |
| |
| std::shared_ptr<schema::ColumnPath> ColumnCryptoMetaData::path_in_schema() const { |
| return impl_->path_in_schema(); |
| } |
| bool ColumnCryptoMetaData::encrypted_with_footer_key() const { |
| return impl_->encrypted_with_footer_key(); |
| } |
| const std::string& ColumnCryptoMetaData::key_metadata() const { |
| return impl_->key_metadata(); |
| } |
| |
| // ColumnChunk metadata |
| class ColumnChunkMetaData::ColumnChunkMetaDataImpl { |
| public: |
| explicit ColumnChunkMetaDataImpl(const format::ColumnChunk* column, |
| const ColumnDescriptor* descr, |
| int16_t row_group_ordinal, int16_t column_ordinal, |
| const ReaderProperties& properties, |
| const ApplicationVersion* writer_version, |
| std::shared_ptr<InternalFileDecryptor> file_decryptor) |
| : column_(column), |
| descr_(descr), |
| properties_(properties), |
| writer_version_(writer_version) { |
| column_metadata_ = &column->meta_data; |
| if (column->__isset.crypto_metadata) { // column metadata is encrypted |
| format::ColumnCryptoMetaData ccmd = column->crypto_metadata; |
| |
| if (ccmd.__isset.ENCRYPTION_WITH_COLUMN_KEY) { |
| if (file_decryptor != nullptr && file_decryptor->properties() != nullptr) { |
| // should decrypt metadata |
| std::shared_ptr<schema::ColumnPath> path = std::make_shared<schema::ColumnPath>( |
| ccmd.ENCRYPTION_WITH_COLUMN_KEY.path_in_schema); |
| std::string key_metadata = ccmd.ENCRYPTION_WITH_COLUMN_KEY.key_metadata; |
| |
| std::string aad_column_metadata = encryption::CreateModuleAad( |
| file_decryptor->file_aad(), encryption::kColumnMetaData, row_group_ordinal, |
| column_ordinal, static_cast<int16_t>(-1)); |
| auto decryptor = file_decryptor->GetColumnMetaDecryptor( |
| path->ToDotString(), key_metadata, aad_column_metadata); |
| auto len = static_cast<uint32_t>(column->encrypted_column_metadata.size()); |
| ThriftDeserializer deserializer(properties_); |
| deserializer.DeserializeMessage( |
| reinterpret_cast<const uint8_t*>(column->encrypted_column_metadata.c_str()), |
| &len, &decrypted_metadata_, decryptor); |
| column_metadata_ = &decrypted_metadata_; |
| } else { |
| throw ParquetException( |
| "Cannot decrypt ColumnMetadata." |
| " FileDecryption is not setup correctly"); |
| } |
| } |
| } |
| for (const auto& encoding : column_metadata_->encodings) { |
| encodings_.push_back(LoadEnumSafe(&encoding)); |
| } |
| for (const auto& encoding_stats : column_metadata_->encoding_stats) { |
| encoding_stats_.push_back({LoadEnumSafe(&encoding_stats.page_type), |
| LoadEnumSafe(&encoding_stats.encoding), |
| encoding_stats.count}); |
| } |
| possible_stats_ = nullptr; |
| } |
| |
| bool Equals(const ColumnChunkMetaDataImpl& other) const { |
| return *column_metadata_ == *other.column_metadata_; |
| } |
| |
| // column chunk |
| inline int64_t file_offset() const { return column_->file_offset; } |
| inline const std::string& file_path() const { return column_->file_path; } |
| |
| inline Type::type type() const { return LoadEnumSafe(&column_metadata_->type); } |
| |
| inline int64_t num_values() const { return column_metadata_->num_values; } |
| |
| std::shared_ptr<schema::ColumnPath> path_in_schema() { |
| return std::make_shared<schema::ColumnPath>(column_metadata_->path_in_schema); |
| } |
| |
| // Check if statistics are set and are valid |
| // 1) Must be set in the metadata |
| // 2) Statistics must not be corrupted |
| inline bool is_stats_set() const { |
| DCHECK(writer_version_ != nullptr); |
| // If the column statistics don't exist or column sort order is unknown |
| // we cannot use the column stats |
| if (!column_metadata_->__isset.statistics || |
| descr_->sort_order() == SortOrder::UNKNOWN) { |
| return false; |
| } |
| if (possible_stats_ == nullptr) { |
| possible_stats_ = MakeColumnStats(*column_metadata_, descr_); |
| } |
| EncodedStatistics encodedStatistics = possible_stats_->Encode(); |
| return writer_version_->HasCorrectStatistics(type(), encodedStatistics, |
| descr_->sort_order()); |
| } |
| |
| inline std::shared_ptr<Statistics> statistics() const { |
| return is_stats_set() ? possible_stats_ : nullptr; |
| } |
| |
| inline Compression::type compression() const { |
| return LoadEnumSafe(&column_metadata_->codec); |
| } |
| |
| const std::vector<Encoding::type>& encodings() const { return encodings_; } |
| |
| const std::vector<PageEncodingStats>& encoding_stats() const { return encoding_stats_; } |
| |
| inline std::optional<int64_t> bloom_filter_offset() const { |
| if (column_metadata_->__isset.bloom_filter_offset) { |
| return column_metadata_->bloom_filter_offset; |
| } |
| return std::nullopt; |
| } |
| |
| inline bool has_dictionary_page() const { |
| return column_metadata_->__isset.dictionary_page_offset; |
| } |
| |
| inline int64_t dictionary_page_offset() const { |
| return column_metadata_->dictionary_page_offset; |
| } |
| |
| inline int64_t data_page_offset() const { return column_metadata_->data_page_offset; } |
| |
| inline bool has_index_page() const { |
| return column_metadata_->__isset.index_page_offset; |
| } |
| |
| inline int64_t index_page_offset() const { return column_metadata_->index_page_offset; } |
| |
| inline int64_t total_compressed_size() const { |
| return column_metadata_->total_compressed_size; |
| } |
| |
| inline int64_t total_uncompressed_size() const { |
| return column_metadata_->total_uncompressed_size; |
| } |
| |
| inline std::unique_ptr<ColumnCryptoMetaData> crypto_metadata() const { |
| if (column_->__isset.crypto_metadata) { |
| return ColumnCryptoMetaData::Make( |
| reinterpret_cast<const uint8_t*>(&column_->crypto_metadata)); |
| } else { |
| return nullptr; |
| } |
| } |
| |
| std::optional<IndexLocation> GetColumnIndexLocation() const { |
| if (column_->__isset.column_index_offset && column_->__isset.column_index_length) { |
| return IndexLocation{column_->column_index_offset, column_->column_index_length}; |
| } |
| return std::nullopt; |
| } |
| |
| std::optional<IndexLocation> GetOffsetIndexLocation() const { |
| if (column_->__isset.offset_index_offset && column_->__isset.offset_index_length) { |
| return IndexLocation{column_->offset_index_offset, column_->offset_index_length}; |
| } |
| return std::nullopt; |
| } |
| |
| private: |
| mutable std::shared_ptr<Statistics> possible_stats_; |
| std::vector<Encoding::type> encodings_; |
| std::vector<PageEncodingStats> encoding_stats_; |
| const format::ColumnChunk* column_; |
| const format::ColumnMetaData* column_metadata_; |
| format::ColumnMetaData decrypted_metadata_; |
| const ColumnDescriptor* descr_; |
| const ReaderProperties properties_; |
| const ApplicationVersion* writer_version_; |
| }; |
| |
| std::unique_ptr<ColumnChunkMetaData> ColumnChunkMetaData::Make( |
| const void* metadata, const ColumnDescriptor* descr, |
| const ReaderProperties& properties, const ApplicationVersion* writer_version, |
| int16_t row_group_ordinal, int16_t column_ordinal, |
| std::shared_ptr<InternalFileDecryptor> file_decryptor) { |
| return std::unique_ptr<ColumnChunkMetaData>( |
| new ColumnChunkMetaData(metadata, descr, row_group_ordinal, column_ordinal, |
| properties, writer_version, std::move(file_decryptor))); |
| } |
| |
| std::unique_ptr<ColumnChunkMetaData> ColumnChunkMetaData::Make( |
| const void* metadata, const ColumnDescriptor* descr, |
| const ApplicationVersion* writer_version, int16_t row_group_ordinal, |
| int16_t column_ordinal, std::shared_ptr<InternalFileDecryptor> file_decryptor) { |
| return std::unique_ptr<ColumnChunkMetaData>(new ColumnChunkMetaData( |
| metadata, descr, row_group_ordinal, column_ordinal, default_reader_properties(), |
| writer_version, std::move(file_decryptor))); |
| } |
| |
| ColumnChunkMetaData::ColumnChunkMetaData( |
| const void* metadata, const ColumnDescriptor* descr, int16_t row_group_ordinal, |
| int16_t column_ordinal, const ReaderProperties& properties, |
| const ApplicationVersion* writer_version, |
| std::shared_ptr<InternalFileDecryptor> file_decryptor) |
| : impl_{new ColumnChunkMetaDataImpl( |
| reinterpret_cast<const format::ColumnChunk*>(metadata), descr, |
| row_group_ordinal, column_ordinal, properties, writer_version, |
| std::move(file_decryptor))} {} |
| |
| ColumnChunkMetaData::~ColumnChunkMetaData() = default; |
| |
| // column chunk |
| int64_t ColumnChunkMetaData::file_offset() const { return impl_->file_offset(); } |
| |
| const std::string& ColumnChunkMetaData::file_path() const { return impl_->file_path(); } |
| |
| Type::type ColumnChunkMetaData::type() const { return impl_->type(); } |
| |
| int64_t ColumnChunkMetaData::num_values() const { return impl_->num_values(); } |
| |
| std::shared_ptr<schema::ColumnPath> ColumnChunkMetaData::path_in_schema() const { |
| return impl_->path_in_schema(); |
| } |
| |
| std::shared_ptr<Statistics> ColumnChunkMetaData::statistics() const { |
| return impl_->statistics(); |
| } |
| |
| bool ColumnChunkMetaData::is_stats_set() const { return impl_->is_stats_set(); } |
| |
| std::optional<int64_t> ColumnChunkMetaData::bloom_filter_offset() const { |
| return impl_->bloom_filter_offset(); |
| } |
| |
| bool ColumnChunkMetaData::has_dictionary_page() const { |
| return impl_->has_dictionary_page(); |
| } |
| |
| int64_t ColumnChunkMetaData::dictionary_page_offset() const { |
| return impl_->dictionary_page_offset(); |
| } |
| |
| int64_t ColumnChunkMetaData::data_page_offset() const { |
| return impl_->data_page_offset(); |
| } |
| |
| bool ColumnChunkMetaData::has_index_page() const { return impl_->has_index_page(); } |
| |
| int64_t ColumnChunkMetaData::index_page_offset() const { |
| return impl_->index_page_offset(); |
| } |
| |
| Compression::type ColumnChunkMetaData::compression() const { |
| return impl_->compression(); |
| } |
| |
| bool ColumnChunkMetaData::can_decompress() const { |
| return ::arrow::util::Codec::IsAvailable(compression()); |
| } |
| |
| const std::vector<Encoding::type>& ColumnChunkMetaData::encodings() const { |
| return impl_->encodings(); |
| } |
| |
| const std::vector<PageEncodingStats>& ColumnChunkMetaData::encoding_stats() const { |
| return impl_->encoding_stats(); |
| } |
| |
| int64_t ColumnChunkMetaData::total_uncompressed_size() const { |
| return impl_->total_uncompressed_size(); |
| } |
| |
| int64_t ColumnChunkMetaData::total_compressed_size() const { |
| return impl_->total_compressed_size(); |
| } |
| |
| std::unique_ptr<ColumnCryptoMetaData> ColumnChunkMetaData::crypto_metadata() const { |
| return impl_->crypto_metadata(); |
| } |
| |
| std::optional<IndexLocation> ColumnChunkMetaData::GetColumnIndexLocation() const { |
| return impl_->GetColumnIndexLocation(); |
| } |
| |
| std::optional<IndexLocation> ColumnChunkMetaData::GetOffsetIndexLocation() const { |
| return impl_->GetOffsetIndexLocation(); |
| } |
| |
| bool ColumnChunkMetaData::Equals(const ColumnChunkMetaData& other) const { |
| return impl_->Equals(*other.impl_); |
| } |
| |
| // row-group metadata |
| class RowGroupMetaData::RowGroupMetaDataImpl { |
| public: |
| explicit RowGroupMetaDataImpl(const format::RowGroup* row_group, |
| const SchemaDescriptor* schema, |
| const ReaderProperties& properties, |
| const ApplicationVersion* writer_version, |
| std::shared_ptr<InternalFileDecryptor> file_decryptor) |
| : row_group_(row_group), |
| schema_(schema), |
| properties_(properties), |
| writer_version_(writer_version), |
| file_decryptor_(std::move(file_decryptor)) { |
| if (ARROW_PREDICT_FALSE(row_group_->columns.size() > |
| static_cast<size_t>(std::numeric_limits<int>::max()))) { |
| throw ParquetException("Row group had too many columns: ", |
| row_group_->columns.size()); |
| } |
| } |
| |
| bool Equals(const RowGroupMetaDataImpl& other) const { |
| return *row_group_ == *other.row_group_; |
| } |
| |
| inline int num_columns() const { return static_cast<int>(row_group_->columns.size()); } |
| |
| inline int64_t num_rows() const { return row_group_->num_rows; } |
| |
| inline int64_t total_byte_size() const { return row_group_->total_byte_size; } |
| |
| inline int64_t total_compressed_size() const { |
| return row_group_->total_compressed_size; |
| } |
| |
| inline int64_t file_offset() const { return row_group_->file_offset; } |
| |
| inline const SchemaDescriptor* schema() const { return schema_; } |
| |
| std::unique_ptr<ColumnChunkMetaData> ColumnChunk(int i) { |
| if (i >= 0 && i < num_columns()) { |
| return ColumnChunkMetaData::Make(&row_group_->columns[i], schema_->Column(i), |
| properties_, writer_version_, row_group_->ordinal, |
| i, file_decryptor_); |
| } |
| throw ParquetException("The file only has ", num_columns(), |
| " columns, requested metadata for column: ", i); |
| } |
| |
| std::vector<SortingColumn> sorting_columns() const { |
| std::vector<SortingColumn> sorting_columns; |
| if (!row_group_->__isset.sorting_columns) { |
| return sorting_columns; |
| } |
| sorting_columns.resize(row_group_->sorting_columns.size()); |
| for (size_t i = 0; i < sorting_columns.size(); ++i) { |
| sorting_columns[i] = FromThrift(row_group_->sorting_columns[i]); |
| } |
| return sorting_columns; |
| } |
| |
| private: |
| const format::RowGroup* row_group_; |
| const SchemaDescriptor* schema_; |
| const ReaderProperties properties_; |
| const ApplicationVersion* writer_version_; |
| std::shared_ptr<InternalFileDecryptor> file_decryptor_; |
| }; |
| |
| std::unique_ptr<RowGroupMetaData> RowGroupMetaData::Make( |
| const void* metadata, const SchemaDescriptor* schema, |
| const ApplicationVersion* writer_version, |
| std::shared_ptr<InternalFileDecryptor> file_decryptor) { |
| return std::unique_ptr<parquet::RowGroupMetaData>( |
| new RowGroupMetaData(metadata, schema, default_reader_properties(), writer_version, |
| std::move(file_decryptor))); |
| } |
| |
| std::unique_ptr<RowGroupMetaData> RowGroupMetaData::Make( |
| const void* metadata, const SchemaDescriptor* schema, |
| const ReaderProperties& properties, const ApplicationVersion* writer_version, |
| std::shared_ptr<InternalFileDecryptor> file_decryptor) { |
| return std::unique_ptr<parquet::RowGroupMetaData>(new RowGroupMetaData( |
| metadata, schema, properties, writer_version, std::move(file_decryptor))); |
| } |
| |
| RowGroupMetaData::RowGroupMetaData(const void* metadata, const SchemaDescriptor* schema, |
| const ReaderProperties& properties, |
| const ApplicationVersion* writer_version, |
| std::shared_ptr<InternalFileDecryptor> file_decryptor) |
| : impl_{new RowGroupMetaDataImpl(reinterpret_cast<const format::RowGroup*>(metadata), |
| schema, properties, writer_version, |
| std::move(file_decryptor))} {} |
| |
| RowGroupMetaData::~RowGroupMetaData() = default; |
| |
| bool RowGroupMetaData::Equals(const RowGroupMetaData& other) const { |
| return impl_->Equals(*other.impl_); |
| } |
| |
| int RowGroupMetaData::num_columns() const { return impl_->num_columns(); } |
| |
| int64_t RowGroupMetaData::num_rows() const { return impl_->num_rows(); } |
| |
| int64_t RowGroupMetaData::total_byte_size() const { return impl_->total_byte_size(); } |
| |
| int64_t RowGroupMetaData::total_compressed_size() const { |
| return impl_->total_compressed_size(); |
| } |
| |
| int64_t RowGroupMetaData::file_offset() const { return impl_->file_offset(); } |
| |
| const SchemaDescriptor* RowGroupMetaData::schema() const { return impl_->schema(); } |
| |
| std::unique_ptr<ColumnChunkMetaData> RowGroupMetaData::ColumnChunk(int i) const { |
| return impl_->ColumnChunk(i); |
| } |
| |
| bool RowGroupMetaData::can_decompress() const { |
| int n_columns = num_columns(); |
| for (int i = 0; i < n_columns; i++) { |
| if (!ColumnChunk(i)->can_decompress()) { |
| return false; |
| } |
| } |
| return true; |
| } |
| |
| std::vector<SortingColumn> RowGroupMetaData::sorting_columns() const { |
| return impl_->sorting_columns(); |
| } |
| |
| // file metadata |
| class FileMetaData::FileMetaDataImpl { |
| public: |
| FileMetaDataImpl() = default; |
| |
| explicit FileMetaDataImpl( |
| const void* metadata, uint32_t* metadata_len, ReaderProperties properties, |
| std::shared_ptr<InternalFileDecryptor> file_decryptor = nullptr) |
| : properties_(std::move(properties)), file_decryptor_(std::move(file_decryptor)) { |
| metadata_ = std::make_unique<format::FileMetaData>(); |
| |
| auto footer_decryptor = |
| file_decryptor_ != nullptr ? file_decryptor_->GetFooterDecryptor() : nullptr; |
| |
| ThriftDeserializer deserializer(properties_); |
| deserializer.DeserializeMessage(reinterpret_cast<const uint8_t*>(metadata), |
| metadata_len, metadata_.get(), footer_decryptor); |
| metadata_len_ = *metadata_len; |
| |
| if (metadata_->__isset.created_by) { |
| writer_version_ = ApplicationVersion(metadata_->created_by); |
| } else { |
| writer_version_ = ApplicationVersion("unknown 0.0.0"); |
| } |
| |
| InitSchema(); |
| InitColumnOrders(); |
| InitKeyValueMetadata(); |
| } |
| |
| bool VerifySignature(const void* signature) { |
| // verify decryption properties are set |
| if (file_decryptor_ == nullptr) { |
| throw ParquetException("Decryption not set properly. cannot verify signature"); |
| } |
| // serialize the footer |
| uint8_t* serialized_data; |
| uint32_t serialized_len = metadata_len_; |
| ThriftSerializer serializer; |
| serializer.SerializeToBuffer(metadata_.get(), &serialized_len, &serialized_data); |
| |
| // encrypt with nonce |
| auto nonce = const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(signature)); |
| auto tag = const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(signature)) + |
| encryption::kNonceLength; |
| |
| std::string key = file_decryptor_->GetFooterKey(); |
| std::string aad = encryption::CreateFooterAad(file_decryptor_->file_aad()); |
| |
| auto aes_encryptor = encryption::AesEncryptor::Make( |
| file_decryptor_->algorithm(), static_cast<int>(key.size()), true, |
| false /*write_length*/, nullptr); |
| |
| std::shared_ptr<Buffer> encrypted_buffer = std::static_pointer_cast<ResizableBuffer>( |
| AllocateBuffer(file_decryptor_->pool(), |
| aes_encryptor->CiphertextSizeDelta() + serialized_len)); |
| uint32_t encrypted_len = aes_encryptor->SignedFooterEncrypt( |
| serialized_data, serialized_len, str2bytes(key), static_cast<int>(key.size()), |
| str2bytes(aad), static_cast<int>(aad.size()), nonce, |
| encrypted_buffer->mutable_data()); |
| // Delete AES encryptor object. It was created only to verify the footer signature. |
| aes_encryptor->WipeOut(); |
| delete aes_encryptor; |
| return 0 == |
| memcmp(encrypted_buffer->data() + encrypted_len - encryption::kGcmTagLength, |
| tag, encryption::kGcmTagLength); |
| } |
| |
| inline uint32_t size() const { return metadata_len_; } |
| inline int num_columns() const { return schema_.num_columns(); } |
| inline int64_t num_rows() const { return metadata_->num_rows; } |
| inline int num_row_groups() const { |
| return static_cast<int>(metadata_->row_groups.size()); |
| } |
| inline int32_t version() const { return metadata_->version; } |
| inline const std::string& created_by() const { return metadata_->created_by; } |
| inline int num_schema_elements() const { |
| return static_cast<int>(metadata_->schema.size()); |
| } |
| |
| inline bool is_encryption_algorithm_set() const { |
| return metadata_->__isset.encryption_algorithm; |
| } |
| inline EncryptionAlgorithm encryption_algorithm() { |
| return FromThrift(metadata_->encryption_algorithm); |
| } |
| inline const std::string& footer_signing_key_metadata() { |
| return metadata_->footer_signing_key_metadata; |
| } |
| |
| const ApplicationVersion& writer_version() const { return writer_version_; } |
| |
| void WriteTo(::arrow::io::OutputStream* dst, |
| const std::shared_ptr<Encryptor>& encryptor) const { |
| ThriftSerializer serializer; |
| // Only in encrypted files with plaintext footers the |
| // encryption_algorithm is set in footer |
| if (is_encryption_algorithm_set()) { |
| uint8_t* serialized_data; |
| uint32_t serialized_len; |
| serializer.SerializeToBuffer(metadata_.get(), &serialized_len, &serialized_data); |
| |
| // encrypt the footer key |
| std::vector<uint8_t> encrypted_data(encryptor->CiphertextSizeDelta() + |
| serialized_len); |
| unsigned encrypted_len = |
| encryptor->Encrypt(serialized_data, serialized_len, encrypted_data.data()); |
| |
| // write unencrypted footer |
| PARQUET_THROW_NOT_OK(dst->Write(serialized_data, serialized_len)); |
| // Write signature (nonce and tag) |
| PARQUET_THROW_NOT_OK( |
| dst->Write(encrypted_data.data() + 4, encryption::kNonceLength)); |
| PARQUET_THROW_NOT_OK( |
| dst->Write(encrypted_data.data() + encrypted_len - encryption::kGcmTagLength, |
| encryption::kGcmTagLength)); |
| } else { // either plaintext file (when encryptor is null) |
| // or encrypted file with encrypted footer |
| serializer.Serialize(metadata_.get(), dst, encryptor); |
| } |
| } |
| |
| std::unique_ptr<RowGroupMetaData> RowGroup(int i) { |
| if (!(i >= 0 && i < num_row_groups())) { |
| std::stringstream ss; |
| ss << "The file only has " << num_row_groups() |
| << " row groups, requested metadata for row group: " << i; |
| throw ParquetException(ss.str()); |
| } |
| return RowGroupMetaData::Make(&metadata_->row_groups[i], &schema_, properties_, |
| &writer_version_, file_decryptor_); |
| } |
| |
| bool Equals(const FileMetaDataImpl& other) const { |
| return *metadata_ == *other.metadata_; |
| } |
| |
| const SchemaDescriptor* schema() const { return &schema_; } |
| |
| const std::shared_ptr<const KeyValueMetadata>& key_value_metadata() const { |
| return key_value_metadata_; |
| } |
| |
| void set_file_path(const std::string& path) { |
| for (format::RowGroup& row_group : metadata_->row_groups) { |
| for (format::ColumnChunk& chunk : row_group.columns) { |
| chunk.__set_file_path(path); |
| } |
| } |
| } |
| |
| format::RowGroup& row_group(int i) { |
| if (!(i >= 0 && i < num_row_groups())) { |
| std::stringstream ss; |
| ss << "The file only has " << num_row_groups() |
| << " row groups, requested metadata for row group: " << i; |
| throw ParquetException(ss.str()); |
| } |
| return metadata_->row_groups[i]; |
| } |
| |
| void AppendRowGroups(const std::unique_ptr<FileMetaDataImpl>& other) { |
| std::ostringstream diff_output; |
| if (!schema()->Equals(*other->schema(), &diff_output)) { |
| auto msg = "AppendRowGroups requires equal schemas.\n" + diff_output.str(); |
| throw ParquetException(msg); |
| } |
| |
| // ARROW-13654: `other` may point to self, be careful not to enter an infinite loop |
| const int n = other->num_row_groups(); |
| // ARROW-16613: do not use reserve() as that may suppress overallocation |
| // and incur O(n²) behavior on repeated calls to AppendRowGroups(). |
| // (see https://en.cppreference.com/w/cpp/container/vector/reserve |
| // about inappropriate uses of reserve()). |
| const auto start = metadata_->row_groups.size(); |
| metadata_->row_groups.resize(start + n); |
| for (int i = 0; i < n; i++) { |
| metadata_->row_groups[start + i] = other->row_group(i); |
| metadata_->num_rows += metadata_->row_groups[start + i].num_rows; |
| } |
| } |
| |
| std::shared_ptr<FileMetaData> Subset(const std::vector<int>& row_groups) { |
| for (int i : row_groups) { |
| if (i < num_row_groups()) continue; |
| |
| throw ParquetException( |
| "The file only has ", num_row_groups(), |
| " row groups, but requested a subset including row group: ", i); |
| } |
| |
| std::shared_ptr<FileMetaData> out(new FileMetaData()); |
| out->impl_ = std::make_unique<FileMetaDataImpl>(); |
| out->impl_->metadata_ = std::make_unique<format::FileMetaData>(); |
| |
| auto metadata = out->impl_->metadata_.get(); |
| metadata->version = metadata_->version; |
| metadata->schema = metadata_->schema; |
| |
| metadata->row_groups.resize(row_groups.size()); |
| int i = 0; |
| for (int selected_index : row_groups) { |
| metadata->num_rows += row_group(selected_index).num_rows; |
| metadata->row_groups[i++] = row_group(selected_index); |
| } |
| |
| metadata->key_value_metadata = metadata_->key_value_metadata; |
| metadata->created_by = metadata_->created_by; |
| metadata->column_orders = metadata_->column_orders; |
| metadata->encryption_algorithm = metadata_->encryption_algorithm; |
| metadata->footer_signing_key_metadata = metadata_->footer_signing_key_metadata; |
| metadata->__isset = metadata_->__isset; |
| |
| out->impl_->schema_ = schema_; |
| out->impl_->writer_version_ = writer_version_; |
| out->impl_->key_value_metadata_ = key_value_metadata_; |
| out->impl_->file_decryptor_ = file_decryptor_; |
| |
| return out; |
| } |
| |
| void set_file_decryptor(std::shared_ptr<InternalFileDecryptor> file_decryptor) { |
| file_decryptor_ = file_decryptor; |
| } |
| |
| private: |
| friend FileMetaDataBuilder; |
| uint32_t metadata_len_ = 0; |
| std::unique_ptr<format::FileMetaData> metadata_; |
| SchemaDescriptor schema_; |
| ApplicationVersion writer_version_; |
| std::shared_ptr<const KeyValueMetadata> key_value_metadata_; |
| const ReaderProperties properties_; |
| std::shared_ptr<InternalFileDecryptor> file_decryptor_; |
| |
| void InitSchema() { |
| if (metadata_->schema.empty()) { |
| throw ParquetException("Empty file schema (no root)"); |
| } |
| schema_.Init(schema::Unflatten(&metadata_->schema[0], |
| static_cast<int>(metadata_->schema.size()))); |
| } |
| |
| void InitColumnOrders() { |
| // update ColumnOrder |
| std::vector<parquet::ColumnOrder> column_orders; |
| if (metadata_->__isset.column_orders) { |
| column_orders.reserve(metadata_->column_orders.size()); |
| for (auto column_order : metadata_->column_orders) { |
| if (column_order.__isset.TYPE_ORDER) { |
| column_orders.push_back(ColumnOrder::type_defined_); |
| } else { |
| column_orders.push_back(ColumnOrder::undefined_); |
| } |
| } |
| } else { |
| column_orders.resize(schema_.num_columns(), ColumnOrder::undefined_); |
| } |
| |
| schema_.updateColumnOrders(column_orders); |
| } |
| |
| void InitKeyValueMetadata() { |
| std::shared_ptr<KeyValueMetadata> metadata = nullptr; |
| if (metadata_->__isset.key_value_metadata) { |
| metadata = std::make_shared<KeyValueMetadata>(); |
| for (const auto& it : metadata_->key_value_metadata) { |
| metadata->Append(it.key, it.value); |
| } |
| } |
| key_value_metadata_ = std::move(metadata); |
| } |
| }; |
| |
| std::shared_ptr<FileMetaData> FileMetaData::Make( |
| const void* metadata, uint32_t* metadata_len, const ReaderProperties& properties, |
| std::shared_ptr<InternalFileDecryptor> file_decryptor) { |
| // This FileMetaData ctor is private, not compatible with std::make_shared |
| return std::shared_ptr<FileMetaData>( |
| new FileMetaData(metadata, metadata_len, properties, std::move(file_decryptor))); |
| } |
| |
| std::shared_ptr<FileMetaData> FileMetaData::Make( |
| const void* metadata, uint32_t* metadata_len, |
| std::shared_ptr<InternalFileDecryptor> file_decryptor) { |
| return std::shared_ptr<FileMetaData>(new FileMetaData( |
| metadata, metadata_len, default_reader_properties(), file_decryptor)); |
| } |
| |
| FileMetaData::FileMetaData(const void* metadata, uint32_t* metadata_len, |
| const ReaderProperties& properties, |
| std::shared_ptr<InternalFileDecryptor> file_decryptor) |
| : impl_(new FileMetaDataImpl(metadata, metadata_len, properties, file_decryptor)) {} |
| |
| FileMetaData::FileMetaData() : impl_(new FileMetaDataImpl()) {} |
| |
| FileMetaData::~FileMetaData() = default; |
| |
| bool FileMetaData::Equals(const FileMetaData& other) const { |
| return impl_->Equals(*other.impl_); |
| } |
| |
| std::unique_ptr<RowGroupMetaData> FileMetaData::RowGroup(int i) const { |
| return impl_->RowGroup(i); |
| } |
| |
| bool FileMetaData::VerifySignature(const void* signature) { |
| return impl_->VerifySignature(signature); |
| } |
| |
| uint32_t FileMetaData::size() const { return impl_->size(); } |
| |
| int FileMetaData::num_columns() const { return impl_->num_columns(); } |
| |
| int64_t FileMetaData::num_rows() const { return impl_->num_rows(); } |
| |
| int FileMetaData::num_row_groups() const { return impl_->num_row_groups(); } |
| |
| bool FileMetaData::can_decompress() const { |
| int n_row_groups = num_row_groups(); |
| for (int i = 0; i < n_row_groups; i++) { |
| if (!RowGroup(i)->can_decompress()) { |
| return false; |
| } |
| } |
| return true; |
| } |
| |
| bool FileMetaData::is_encryption_algorithm_set() const { |
| return impl_->is_encryption_algorithm_set(); |
| } |
| |
| EncryptionAlgorithm FileMetaData::encryption_algorithm() const { |
| return impl_->encryption_algorithm(); |
| } |
| |
| const std::string& FileMetaData::footer_signing_key_metadata() const { |
| return impl_->footer_signing_key_metadata(); |
| } |
| |
| void FileMetaData::set_file_decryptor( |
| std::shared_ptr<InternalFileDecryptor> file_decryptor) { |
| impl_->set_file_decryptor(file_decryptor); |
| } |
| |
| ParquetVersion::type FileMetaData::version() const { |
| switch (impl_->version()) { |
| case 1: |
| return ParquetVersion::PARQUET_1_0; |
| case 2: |
| return ParquetVersion::PARQUET_2_LATEST; |
| default: |
| // Improperly set version, assuming Parquet 1.0 |
| break; |
| } |
| return ParquetVersion::PARQUET_1_0; |
| } |
| |
| const ApplicationVersion& FileMetaData::writer_version() const { |
| return impl_->writer_version(); |
| } |
| |
| const std::string& FileMetaData::created_by() const { return impl_->created_by(); } |
| |
| int FileMetaData::num_schema_elements() const { return impl_->num_schema_elements(); } |
| |
| const SchemaDescriptor* FileMetaData::schema() const { return impl_->schema(); } |
| |
| const std::shared_ptr<const KeyValueMetadata>& FileMetaData::key_value_metadata() const { |
| return impl_->key_value_metadata(); |
| } |
| |
| void FileMetaData::set_file_path(const std::string& path) { impl_->set_file_path(path); } |
| |
| void FileMetaData::AppendRowGroups(const FileMetaData& other) { |
| impl_->AppendRowGroups(other.impl_); |
| } |
| |
| std::shared_ptr<FileMetaData> FileMetaData::Subset( |
| const std::vector<int>& row_groups) const { |
| return impl_->Subset(row_groups); |
| } |
| |
| void FileMetaData::WriteTo(::arrow::io::OutputStream* dst, |
| const std::shared_ptr<Encryptor>& encryptor) const { |
| return impl_->WriteTo(dst, encryptor); |
| } |
| |
| class FileCryptoMetaData::FileCryptoMetaDataImpl { |
| public: |
| FileCryptoMetaDataImpl() = default; |
| |
| explicit FileCryptoMetaDataImpl(const uint8_t* metadata, uint32_t* metadata_len, |
| const ReaderProperties& properties) { |
| ThriftDeserializer deserializer(properties); |
| deserializer.DeserializeMessage(metadata, metadata_len, &metadata_); |
| metadata_len_ = *metadata_len; |
| } |
| |
| EncryptionAlgorithm encryption_algorithm() const { |
| return FromThrift(metadata_.encryption_algorithm); |
| } |
| |
| const std::string& key_metadata() const { return metadata_.key_metadata; } |
| |
| void WriteTo(::arrow::io::OutputStream* dst) const { |
| ThriftSerializer serializer; |
| serializer.Serialize(&metadata_, dst); |
| } |
| |
| private: |
| friend FileMetaDataBuilder; |
| format::FileCryptoMetaData metadata_; |
| uint32_t metadata_len_; |
| }; |
| |
| EncryptionAlgorithm FileCryptoMetaData::encryption_algorithm() const { |
| return impl_->encryption_algorithm(); |
| } |
| |
| const std::string& FileCryptoMetaData::key_metadata() const { |
| return impl_->key_metadata(); |
| } |
| |
| std::shared_ptr<FileCryptoMetaData> FileCryptoMetaData::Make( |
| const uint8_t* serialized_metadata, uint32_t* metadata_len, |
| const ReaderProperties& properties) { |
| return std::shared_ptr<FileCryptoMetaData>( |
| new FileCryptoMetaData(serialized_metadata, metadata_len, properties)); |
| } |
| |
| FileCryptoMetaData::FileCryptoMetaData(const uint8_t* serialized_metadata, |
| uint32_t* metadata_len, |
| const ReaderProperties& properties) |
| : impl_(new FileCryptoMetaDataImpl(serialized_metadata, metadata_len, properties)) {} |
| |
| FileCryptoMetaData::FileCryptoMetaData() : impl_(new FileCryptoMetaDataImpl()) {} |
| |
| FileCryptoMetaData::~FileCryptoMetaData() = default; |
| |
| void FileCryptoMetaData::WriteTo(::arrow::io::OutputStream* dst) const { |
| impl_->WriteTo(dst); |
| } |
| |
| std::string FileMetaData::SerializeToString() const { |
| // We need to pass in an initial size. Since it will automatically |
| // increase the buffer size to hold the metadata, we just leave it 0. |
| PARQUET_ASSIGN_OR_THROW(auto serializer, ::arrow::io::BufferOutputStream::Create(0)); |
| WriteTo(serializer.get()); |
| PARQUET_ASSIGN_OR_THROW(auto metadata_buffer, serializer->Finish()); |
| return metadata_buffer->ToString(); |
| } |
| |
| ApplicationVersion::ApplicationVersion(std::string application, int major, int minor, |
| int patch) |
| : application_(std::move(application)), version{major, minor, patch, "", "", ""} {} |
| |
| namespace { |
| // Parse the application version format and set parsed values to |
| // ApplicationVersion. |
| // |
| // The application version format must be compatible parquet-mr's |
| // one. See also: |
| // * https://github.com/apache/parquet-mr/blob/master/parquet-common/src/main/java/org/apache/parquet/VersionParser.java |
| // * https://github.com/apache/parquet-mr/blob/master/parquet-common/src/main/java/org/apache/parquet/SemanticVersion.java |
| // |
| // The application version format: |
| // "${APPLICATION_NAME}" |
| // "${APPLICATION_NAME} version ${VERSION}" |
| // "${APPLICATION_NAME} version ${VERSION} (build ${BUILD_NAME})" |
| // |
| // Eg: |
| // parquet-cpp |
| // parquet-cpp version 1.5.0ab-xyz5.5.0+cd |
| // parquet-cpp version 1.5.0ab-xyz5.5.0+cd (build abcd) |
| // |
| // The VERSION format: |
| // "${MAJOR}" |
| // "${MAJOR}.${MINOR}" |
| // "${MAJOR}.${MINOR}.${PATCH}" |
| // "${MAJOR}.${MINOR}.${PATCH}${UNKNOWN}" |
| // "${MAJOR}.${MINOR}.${PATCH}${UNKNOWN}-${PRE_RELEASE}" |
| // "${MAJOR}.${MINOR}.${PATCH}${UNKNOWN}-${PRE_RELEASE}+${BUILD_INFO}" |
| // "${MAJOR}.${MINOR}.${PATCH}${UNKNOWN}+${BUILD_INFO}" |
| // "${MAJOR}.${MINOR}.${PATCH}-${PRE_RELEASE}" |
| // "${MAJOR}.${MINOR}.${PATCH}-${PRE_RELEASE}+${BUILD_INFO}" |
| // "${MAJOR}.${MINOR}.${PATCH}+${BUILD_INFO}" |
| // |
| // Eg: |
| // 1 |
| // 1.5 |
| // 1.5.0 |
| // 1.5.0ab |
| // 1.5.0ab-cdh5.5.0 |
| // 1.5.0ab-cdh5.5.0+cd |
| // 1.5.0ab+cd |
| // 1.5.0-cdh5.5.0 |
| // 1.5.0-cdh5.5.0+cd |
| // 1.5.0+cd |
| class ApplicationVersionParser { |
| public: |
| ApplicationVersionParser(const std::string& created_by, |
| ApplicationVersion& application_version) |
| : created_by_(created_by), |
| application_version_(application_version), |
| spaces_(" \t\v\r\n\f"), |
| digits_("0123456789") {} |
| |
| void Parse() { |
| application_version_.application_ = "unknown"; |
| application_version_.version = {0, 0, 0, "", "", ""}; |
| |
| if (!ParseApplicationName()) { |
| return; |
| } |
| if (!ParseVersion()) { |
| return; |
| } |
| if (!ParseBuildName()) { |
| return; |
| } |
| } |
| |
| private: |
| bool IsSpace(const std::string& string, const size_t& offset) { |
| auto target = ::std::string_view(string).substr(offset, 1); |
| return target.find_first_of(spaces_) != ::std::string_view::npos; |
| } |
| |
| void RemovePrecedingSpaces(const std::string& string, size_t& start, |
| const size_t& end) { |
| while (start < end && IsSpace(string, start)) { |
| ++start; |
| } |
| } |
| |
| void RemoveTrailingSpaces(const std::string& string, const size_t& start, size_t& end) { |
| while (start < (end - 1) && (end - 1) < string.size() && IsSpace(string, end - 1)) { |
| --end; |
| } |
| } |
| |
| bool ParseApplicationName() { |
| std::string version_mark(" version "); |
| auto version_mark_position = created_by_.find(version_mark); |
| size_t application_name_end; |
| // No VERSION and BUILD_NAME. |
| if (version_mark_position == std::string::npos) { |
| version_start_ = std::string::npos; |
| application_name_end = created_by_.size(); |
| } else { |
| version_start_ = version_mark_position + version_mark.size(); |
| application_name_end = version_mark_position; |
| } |
| |
| size_t application_name_start = 0; |
| RemovePrecedingSpaces(created_by_, application_name_start, application_name_end); |
| RemoveTrailingSpaces(created_by_, application_name_start, application_name_end); |
| application_version_.application_ = created_by_.substr( |
| application_name_start, application_name_end - application_name_start); |
| |
| return true; |
| } |
| |
| bool ParseVersion() { |
| // No VERSION. |
| if (version_start_ == std::string::npos) { |
| return false; |
| } |
| |
| RemovePrecedingSpaces(created_by_, version_start_, created_by_.size()); |
| version_end_ = created_by_.find(" (", version_start_); |
| // No BUILD_NAME. |
| if (version_end_ == std::string::npos) { |
| version_end_ = created_by_.size(); |
| } |
| RemoveTrailingSpaces(created_by_, version_start_, version_end_); |
| // No VERSION. |
| if (version_start_ == version_end_) { |
| return false; |
| } |
| version_string_ = created_by_.substr(version_start_, version_end_ - version_start_); |
| |
| if (!ParseVersionMajor()) { |
| return false; |
| } |
| if (!ParseVersionMinor()) { |
| return false; |
| } |
| if (!ParseVersionPatch()) { |
| return false; |
| } |
| if (!ParseVersionUnknown()) { |
| return false; |
| } |
| if (!ParseVersionPreRelease()) { |
| return false; |
| } |
| if (!ParseVersionBuildInfo()) { |
| return false; |
| } |
| |
| return true; |
| } |
| |
| bool ParseVersionMajor() { |
| size_t version_major_start = 0; |
| auto version_major_end = version_string_.find_first_not_of(digits_); |
| // MAJOR only. |
| if (version_major_end == std::string::npos) { |
| version_major_end = version_string_.size(); |
| version_parsing_position_ = version_major_end; |
| } else { |
| // No ".". |
| if (version_string_[version_major_end] != '.') { |
| return false; |
| } |
| // No MAJOR. |
| if (version_major_end == version_major_start) { |
| return false; |
| } |
| version_parsing_position_ = version_major_end + 1; // +1 is for '.'. |
| } |
| auto version_major_string = version_string_.substr( |
| version_major_start, version_major_end - version_major_start); |
| application_version_.version.major = atoi(version_major_string.c_str()); |
| return true; |
| } |
| |
| bool ParseVersionMinor() { |
| auto version_minor_start = version_parsing_position_; |
| auto version_minor_end = |
| version_string_.find_first_not_of(digits_, version_minor_start); |
| // MAJOR.MINOR only. |
| if (version_minor_end == std::string::npos) { |
| version_minor_end = version_string_.size(); |
| version_parsing_position_ = version_minor_end; |
| } else { |
| // No ".". |
| if (version_string_[version_minor_end] != '.') { |
| return false; |
| } |
| // No MINOR. |
| if (version_minor_end == version_minor_start) { |
| return false; |
| } |
| version_parsing_position_ = version_minor_end + 1; // +1 is for '.'. |
| } |
| auto version_minor_string = version_string_.substr( |
| version_minor_start, version_minor_end - version_minor_start); |
| application_version_.version.minor = atoi(version_minor_string.c_str()); |
| return true; |
| } |
| |
| bool ParseVersionPatch() { |
| auto version_patch_start = version_parsing_position_; |
| auto version_patch_end = |
| version_string_.find_first_not_of(digits_, version_patch_start); |
| // No UNKNOWN, PRE_RELEASE and BUILD_INFO. |
| if (version_patch_end == std::string::npos) { |
| version_patch_end = version_string_.size(); |
| } |
| // No PATCH. |
| if (version_patch_end == version_patch_start) { |
| return false; |
| } |
| auto version_patch_string = version_string_.substr( |
| version_patch_start, version_patch_end - version_patch_start); |
| application_version_.version.patch = atoi(version_patch_string.c_str()); |
| version_parsing_position_ = version_patch_end; |
| return true; |
| } |
| |
| bool ParseVersionUnknown() { |
| // No UNKNOWN. |
| if (version_parsing_position_ == version_string_.size()) { |
| return true; |
| } |
| auto version_unknown_start = version_parsing_position_; |
| auto version_unknown_end = version_string_.find_first_of("-+", version_unknown_start); |
| // No PRE_RELEASE and BUILD_INFO |
| if (version_unknown_end == std::string::npos) { |
| version_unknown_end = version_string_.size(); |
| } |
| application_version_.version.unknown = version_string_.substr( |
| version_unknown_start, version_unknown_end - version_unknown_start); |
| version_parsing_position_ = version_unknown_end; |
| return true; |
| } |
| |
| bool ParseVersionPreRelease() { |
| // No PRE_RELEASE. |
| if (version_parsing_position_ == version_string_.size() || |
| version_string_[version_parsing_position_] != '-') { |
| return true; |
| } |
| |
| auto version_pre_release_start = version_parsing_position_ + 1; // +1 is for '-'. |
| auto version_pre_release_end = |
| version_string_.find_first_of("+", version_pre_release_start); |
| // No BUILD_INFO |
| if (version_pre_release_end == std::string::npos) { |
| version_pre_release_end = version_string_.size(); |
| } |
| application_version_.version.pre_release = version_string_.substr( |
| version_pre_release_start, version_pre_release_end - version_pre_release_start); |
| version_parsing_position_ = version_pre_release_end; |
| return true; |
| } |
| |
| bool ParseVersionBuildInfo() { |
| // No BUILD_INFO. |
| if (version_parsing_position_ == version_string_.size() || |
| version_string_[version_parsing_position_] != '+') { |
| return true; |
| } |
| |
| auto version_build_info_start = version_parsing_position_ + 1; // +1 is for '+'. |
| application_version_.version.build_info = |
| version_string_.substr(version_build_info_start); |
| return true; |
| } |
| |
| bool ParseBuildName() { |
| std::string build_mark(" (build "); |
| auto build_mark_position = created_by_.find(build_mark, version_end_); |
| // No BUILD_NAME. |
| if (build_mark_position == std::string::npos) { |
| return false; |
| } |
| auto build_name_start = build_mark_position + build_mark.size(); |
| RemovePrecedingSpaces(created_by_, build_name_start, created_by_.size()); |
| auto build_name_end = created_by_.find_first_of(")", build_name_start); |
| // No end ")". |
| if (build_name_end == std::string::npos) { |
| return false; |
| } |
| RemoveTrailingSpaces(created_by_, build_name_start, build_name_end); |
| application_version_.build_ = |
| created_by_.substr(build_name_start, build_name_end - build_name_start); |
| |
| return true; |
| } |
| |
| const std::string& created_by_; |
| ApplicationVersion& application_version_; |
| |
| // For parsing. |
| std::string spaces_; |
| std::string digits_; |
| size_t version_parsing_position_; |
| size_t version_start_; |
| size_t version_end_; |
| std::string version_string_; |
| }; |
| } // namespace |
| |
| ApplicationVersion::ApplicationVersion(const std::string& created_by) { |
| ApplicationVersionParser parser(created_by, *this); |
| parser.Parse(); |
| } |
| |
| bool ApplicationVersion::VersionLt(const ApplicationVersion& other_version) const { |
| if (application_ != other_version.application_) return false; |
| |
| if (version.major < other_version.version.major) return true; |
| if (version.major > other_version.version.major) return false; |
| DCHECK_EQ(version.major, other_version.version.major); |
| if (version.minor < other_version.version.minor) return true; |
| if (version.minor > other_version.version.minor) return false; |
| DCHECK_EQ(version.minor, other_version.version.minor); |
| return version.patch < other_version.version.patch; |
| } |
| |
| bool ApplicationVersion::VersionEq(const ApplicationVersion& other_version) const { |
| return application_ == other_version.application_ && |
| version.major == other_version.version.major && |
| version.minor == other_version.version.minor && |
| version.patch == other_version.version.patch; |
| } |
| |
| // Reference: |
| // parquet-mr/parquet-column/src/main/java/org/apache/parquet/CorruptStatistics.java |
| // PARQUET-686 has more discussion on statistics |
| bool ApplicationVersion::HasCorrectStatistics(Type::type col_type, |
| EncodedStatistics& statistics, |
| SortOrder::type sort_order) const { |
| // parquet-cpp version 1.3.0 and parquet-mr 1.10.0 onwards stats are computed |
| // correctly for all types |
| if ((application_ == "parquet-cpp" && VersionLt(PARQUET_CPP_FIXED_STATS_VERSION())) || |
| (application_ == "parquet-mr" && VersionLt(PARQUET_MR_FIXED_STATS_VERSION()))) { |
| // Only SIGNED are valid unless max and min are the same |
| // (in which case the sort order does not matter) |
| bool max_equals_min = statistics.has_min && statistics.has_max |
| ? statistics.min() == statistics.max() |
| : false; |
| if (SortOrder::SIGNED != sort_order && !max_equals_min) { |
| return false; |
| } |
| |
| // Statistics of other types are OK |
| if (col_type != Type::FIXED_LEN_BYTE_ARRAY && col_type != Type::BYTE_ARRAY) { |
| return true; |
| } |
| } |
| // created_by is not populated, which could have been caused by |
| // parquet-mr during the same time as PARQUET-251, see PARQUET-297 |
| if (application_ == "unknown") { |
| return true; |
| } |
| |
| // Unknown sort order has incorrect stats |
| if (SortOrder::UNKNOWN == sort_order) { |
| return false; |
| } |
| |
| // PARQUET-251 |
| if (VersionLt(PARQUET_251_FIXED_VERSION())) { |
| return false; |
| } |
| |
| return true; |
| } |
| |
| // MetaData Builders |
| // row-group metadata |
| class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl { |
| public: |
| explicit ColumnChunkMetaDataBuilderImpl(std::shared_ptr<WriterProperties> props, |
| const ColumnDescriptor* column) |
| : owned_column_chunk_(new format::ColumnChunk), |
| properties_(std::move(props)), |
| column_(column) { |
| Init(owned_column_chunk_.get()); |
| } |
| |
| explicit ColumnChunkMetaDataBuilderImpl(std::shared_ptr<WriterProperties> props, |
| const ColumnDescriptor* column, |
| format::ColumnChunk* column_chunk) |
| : properties_(std::move(props)), column_(column) { |
| Init(column_chunk); |
| } |
| |
| const void* contents() const { return column_chunk_; } |
| |
| // column chunk |
| void set_file_path(const std::string& val) { column_chunk_->__set_file_path(val); } |
| |
| // column metadata |
| void SetStatistics(const EncodedStatistics& val) { |
| column_chunk_->meta_data.__set_statistics(ToThrift(val)); |
| } |
| |
| void Finish(int64_t num_values, int64_t dictionary_page_offset, |
| int64_t index_page_offset, int64_t data_page_offset, |
| int64_t compressed_size, int64_t uncompressed_size, bool has_dictionary, |
| bool dictionary_fallback, |
| const std::map<Encoding::type, int32_t>& dict_encoding_stats, |
| const std::map<Encoding::type, int32_t>& data_encoding_stats, |
| const std::shared_ptr<Encryptor>& encryptor) { |
| if (dictionary_page_offset > 0) { |
| column_chunk_->meta_data.__set_dictionary_page_offset(dictionary_page_offset); |
| column_chunk_->__set_file_offset(dictionary_page_offset + compressed_size); |
| } else { |
| column_chunk_->__set_file_offset(data_page_offset + compressed_size); |
| } |
| column_chunk_->__isset.meta_data = true; |
| column_chunk_->meta_data.__set_num_values(num_values); |
| if (index_page_offset >= 0) { |
| column_chunk_->meta_data.__set_index_page_offset(index_page_offset); |
| } |
| column_chunk_->meta_data.__set_data_page_offset(data_page_offset); |
| column_chunk_->meta_data.__set_total_uncompressed_size(uncompressed_size); |
| column_chunk_->meta_data.__set_total_compressed_size(compressed_size); |
| |
| std::vector<format::Encoding::type> thrift_encodings; |
| std::vector<format::PageEncodingStats> thrift_encoding_stats; |
| auto add_encoding = [&thrift_encodings](format::Encoding::type value) { |
| auto it = std::find(thrift_encodings.begin(), thrift_encodings.end(), value); |
| if (it == thrift_encodings.end()) { |
| thrift_encodings.push_back(value); |
| } |
| }; |
| // Add dictionary page encoding stats |
| if (has_dictionary) { |
| for (const auto& entry : dict_encoding_stats) { |
| format::PageEncodingStats dict_enc_stat; |
| dict_enc_stat.__set_page_type(format::PageType::DICTIONARY_PAGE); |
| // Dictionary Encoding would be PLAIN_DICTIONARY in v1 and |
| // PLAIN in v2. |
| format::Encoding::type dict_encoding = ToThrift(entry.first); |
| dict_enc_stat.__set_encoding(dict_encoding); |
| dict_enc_stat.__set_count(entry.second); |
| thrift_encoding_stats.push_back(dict_enc_stat); |
| add_encoding(dict_encoding); |
| } |
| } |
| // Always add encoding for RL/DL. |
| // BIT_PACKED is supported in `LevelEncoder`, but would only be used |
| // in benchmark and testing. |
| // And for now, we always add RLE even if there are no levels at all, |
| // while parquet-mr is more fine-grained. |
| add_encoding(format::Encoding::RLE); |
| // Add data page encoding stats |
| for (const auto& entry : data_encoding_stats) { |
| format::PageEncodingStats data_enc_stat; |
| data_enc_stat.__set_page_type(format::PageType::DATA_PAGE); |
| format::Encoding::type data_encoding = ToThrift(entry.first); |
| data_enc_stat.__set_encoding(data_encoding); |
| data_enc_stat.__set_count(entry.second); |
| thrift_encoding_stats.push_back(data_enc_stat); |
| add_encoding(data_encoding); |
| } |
| column_chunk_->meta_data.__set_encodings(thrift_encodings); |
| column_chunk_->meta_data.__set_encoding_stats(thrift_encoding_stats); |
| |
| const auto& encrypt_md = |
| properties_->column_encryption_properties(column_->path()->ToDotString()); |
| // column is encrypted |
| if (encrypt_md != nullptr && encrypt_md->is_encrypted()) { |
| column_chunk_->__isset.crypto_metadata = true; |
| format::ColumnCryptoMetaData ccmd; |
| if (encrypt_md->is_encrypted_with_footer_key()) { |
| // encrypted with footer key |
| ccmd.__isset.ENCRYPTION_WITH_FOOTER_KEY = true; |
| ccmd.__set_ENCRYPTION_WITH_FOOTER_KEY(format::EncryptionWithFooterKey()); |
| } else { // encrypted with column key |
| format::EncryptionWithColumnKey eck; |
| eck.__set_key_metadata(encrypt_md->key_metadata()); |
| eck.__set_path_in_schema(column_->path()->ToDotVector()); |
| ccmd.__isset.ENCRYPTION_WITH_COLUMN_KEY = true; |
| ccmd.__set_ENCRYPTION_WITH_COLUMN_KEY(eck); |
| } |
| column_chunk_->__set_crypto_metadata(ccmd); |
| |
| bool encrypted_footer = |
| properties_->file_encryption_properties()->encrypted_footer(); |
| bool encrypt_metadata = |
| !encrypted_footer || !encrypt_md->is_encrypted_with_footer_key(); |
| if (encrypt_metadata) { |
| ThriftSerializer serializer; |
| // Serialize and encrypt ColumnMetadata separately |
| // Thrift-serialize the ColumnMetaData structure, |
| // encrypt it with the column key, and write to encrypted_column_metadata |
| uint8_t* serialized_data; |
| uint32_t serialized_len; |
| |
| serializer.SerializeToBuffer(&column_chunk_->meta_data, &serialized_len, |
| &serialized_data); |
| |
| std::vector<uint8_t> encrypted_data(encryptor->CiphertextSizeDelta() + |
| serialized_len); |
| unsigned encrypted_len = |
| encryptor->Encrypt(serialized_data, serialized_len, encrypted_data.data()); |
| |
| const char* temp = |
| const_cast<const char*>(reinterpret_cast<char*>(encrypted_data.data())); |
| std::string encrypted_column_metadata(temp, encrypted_len); |
| column_chunk_->__set_encrypted_column_metadata(encrypted_column_metadata); |
| |
| if (encrypted_footer) { |
| column_chunk_->__isset.meta_data = false; |
| } else { |
| // Keep redacted metadata version for old readers |
| column_chunk_->__isset.meta_data = true; |
| column_chunk_->meta_data.__isset.statistics = false; |
| column_chunk_->meta_data.__isset.encoding_stats = false; |
| } |
| } |
| } |
| } |
| |
| void WriteTo(::arrow::io::OutputStream* sink) { |
| ThriftSerializer serializer; |
| serializer.Serialize(column_chunk_, sink); |
| } |
| |
| const ColumnDescriptor* descr() const { return column_; } |
| int64_t total_compressed_size() const { |
| return column_chunk_->meta_data.total_compressed_size; |
| } |
| |
| private: |
| void Init(format::ColumnChunk* column_chunk) { |
| column_chunk_ = column_chunk; |
| |
| column_chunk_->meta_data.__set_type(ToThrift(column_->physical_type())); |
| column_chunk_->meta_data.__set_path_in_schema(column_->path()->ToDotVector()); |
| column_chunk_->meta_data.__set_codec( |
| ToThrift(properties_->compression(column_->path()))); |
| } |
| |
| format::ColumnChunk* column_chunk_; |
| std::unique_ptr<format::ColumnChunk> owned_column_chunk_; |
| const std::shared_ptr<WriterProperties> properties_; |
| const ColumnDescriptor* column_; |
| }; |
| |
| std::unique_ptr<ColumnChunkMetaDataBuilder> ColumnChunkMetaDataBuilder::Make( |
| std::shared_ptr<WriterProperties> props, const ColumnDescriptor* column, |
| void* contents) { |
| return std::unique_ptr<ColumnChunkMetaDataBuilder>( |
| new ColumnChunkMetaDataBuilder(std::move(props), column, contents)); |
| } |
| |
| std::unique_ptr<ColumnChunkMetaDataBuilder> ColumnChunkMetaDataBuilder::Make( |
| std::shared_ptr<WriterProperties> props, const ColumnDescriptor* column) { |
| return std::unique_ptr<ColumnChunkMetaDataBuilder>( |
| new ColumnChunkMetaDataBuilder(std::move(props), column)); |
| } |
| |
| ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilder( |
| std::shared_ptr<WriterProperties> props, const ColumnDescriptor* column) |
| : impl_{std::unique_ptr<ColumnChunkMetaDataBuilderImpl>( |
| new ColumnChunkMetaDataBuilderImpl(std::move(props), column))} {} |
| |
| ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilder( |
| std::shared_ptr<WriterProperties> props, const ColumnDescriptor* column, |
| void* contents) |
| : impl_{std::unique_ptr<ColumnChunkMetaDataBuilderImpl>( |
| new ColumnChunkMetaDataBuilderImpl( |
| std::move(props), column, |
| reinterpret_cast<format::ColumnChunk*>(contents)))} {} |
| |
| ColumnChunkMetaDataBuilder::~ColumnChunkMetaDataBuilder() = default; |
| |
| const void* ColumnChunkMetaDataBuilder::contents() const { return impl_->contents(); } |
| |
| void ColumnChunkMetaDataBuilder::set_file_path(const std::string& path) { |
| impl_->set_file_path(path); |
| } |
| |
| void ColumnChunkMetaDataBuilder::Finish( |
| int64_t num_values, int64_t dictionary_page_offset, int64_t index_page_offset, |
| int64_t data_page_offset, int64_t compressed_size, int64_t uncompressed_size, |
| bool has_dictionary, bool dictionary_fallback, |
| const std::map<Encoding::type, int32_t>& dict_encoding_stats, |
| const std::map<Encoding::type, int32_t>& data_encoding_stats, |
| const std::shared_ptr<Encryptor>& encryptor) { |
| impl_->Finish(num_values, dictionary_page_offset, index_page_offset, data_page_offset, |
| compressed_size, uncompressed_size, has_dictionary, dictionary_fallback, |
| dict_encoding_stats, data_encoding_stats, encryptor); |
| } |
| |
| void ColumnChunkMetaDataBuilder::WriteTo(::arrow::io::OutputStream* sink) { |
| impl_->WriteTo(sink); |
| } |
| |
| const ColumnDescriptor* ColumnChunkMetaDataBuilder::descr() const { |
| return impl_->descr(); |
| } |
| |
| void ColumnChunkMetaDataBuilder::SetStatistics(const EncodedStatistics& result) { |
| impl_->SetStatistics(result); |
| } |
| |
| int64_t ColumnChunkMetaDataBuilder::total_compressed_size() const { |
| return impl_->total_compressed_size(); |
| } |
| |
| class RowGroupMetaDataBuilder::RowGroupMetaDataBuilderImpl { |
| public: |
| explicit RowGroupMetaDataBuilderImpl(std::shared_ptr<WriterProperties> props, |
| const SchemaDescriptor* schema, void* contents) |
| : properties_(std::move(props)), schema_(schema), next_column_(0) { |
| row_group_ = reinterpret_cast<format::RowGroup*>(contents); |
| InitializeColumns(schema->num_columns()); |
| } |
| |
| ColumnChunkMetaDataBuilder* NextColumnChunk() { |
| if (!(next_column_ < num_columns())) { |
| std::stringstream ss; |
| ss << "The schema only has " << num_columns() |
| << " columns, requested metadata for column: " << next_column_; |
| throw ParquetException(ss.str()); |
| } |
| auto column = schema_->Column(next_column_); |
| auto column_builder = ColumnChunkMetaDataBuilder::Make( |
| properties_, column, &row_group_->columns[next_column_++]); |
| auto column_builder_ptr = column_builder.get(); |
| column_builders_.push_back(std::move(column_builder)); |
| return column_builder_ptr; |
| } |
| |
| int current_column() { return next_column_ - 1; } |
| |
| void Finish(int64_t total_bytes_written, int16_t row_group_ordinal) { |
| if (!(next_column_ == schema_->num_columns())) { |
| std::stringstream ss; |
| ss << "Only " << next_column_ - 1 << " out of " << schema_->num_columns() |
| << " columns are initialized"; |
| throw ParquetException(ss.str()); |
| } |
| |
| int64_t file_offset = 0; |
| int64_t total_compressed_size = 0; |
| for (int i = 0; i < schema_->num_columns(); i++) { |
| if (!(row_group_->columns[i].file_offset >= 0)) { |
| std::stringstream ss; |
| ss << "Column " << i << " is not complete."; |
| throw ParquetException(ss.str()); |
| } |
| if (i == 0) { |
| const format::ColumnMetaData& first_col = row_group_->columns[0].meta_data; |
| // As per spec, file_offset for the row group points to the first |
| // dictionary or data page of the column. |
| if (first_col.__isset.dictionary_page_offset && |
| first_col.dictionary_page_offset > 0) { |
| file_offset = first_col.dictionary_page_offset; |
| } else { |
| file_offset = first_col.data_page_offset; |
| } |
| } |
| // sometimes column metadata is encrypted and not available to read, |
| // so we must get total_compressed_size from column builder |
| total_compressed_size += column_builders_[i]->total_compressed_size(); |
| } |
| |
| const auto& sorting_columns = properties_->sorting_columns(); |
| if (!sorting_columns.empty()) { |
| std::vector<format::SortingColumn> thrift_sorting_columns(sorting_columns.size()); |
| for (size_t i = 0; i < sorting_columns.size(); ++i) { |
| thrift_sorting_columns[i] = ToThrift(sorting_columns[i]); |
| } |
| row_group_->__set_sorting_columns(std::move(thrift_sorting_columns)); |
| } |
| |
| row_group_->__set_file_offset(file_offset); |
| row_group_->__set_total_compressed_size(total_compressed_size); |
| row_group_->__set_total_byte_size(total_bytes_written); |
| row_group_->__set_ordinal(row_group_ordinal); |
| } |
| |
| void set_num_rows(int64_t num_rows) { row_group_->num_rows = num_rows; } |
| |
| int num_columns() { return static_cast<int>(row_group_->columns.size()); } |
| |
| int64_t num_rows() { return row_group_->num_rows; } |
| |
| private: |
| void InitializeColumns(int ncols) { row_group_->columns.resize(ncols); } |
| |
| format::RowGroup* row_group_; |
| const std::shared_ptr<WriterProperties> properties_; |
| const SchemaDescriptor* schema_; |
| std::vector<std::unique_ptr<ColumnChunkMetaDataBuilder>> column_builders_; |
| int next_column_; |
| }; |
| |
| std::unique_ptr<RowGroupMetaDataBuilder> RowGroupMetaDataBuilder::Make( |
| std::shared_ptr<WriterProperties> props, const SchemaDescriptor* schema_, |
| void* contents) { |
| return std::unique_ptr<RowGroupMetaDataBuilder>( |
| new RowGroupMetaDataBuilder(std::move(props), schema_, contents)); |
| } |
| |
| RowGroupMetaDataBuilder::RowGroupMetaDataBuilder(std::shared_ptr<WriterProperties> props, |
| const SchemaDescriptor* schema_, |
| void* contents) |
| : impl_{new RowGroupMetaDataBuilderImpl(std::move(props), schema_, contents)} {} |
| |
| RowGroupMetaDataBuilder::~RowGroupMetaDataBuilder() = default; |
| |
| ColumnChunkMetaDataBuilder* RowGroupMetaDataBuilder::NextColumnChunk() { |
| return impl_->NextColumnChunk(); |
| } |
| |
| int RowGroupMetaDataBuilder::current_column() const { return impl_->current_column(); } |
| |
| int RowGroupMetaDataBuilder::num_columns() { return impl_->num_columns(); } |
| |
| int64_t RowGroupMetaDataBuilder::num_rows() { return impl_->num_rows(); } |
| |
| void RowGroupMetaDataBuilder::set_num_rows(int64_t num_rows) { |
| impl_->set_num_rows(num_rows); |
| } |
| |
| void RowGroupMetaDataBuilder::Finish(int64_t total_bytes_written, |
| int16_t row_group_ordinal) { |
| impl_->Finish(total_bytes_written, row_group_ordinal); |
| } |
| |
| // file metadata |
| class FileMetaDataBuilder::FileMetaDataBuilderImpl { |
| public: |
| explicit FileMetaDataBuilderImpl( |
| const SchemaDescriptor* schema, std::shared_ptr<WriterProperties> props, |
| std::shared_ptr<const KeyValueMetadata> key_value_metadata) |
| : metadata_(new format::FileMetaData()), |
| properties_(std::move(props)), |
| schema_(schema), |
| key_value_metadata_(std::move(key_value_metadata)) { |
| if (properties_->file_encryption_properties() != nullptr && |
| properties_->file_encryption_properties()->encrypted_footer()) { |
| crypto_metadata_.reset(new format::FileCryptoMetaData()); |
| } |
| } |
| |
| RowGroupMetaDataBuilder* AppendRowGroup() { |
| row_groups_.emplace_back(); |
| current_row_group_builder_ = |
| RowGroupMetaDataBuilder::Make(properties_, schema_, &row_groups_.back()); |
| return current_row_group_builder_.get(); |
| } |
| |
| void SetPageIndexLocation(const PageIndexLocation& location) { |
| auto set_index_location = |
| [this](size_t row_group_ordinal, |
| const PageIndexLocation::FileIndexLocation& file_index_location, |
| bool column_index) { |
| auto& row_group_metadata = this->row_groups_.at(row_group_ordinal); |
| auto iter = file_index_location.find(row_group_ordinal); |
| if (iter != file_index_location.cend()) { |
| const auto& row_group_index_location = iter->second; |
| for (size_t i = 0; i < row_group_index_location.size(); ++i) { |
| if (i >= row_group_metadata.columns.size()) { |
| throw ParquetException("Cannot find metadata for column ordinal ", i); |
| } |
| auto& column_metadata = row_group_metadata.columns.at(i); |
| const auto& index_location = row_group_index_location.at(i); |
| if (index_location.has_value()) { |
| if (column_index) { |
| column_metadata.__set_column_index_offset(index_location->offset); |
| column_metadata.__set_column_index_length(index_location->length); |
| } else { |
| column_metadata.__set_offset_index_offset(index_location->offset); |
| column_metadata.__set_offset_index_length(index_location->length); |
| } |
| } |
| } |
| } |
| }; |
| |
| for (size_t i = 0; i < row_groups_.size(); ++i) { |
| set_index_location(i, location.column_index_location, true); |
| set_index_location(i, location.offset_index_location, false); |
| } |
| } |
| |
| std::unique_ptr<FileMetaData> Finish( |
| const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) { |
| int64_t total_rows = 0; |
| for (auto row_group : row_groups_) { |
| total_rows += row_group.num_rows; |
| } |
| metadata_->__set_num_rows(total_rows); |
| metadata_->__set_row_groups(row_groups_); |
| |
| if (key_value_metadata_ || key_value_metadata) { |
| if (!key_value_metadata_) { |
| key_value_metadata_ = key_value_metadata; |
| } else if (key_value_metadata) { |
| key_value_metadata_ = key_value_metadata_->Merge(*key_value_metadata); |
| } |
| metadata_->key_value_metadata.clear(); |
| metadata_->key_value_metadata.reserve(key_value_metadata_->size()); |
| for (int64_t i = 0; i < key_value_metadata_->size(); ++i) { |
| format::KeyValue kv_pair; |
| kv_pair.__set_key(key_value_metadata_->key(i)); |
| kv_pair.__set_value(key_value_metadata_->value(i)); |
| metadata_->key_value_metadata.push_back(kv_pair); |
| } |
| metadata_->__isset.key_value_metadata = true; |
| } |
| |
| int32_t file_version = 0; |
| switch (properties_->version()) { |
| case ParquetVersion::PARQUET_1_0: |
| file_version = 1; |
| break; |
| default: |
| file_version = 2; |
| break; |
| } |
| metadata_->__set_version(file_version); |
| metadata_->__set_created_by(properties_->created_by()); |
| |
| // Users cannot set the `ColumnOrder` since we do not have user defined sort order |
| // in the spec yet. |
| // We always default to `TYPE_DEFINED_ORDER`. We can expose it in |
| // the API once we have user defined sort orders in the Parquet format. |
| // TypeDefinedOrder implies choose SortOrder based on ConvertedType/PhysicalType |
| format::TypeDefinedOrder type_defined_order; |
| format::ColumnOrder column_order; |
| column_order.__set_TYPE_ORDER(type_defined_order); |
| column_order.__isset.TYPE_ORDER = true; |
| metadata_->column_orders.resize(schema_->num_columns(), column_order); |
| metadata_->__isset.column_orders = true; |
| |
| // if plaintext footer, set footer signing algorithm |
| auto file_encryption_properties = properties_->file_encryption_properties(); |
| if (file_encryption_properties && !file_encryption_properties->encrypted_footer()) { |
| EncryptionAlgorithm signing_algorithm; |
| EncryptionAlgorithm algo = file_encryption_properties->algorithm(); |
| signing_algorithm.aad.aad_file_unique = algo.aad.aad_file_unique; |
| signing_algorithm.aad.supply_aad_prefix = algo.aad.supply_aad_prefix; |
| if (!algo.aad.supply_aad_prefix) { |
| signing_algorithm.aad.aad_prefix = algo.aad.aad_prefix; |
| } |
| signing_algorithm.algorithm = ParquetCipher::AES_GCM_V1; |
| |
| metadata_->__set_encryption_algorithm(ToThrift(signing_algorithm)); |
| const std::string& footer_signing_key_metadata = |
| file_encryption_properties->footer_key_metadata(); |
| if (footer_signing_key_metadata.size() > 0) { |
| metadata_->__set_footer_signing_key_metadata(footer_signing_key_metadata); |
| } |
| } |
| |
| ToParquet(static_cast<parquet::schema::GroupNode*>(schema_->schema_root().get()), |
| &metadata_->schema); |
| auto file_meta_data = std::unique_ptr<FileMetaData>(new FileMetaData()); |
| file_meta_data->impl_->metadata_ = std::move(metadata_); |
| file_meta_data->impl_->InitSchema(); |
| file_meta_data->impl_->InitKeyValueMetadata(); |
| return file_meta_data; |
| } |
| |
| std::unique_ptr<FileCryptoMetaData> BuildFileCryptoMetaData() { |
| if (crypto_metadata_ == nullptr) { |
| return nullptr; |
| } |
| |
| auto file_encryption_properties = properties_->file_encryption_properties(); |
| |
| crypto_metadata_->__set_encryption_algorithm( |
| ToThrift(file_encryption_properties->algorithm())); |
| std::string key_metadata = file_encryption_properties->footer_key_metadata(); |
| |
| if (!key_metadata.empty()) { |
| crypto_metadata_->__set_key_metadata(key_metadata); |
| } |
| |
| std::unique_ptr<FileCryptoMetaData> file_crypto_metadata(new FileCryptoMetaData()); |
| file_crypto_metadata->impl_->metadata_ = std::move(*crypto_metadata_); |
| return file_crypto_metadata; |
| } |
| |
| protected: |
| std::unique_ptr<format::FileMetaData> metadata_; |
| std::unique_ptr<format::FileCryptoMetaData> crypto_metadata_; |
| |
| private: |
| const std::shared_ptr<WriterProperties> properties_; |
| std::vector<format::RowGroup> row_groups_; |
| |
| std::unique_ptr<RowGroupMetaDataBuilder> current_row_group_builder_; |
| const SchemaDescriptor* schema_; |
| std::shared_ptr<const KeyValueMetadata> key_value_metadata_; |
| }; |
| |
| std::unique_ptr<FileMetaDataBuilder> FileMetaDataBuilder::Make( |
| const SchemaDescriptor* schema, std::shared_ptr<WriterProperties> props, |
| std::shared_ptr<const KeyValueMetadata> key_value_metadata) { |
| return std::unique_ptr<FileMetaDataBuilder>( |
| new FileMetaDataBuilder(schema, std::move(props), std::move(key_value_metadata))); |
| } |
| |
| std::unique_ptr<FileMetaDataBuilder> FileMetaDataBuilder::Make( |
| const SchemaDescriptor* schema, std::shared_ptr<WriterProperties> props) { |
| return std::unique_ptr<FileMetaDataBuilder>( |
| new FileMetaDataBuilder(schema, std::move(props))); |
| } |
| |
| FileMetaDataBuilder::FileMetaDataBuilder( |
| const SchemaDescriptor* schema, std::shared_ptr<WriterProperties> props, |
| std::shared_ptr<const KeyValueMetadata> key_value_metadata) |
| : impl_{std::unique_ptr<FileMetaDataBuilderImpl>(new FileMetaDataBuilderImpl( |
| schema, std::move(props), std::move(key_value_metadata)))} {} |
| |
| FileMetaDataBuilder::~FileMetaDataBuilder() = default; |
| |
| RowGroupMetaDataBuilder* FileMetaDataBuilder::AppendRowGroup() { |
| return impl_->AppendRowGroup(); |
| } |
| |
| void FileMetaDataBuilder::SetPageIndexLocation(const PageIndexLocation& location) { |
| impl_->SetPageIndexLocation(location); |
| } |
| |
| std::unique_ptr<FileMetaData> FileMetaDataBuilder::Finish( |
| const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) { |
| return impl_->Finish(key_value_metadata); |
| } |
| |
| std::unique_ptr<FileCryptoMetaData> FileMetaDataBuilder::GetCryptoMetaData() { |
| return impl_->BuildFileCryptoMetaData(); |
| } |
| |
| } // namespace parquet |